千问 HappyHorse 视频去水印解析代码:Python 版 + Cloudflare Worker 部署 JS 代码

mimo-v2.5-pro写的,有些小瑕疵,问题不大。感兴趣的自己改改。 演示: 千问视频无水印下载 js版: /** * 通义千问 AI 视频无水印下载 - Cloudflare Worker * * 支持两种分享链接: * 类型1: https://www.qianwen.com/share/c...
千问 HappyHorse 视频去水印解析代码:Python 版 + Cloudflare Worker 部署 JS 代码
千问 HappyHorse 视频去水印解析代码:Python 版 + Cloudflare Worker 部署 JS 代码

代码由 mimo-v2.5-pro 生成,有些小瑕疵,但问题不大。感兴趣的可以自行修改。

演示: 千问视频无水印下载

js版:

/**
 * 通义千问 AI 视频无水印下载 - Cloudflare Worker
 *
 * 支持两种分享链接:
 *   类型1: https://www.qianwen.com/share/chat/{id}
 *   类型2: https://activity.qianwen.com/r/ai-studio-mobile/qwen-external-share?shareId=xxx&authorId=xxx
 *
 * 路由:
 *   GET  /           → 前端 UI
 *   POST /api/parse  → 解析分享链接
 *   GET  /api/proxy   → 代理下载
 */

export default {
  /**
   * Worker entry point. Dispatches on "<METHOD> <pathname>" to the UI page,
   * the parse API or the download proxy; answers CORS preflights for any
   * other path and falls back to a plain 404.
   *
   * @param {Request} request - Incoming HTTP request.
   * @param {unknown} env - Worker bindings (not read by this handler).
   * @returns {Promise<Response>} The response for the matched route.
   */
  async fetch(request, env) {
    const requestUrl = new URL(request.url);
    const { method } = request;

    switch (`${method} ${requestUrl.pathname}`) {
      case "GET /":
        return new Response(HTML_PAGE, {
          headers: { "Content-Type": "text/html; charset=utf-8" },
        });
      case "POST /api/parse":
        return handleParse(request);
      case "GET /api/proxy":
        return handleProxy(requestUrl);
      default:
        break;
    }

    // CORS preflight: only reached when no route above matched.
    if (method === "OPTIONS") {
      const corsHeaders = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type",
      };
      return new Response(null, { headers: corsHeaders });
    }

    return new Response("Not Found", { status: 404 });
  },
};

// ── 解析入口 ──────────────────────────────────────────────

/**
 * POST /api/parse handler: reads `{ url }` from the JSON body, identifies the
 * share-link type and returns the parsed resource description as JSON.
 *
 * Client mistakes (body is not JSON, `url` missing or not a string, link not
 * recognised) are answered with 400; failures while talking to the upstream
 * share API keep the 500 status. Every error body has the shape `{ error }`.
 *
 * @param {Request} request - Incoming request; only `request.json()` is used.
 * @returns {Promise<Response>} JSON response built by `jsonResponse`.
 */
async function handleParse(request) {
  let shareUrl;
  try {
    const body = await request.json();
    shareUrl = body?.url;
  } catch {
    return jsonResponse({ error: "请求体不是合法的 JSON" }, 400);
  }

  if (typeof shareUrl !== "string" || shareUrl.trim() === "") {
    return jsonResponse({ error: "缺少 url 参数" }, 400);
  }

  let linkInfo;
  try {
    linkInfo = detectShareType(shareUrl);
  } catch (e) {
    // An unrecognised link is the caller's problem, not a server fault.
    return jsonResponse({ error: e.message }, 400);
  }

  try {
    const result =
      linkInfo.type === "qianwen_chat"
        ? await fetchType1(linkInfo.shareId)
        : await fetchType2(linkInfo.shareId, linkInfo.authorId);
    return jsonResponse(result);
  } catch (e) {
    return jsonResponse({ error: e.message }, 500);
  }
}

// ── URL 识别 ──────────────────────────────────────────────

/**
 * Classify a pasted share link and extract its identifiers.
 *
 * Accepts the bare link, a link without a scheme, or a link embedded in
 * surrounding share text (whitespace-separated), so pasted share messages
 * do not fail with a raw "Invalid URL" error.
 *
 * @param {string} url - Text pasted by the user.
 * @returns {{type: "qianwen_chat", shareId: string} |
 *           {type: "activity_share", shareId: string, authorId: string}}
 * @throws {Error} When the input is not a string or no supported link is found.
 */
function detectShareType(url) {
  const unrecognized = () => new Error("无法识别的链接格式,请粘贴千问分享链接");
  if (typeof url !== "string") throw unrecognized();

  const text = url.trim();

  // Type 1: www.qianwen.com/share/chat/{id}
  const chatMatch = text.match(/\/share\/chat\/([a-f0-9]+)/);
  if (chatMatch) return { type: "qianwen_chat", shareId: chatMatch[1] };

  // Type 2: activity.qianwen.com/...?shareId=xxx&authorId=xxx
  const token = text
    .split(/\s+/)
    .find((part) => part.includes("activity.qianwen.com"));
  if (token) {
    // Drop any text glued in front of the scheme; add a scheme if none was pasted.
    const schemeAt = token.search(/https?:\/\//i);
    let candidate;
    if (schemeAt >= 0) {
      candidate = token.slice(schemeAt);
    } else if (token.includes("://")) {
      candidate = token;
    } else {
      candidate = `https://${token}`;
    }

    let parsed = null;
    try {
      parsed = new URL(candidate);
    } catch {
      parsed = null; // fall through to the friendly error below
    }

    const shareId = parsed?.searchParams.get("shareId");
    if (shareId) {
      return {
        type: "activity_share",
        shareId,
        authorId: parsed.searchParams.get("authorId") || "",
      };
    }
  }

  throw unrecognized();
}

// ── 类型1: chat2-api ──────────────────────────────────────

/**
 * Query the chat2-api share endpoint for a type-1 (share/chat) link and
 * hand the `data` field of the reply to `parseType1`.
 *
 * @param {string} shareId - Id captured from the /share/chat/{id} path.
 * @returns {Promise<object>} Whatever `parseType1` returns.
 * @throws {Error} On a non-2xx HTTP status or when the reply's `code` is not 0.
 */
async function fetchType1(shareId) {
  const endpoint = "https://chat2-api.qianwen.com/api/v1/share/info?pr=qwen&fr=web";
  const payload = { share_id: shareId, biz_id: "ai_qwen" };
  const requestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "User-Agent": UA,
      Referer: "https://www.qianwen.com/",
    },
    body: JSON.stringify(payload),
  };

  const resp = await fetch(endpoint, requestInit);
  if (!resp.ok) {
    throw new Error(`chat2-api 返回 ${resp.status}`);
  }

  const envelope = await resp.json();
  if (envelope.code === 0) {
    return parseType1(envelope.data);
  }
  throw new Error(envelope.msg || "API 错误");
}

/**
 * Turn the `data` payload of the chat2-api share reply into the common
 * result shape consumed by the front end.
 *
 * Only the first `ai_generate_video` entry found while walking
 * record → response message → multi_load is used. Its first layout decides
 * the labels: `video` ids are tagged "无水印", `download_video` ids "有水印",
 * `cover` ids "封面"; every remaining resource is tagged "原图" or "其他".
 *
 * @param {object} apiData - `data` field of the API reply.
 * @returns {object} `{ source, title, model, scene, duration, resolution, prompt, allResources }`.
 * @throws {Error} When no `ai_generate_video` entry exists.
 */
function parseType1(apiData) {
  const title = apiData.title || "未知";
  const recordList = apiData.session?.record_list || [];

  // Walk lazily so that the first video entry wins and later records are never touched.
  const locateVideo = () => {
    for (const record of recordList) {
      for (const message of record.response_messages || []) {
        const payloads = message.meta_data?.multi_load || [];
        const hit = payloads.find((entry) => entry.type === "ai_generate_video");
        if (hit) return hit;
      }
    }
    return null;
  };

  const videoItem = locateVideo();
  if (!videoItem) {
    throw new Error("未找到视频资源");
  }

  const content = videoItem.content || {};
  const resourceInfos = content.resource_infos || [];
  const primaryLayout = (content.layout_list || [])[0] || {};
  const displayIds = primaryLayout.video || [];
  const downloadIds = primaryLayout.download_video || [];
  const coverIds = primaryLayout.cover || [];
  const extra = content.extra_info?.content?.extra || {};

  // refer_id → resource lookup; a later duplicate id replaces an earlier one.
  const byReferId = {};
  resourceInfos.forEach((info) => {
    byReferId[info.refer_id] = info;
  });

  const isMp4 = (res) => res?.url?.includes(".mp4");
  const tagged = (ids, accept, label, type) =>
    ids
      .map((id) => byReferId[id])
      .filter(accept)
      .map((res) => ({ ...res, label, type }));

  const claimed = new Set([...displayIds, ...downloadIds, ...coverIds]);
  const leftovers = resourceInfos
    .filter((info) => !claimed.has(info.refer_id))
    .map((info) => ({
      ...info,
      label: info.url?.includes(".png") ? "原图" : "其他",
      type: "image",
    }));

  return {
    source: "qianwen_chat",
    title,
    model: extra.model_name || content.generate_model_name || "",
    scene: content.scene || "",
    duration: content.duration || 0,
    resolution: content.ratio || "",
    prompt: extra.query || content.prompt || "",
    allResources: [
      ...tagged(displayIds, isMp4, "无水印", "video"),
      ...tagged(downloadIds, isMp4, "有水印", "video"),
      ...tagged(coverIds, Boolean, "封面", "image"),
      ...leftovers,
    ],
  };
}

// ── 类型2: zaodian ────────────────────────────────────────

/**
 * Query the zaodian share endpoint for a type-2 (activity share) link and
 * hand the `data` field of the reply to `parseType2`.
 *
 * @param {string} shareId - `shareId` query parameter of the share link.
 * @param {string} authorId - `authorId` query parameter of the share link.
 * @returns {Promise<object>} Whatever `parseType2` returns.
 * @throws {Error} On a non-2xx HTTP status or when the reply's `code` is not 0.
 */
async function fetchType2(shareId, authorId) {
  const endpoint = "https://qwen-api.zaodian.com/api/v1/share/get";
  const payload = {
    shareId,
    authorId,
    chid: "null",
    product: "ai_studio",
  };
  const requestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "User-Agent": UA,
      Referer: "https://activity.qianwen.com/",
    },
    body: JSON.stringify(payload),
  };

  const resp = await fetch(endpoint, requestInit);
  if (!resp.ok) {
    throw new Error(`zaodian-api 返回 ${resp.status}`);
  }

  const envelope = await resp.json();
  if (envelope.code === 0) {
    return parseType2(envelope.data);
  }
  throw new Error(envelope.msg || "API 错误");
}

/**
 * Turn the `data` payload of the zaodian share reply into the common result
 * shape consumed by the front end.
 *
 * `playInfo.url`, `playInfo.playUrl` and `playInfo.downloadUrl` become video
 * resources (in that order, each only when present); `image.url` becomes the
 * cover resource.
 *
 * @param {object} data - `data` field of the API reply.
 * @returns {object} `{ source, title, model, scene, duration, resolution, prompt, allResources }`.
 */
function parseType2(data) {
  const title = data.title || data.shareTitle || "未知";
  const playInfo = data.playInfo || {};
  const cover = data.image || {};

  // [playInfo field, refer_id, label] — order defines the order in the result list.
  const videoSlots = [
    ["url", "video_main", "视频(无水印)"],
    ["playUrl", "video_play", "视频(预览)"],
    ["downloadUrl", "video_download", "视频(有水印)"],
  ];

  const allResources = videoSlots
    .filter(([field]) => playInfo[field])
    .map(([field, refer_id, label]) => ({
      refer_id,
      url: playInfo[field],
      label,
      type: "video",
    }));

  if (cover.url) {
    allResources.push({
      refer_id: "cover",
      url: cover.url,
      width: cover.width,
      height: cover.height,
      label: "封面",
      type: "image",
    });
  }

  return {
    source: "activity_share",
    title,
    model: "",
    scene: data.objectType || "",
    duration: playInfo.videoTotalTime || 0,
    resolution: "",
    prompt: data.content?.prompt || "",
    allResources,
  };
}

// ── 代理下载 ──────────────────────────────────────────────

// User-Agent header sent with every upstream request made by this Worker
// (both share-info API calls and proxied downloads).
const UA = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36";

// Hosts that /api/proxy is allowed to fetch from; handleProxy answers 403 for
// any target whose hostname does not match one of these entries.
const ALLOWED_DOMAINS = [
  "workspace-zb-cdn.quark.cn",
  "quark-aistudio-cdn.quark.cn",
  "img.alicdn.com",
];

/**
 * GET /api/proxy handler: streams the resource named by the `u` query
 * parameter back to the browser as a download.
 *
 * The target must be an absolute http(s) URL whose hostname is one of
 * ALLOWED_DOMAINS or a subdomain of one (matched on a dot boundary, so
 * "evilimg.alicdn.com" does not pass for "img.alicdn.com").
 *
 * @param {URL} url - Parsed URL of the incoming request.
 * @returns {Promise<Response>} 400 for a missing/invalid `u`, 403 for a host
 *   outside the allowlist, 502 when the upstream fetch itself fails, the
 *   upstream status when it is not OK, otherwise the proxied body.
 */
async function handleProxy(url) {
  const target = url.searchParams.get("u");
  if (!target) return new Response("Missing u param", { status: 400 });

  let parsed;
  try {
    parsed = new URL(target);
  } catch {
    // Without this guard a malformed `u` surfaced as an uncaught exception.
    return new Response("Invalid u param", { status: 400 });
  }

  if (parsed.protocol !== "https:" && parsed.protocol !== "http:") {
    return new Response("Unsupported protocol", { status: 400 });
  }

  const host = parsed.hostname;
  const isAllowed = ALLOWED_DOMAINS.some(
    (domain) => host === domain || host.endsWith(`.${domain}`)
  );
  if (!isAllowed) {
    return new Response("Domain not allowed", { status: 403 });
  }

  let upstream;
  try {
    upstream = await fetch(target, {
      headers: { "User-Agent": UA, Referer: "https://www.qianwen.com/" },
    });
  } catch (e) {
    return new Response(`Upstream fetch failed: ${e.message}`, { status: 502 });
  }

  if (!upstream.ok) return new Response(`Upstream ${upstream.status}`, { status: upstream.status });

  const headers = new Headers();
  headers.set("Content-Type", upstream.headers.get("Content-Type") || "application/octet-stream");
  headers.set("Content-Disposition", "attachment");
  headers.set("Cache-Control", "public, max-age=3600");
  headers.set("Access-Control-Allow-Origin", "*");

  return new Response(upstream.body, { headers });
}

// ── 工具 ──────────────────────────────────────────────────

/**
 * Serialise `data` as a JSON response that any origin may read.
 *
 * @param {unknown} data - Value passed to JSON.stringify.
 * @param {number} [status=200] - HTTP status code.
 * @returns {Response} Response with JSON content type and a wildcard CORS header.
 */
function jsonResponse(data, status = 200) {
  const headers = new Headers();
  headers.set("Content-Type", "application/json");
  headers.set("Access-Control-Allow-Origin", "*");

  const payload = JSON.stringify(data);
  return new Response(payload, { status, headers });
}

// ── 前端 HTML ─────────────────────────────────────────────

// Single-page front end served at GET /. The inline script posts the pasted
// link to /api/parse, renders the returned resources and downloads the
// selected ones through /api/proxy.
// NOTE: this is a template literal — backslashes meant for the page's own
// JavaScript are doubled, and the page must not contain backticks or "${".
const HTML_PAGE = `<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width,initial-scale=1"/>
<title>千问视频无水印下载</title>
<style>
*{margin:0;padding:0;box-sizing:border-box}
body{font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,sans-serif;
  background:#0a0a0a;color:#e0e0e0;min-height:100vh;display:flex;flex-direction:column;align-items:center}
.container{max-width:720px;width:100%;padding:24px 16px}
h1{font-size:1.6rem;font-weight:700;text-align:center;margin-bottom:4px;
  background:linear-gradient(135deg,#6366f1,#a855f7,#ec4899);-webkit-background-clip:text;
  -webkit-text-fill-color:transparent}
.subtitle{text-align:center;color:#888;font-size:.85rem;margin-bottom:28px}
.input-group{display:flex;gap:8px;margin-bottom:20px}
input[type=text]{flex:1;padding:12px 16px;border-radius:10px;border:1px solid #2a2a2a;
  background:#141414;color:#fff;font-size:.95rem;outline:none;transition:border .2s}
input[type=text]:focus{border-color:#6366f1}
input[type=text]::placeholder{color:#555}
button{padding:12px 24px;border-radius:10px;border:none;font-weight:600;font-size:.95rem;
  cursor:pointer;transition:all .2s}
.btn-primary{background:linear-gradient(135deg,#6366f1,#a855f7);color:#fff}
.btn-primary:hover{opacity:.9;transform:translateY(-1px)}
.btn-primary:disabled{opacity:.4;cursor:not-allowed;transform:none}
.btn-download{background:linear-gradient(135deg,#10b981,#06b6d4);color:#fff;
  padding:10px 20px;font-size:.85rem}
.btn-download:hover{opacity:.9}
.btn-download:disabled{opacity:.4;cursor:not-allowed}
.examples{margin-bottom:20px;padding:12px 14px;background:#111;border:1px solid #1a1a1a;
  border-radius:10px;font-size:.78rem;color:#666;line-height:1.8}
.examples b{color:#888}
.examples code{color:#a78bfa;background:#1a1028;padding:1px 5px;border-radius:4px;font-size:.76rem}
.status{text-align:center;padding:20px;color:#888;font-size:.9rem}
.spinner{display:inline-block;width:20px;height:20px;border:2px solid #333;
  border-top-color:#6366f1;border-radius:50%;animation:spin .6s linear infinite;
  vertical-align:middle;margin-right:8px}
@keyframes spin{to{transform:rotate(360deg)}}
.error{color:#f87171;text-align:center;padding:16px;background:#1a0a0a;
  border-radius:10px;margin-bottom:16px;border:1px solid #3a1a1a}
.info-card{background:#141414;border:1px solid #2a2a2a;border-radius:14px;
  padding:20px;margin-bottom:20px}
.info-card h2{font-size:1.1rem;margin-bottom:12px;color:#fff}
.info-row{display:flex;gap:8px;margin-bottom:6px;font-size:.85rem}
.info-label{color:#888;min-width:60px}
.info-value{color:#ccc;word-break:break-all}
.source-tag{display:inline-block;padding:2px 8px;border-radius:4px;font-size:.7rem;
  font-weight:600;margin-left:8px;vertical-align:middle}
.source-qwen{background:#1a1a2a;color:#818cf8;border:1px solid #2a2a4a}
.source-activity{background:#1a2a1a;color:#34d399;border:1px solid #2a4a2a}
.prompt-text{color:#aaa;font-size:.82rem;line-height:1.5;margin-top:8px;
  padding:10px;background:#0f0f0f;border-radius:8px;max-height:80px;overflow-y:auto}
.resources{margin-top:16px}
.resources h3{font-size:.95rem;margin-bottom:12px;color:#ddd}
.resource-item{display:flex;align-items:center;gap:12px;padding:12px 14px;
  background:#141414;border:1px solid #2a2a2a;border-radius:10px;margin-bottom:8px;
  transition:border-color .2s;cursor:pointer}
.resource-item:hover{border-color:#3a3a3a}
.resource-item.selected{border-color:#6366f1;background:#14142a}
.resource-item input[type=checkbox]{width:18px;height:18px;accent-color:#6366f1;
  cursor:pointer;flex-shrink:0}
.resource-thumb{width:56px;height:56px;border-radius:8px;object-fit:cover;
  background:#1a1a1a;flex-shrink:0}
.resource-info{flex:1;min-width:0}
.resource-name{font-size:.85rem;color:#ddd;font-weight:500;
  overflow:hidden;text-overflow:ellipsis;white-space:nowrap}
.resource-meta{font-size:.75rem;color:#888;margin-top:2px}
.tag{display:inline-block;padding:2px 8px;border-radius:4px;font-size:.7rem;
  font-weight:600;margin-left:6px}
.tag-no-wm{background:#0a2a1a;color:#34d399;border:1px solid #1a4a2a}
.tag-wm{background:#2a1a0a;color:#fbbf24;border:1px solid #4a2a1a}
.tag-dl{background:#0a1a2a;color:#60a5fa;border:1px solid #1a2a4a}
.tag-play{background:#2a0a2a;color:#c084fc;border:1px solid #4a1a4a}
.tag-cover{background:#1a1a2a;color:#818cf8;border:1px solid #2a2a4a}
.tag-other{background:#1a1a1a;color:#888;border:1px solid #2a2a2a}
.action-bar{display:flex;align-items:center;justify-content:space-between;
  margin-top:16px;padding:12px 0;border-top:1px solid #2a2a2a}
.select-all{display:flex;align-items:center;gap:8px;font-size:.85rem;color:#aaa;cursor:pointer}
.select-all input{accent-color:#6366f1}
.footer{text-align:center;color:#444;font-size:.75rem;margin-top:auto;padding:20px 0}
@media(max-width:480px){
  .container{padding:16px 12px}
  h1{font-size:1.3rem}
  .input-group{flex-direction:column}
  .btn-primary{width:100%}
}
</style>
</head>
<body>
<div class="container">
  <h1>🎬 千问视频无水印下载</h1>
  <p class="subtitle">支持两种千问分享链接,解析并下载无水印视频</p>

  <div class="examples">
    <b>支持格式:</b><br/>
    ① <code>https://www.qianwen.com/share/chat/xxx</code><br/>
    ② <code>https://activity.qianwen.com/r/ai-studio-mobile/qwen-external-share?shareId=xxx&authorId=xxx</code>
  </div>

  <div class="input-group">
    <input type="text" id="urlInput"
      placeholder="粘贴千问分享链接..."
      onkeydown="if(event.key==='Enter')parseUrl()"/>
    <button class="btn-primary" id="parseBtn" onclick="parseUrl()">解析</button>
  </div>

  <div id="errorBox" class="error" style="display:none"></div>
  <div id="loadingBox" class="status" style="display:none">
    <span class="spinner"></span>正在解析...
  </div>
  <div id="resultBox" style="display:none"></div>
</div>

<div class="footer">仅供学习交流 · 资源版权归原作者所有</div>

<script>
const BASE = location.origin;

async function parseUrl() {
  const input = document.getElementById("urlInput");
  const btn = document.getElementById("parseBtn");
  const errBox = document.getElementById("errorBox");
  const loadBox = document.getElementById("loadingBox");
  const resBox = document.getElementById("resultBox");

  const url = input.value.trim();
  if (!url) { showErr("请输入链接"); return; }

  errBox.style.display = "none";
  resBox.style.display = "none";
  loadBox.style.display = "block";
  btn.disabled = true;

  try {
    const resp = await fetch(BASE + "/api/parse", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ url }),
    });
    const data = await resp.json();
    if (data.error) { showErr(data.error); return; }
    renderResult(data);
  } catch (e) {
    showErr("请求失败: " + e.message);
  } finally {
    loadBox.style.display = "none";
    btn.disabled = false;
  }
}

function showErr(msg) {
  const box = document.getElementById("errorBox");
  box.textContent = msg;
  box.style.display = "block";
  document.getElementById("loadingBox").style.display = "none";
}

function esc(s) {
  if (!s) return "";
  const d = document.createElement("div");
  d.textContent = s;
  return d.innerHTML;
}

// File extension for a downloaded resource: "mp4" for videos, otherwise the
// extension of the URL *path* (never the hostname), falling back to "jpg".
function fileExt(r) {
  if (r.type === "video") return "mp4";
  let path = "";
  try { path = new URL(r.url).pathname; } catch (e) { path = String(r.url || "").split("?")[0]; }
  const m = path.match(/\\.([A-Za-z0-9]+)$/);
  return m ? m[1].toLowerCase() : "jpg";
}

function renderResult(data) {
  const box = document.getElementById("resultBox");
  const all = data.allResources || [];

  let html = "";

  // Info card
  const srcTag = data.source === "activity_share"
    ? '<span class="source-tag source-activity">Activity</span>'
    : '<span class="source-tag source-qwen">千问</span>';

  html += '<div class="info-card">';
  html += '<h2>' + esc(data.title) + srcTag + '</h2>';
  if (data.model) html += row("模型", data.model);
  if (data.scene) html += row("场景", data.scene);
  if (data.duration) html += row("时长", data.duration + "秒");
  if (data.resolution) html += row("分辨率", data.resolution);
  if (data.prompt) html += '<div class="prompt-text">💬 ' + esc(data.prompt) + '</div>';
  html += '</div>';

  // Resource list
  html += '<div class="resources">';
  html += '<h3>📦 可下载资源 (' + all.length + ')</h3>';

  const tagMap = {
    "无水印": "tag-no-wm", "有水印": "tag-wm",
    "视频(有水印)": "tag-dl", "视频(预览)": "tag-play",
    "视频(无水印)": "tag-play", "封面": "tag-cover", "原图": "tag-cover"
  };

  all.forEach((r, i) => {
    const tc = tagMap[r.label] || "tag-other";
    const ext = r.type === "video" ? "MP4" : r.url?.includes(".png") ? "PNG" : "JPG";
    const sz = r.width && r.height ? r.width + "×" + r.height : "";
    const thumb = BASE + "/api/proxy?u=" + encodeURIComponent(
      r.url?.includes(".mp4") ? r.url.split("?")[0] + "?x-oss-process=video/snapshot,t_1000,f_jpg" : r.url
    );
    const autoCheck = (r.label === "无水印" || r.label === "视频(有水印)" || r.label === "视频(无水印)") ? " checked" : "";

    html += '<label class="resource-item" id="item-' + i + '">';
    html += '<input type="checkbox" data-idx="' + i + '" onchange="updSel()"' + autoCheck + '/>';
    html += '<img class="resource-thumb" src="' + thumb + '" onerror="this.style.display=\\'none\\'" loading="lazy"/>';
    html += '<div class="resource-info">';
    html += '<div class="resource-name">' + esc(r.refer_id) + " · " + ext
      + ' <span class="tag ' + tc + '">' + esc(r.label) + '</span></div>';
    // Size and md5 come from the upstream API, so escape them before innerHTML.
    html += '<div class="resource-meta">' + esc(sz) + (r.md5 ? " · " + esc(String(r.md5).slice(0, 8)) : "") + '</div>';
    html += '</div></label>';
  });

  html += '</div>';

  // Action bar
  html += '<div class="action-bar">';
  html += '<label class="select-all"><input type="checkbox" id="selAll" onchange="toggleAll()"/> 全选</label>';
  html += '<button class="btn-download" id="dlBtn" onclick="dlSel()" disabled>下载选中 (0)</button>';
  html += '</div>';

  box.innerHTML = html;
  box.style.display = "block";
  window._res = all;
  updSel();
}

function row(label, value) {
  return '<div class="info-row"><span class="info-label">' + label + '</span><span class="info-value">' + esc(value) + '</span></div>';
}

function toggleAll() {
  const ck = document.getElementById("selAll").checked;
  document.querySelectorAll('.resource-item input[type=checkbox]').forEach(c => c.checked = ck);
  updSel();
}

function updSel() {
  const cbs = document.querySelectorAll('.resource-item input[type=checkbox]');
  let n = 0;
  cbs.forEach(c => {
    const el = document.getElementById("item-" + c.dataset.idx);
    if (c.checked) { n++; el.classList.add("selected"); } else { el.classList.remove("selected"); }
  });
  const btn = document.getElementById("dlBtn");
  btn.textContent = "下载选中 (" + n + ")";
  btn.disabled = n === 0;
  document.getElementById("selAll").checked = n === cbs.length && cbs.length > 0;
}

async function dlSel() {
  const resources = window._res || [];
  const cbs = document.querySelectorAll('.resource-item input[type=checkbox]:checked');
  for (const cb of cbs) {
    const r = resources[cb.dataset.idx];
    if (!r?.url) continue;
    const ext = fileExt(r);
    const proxyUrl = BASE + "/api/proxy?u=" + encodeURIComponent(r.url);
    const a = document.createElement("a");
    a.href = proxyUrl;
    a.download = "qianwen_" + r.refer_id + "." + ext;
    a.target = "_blank";
    document.body.appendChild(a);
    a.click();
    document.body.removeChild(a);
    await new Promise(r => setTimeout(r, 500));
  }
}
</script>
</body>
</html>`;

Python版:

#!/usr/bin/env python3
"""
通义千问(Qianwen) AI 生成视频无水印下载工具

支持两种分享链接:
  类型1: https://www.qianwen.com/share/chat/{id}
  类型2: https://activity.qianwen.com/r/ai-studio-mobile/qwen-external-share?shareId=xxx&authorId=xxx

用法:
  python qianwen_video_downloader.py <share_url> [-o output.mp4] [--all] [--info]
"""

import re
import json
import sys
import argparse
from pathlib import Path
from urllib.parse import urlparse, parse_qs, unquote

try:
    import requests
except ImportError:
    print("📦 正在安装 requests...")
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "requests", "-q"])
    import requests

# ── 常量 ─────────────────────────────────────────────────────────

# Default request headers for the share-info API calls. fetch_type2 overrides
# the Referer, and download_file reuses only the User-Agent value.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Referer": "https://www.qianwen.com/",
    "Content-Type": "application/json",
}

# Allowlist of CDN domains permitted for proxied downloads.
# NOTE(review): no function in the visible part of this script reads this list —
# presumably it mirrors the Worker's ALLOWED_DOMAINS; confirm before relying on it.
ALLOWED_CDN_DOMAINS = [
    "workspace-zb-cdn.quark.cn",
    "quark-aistudio-cdn.quark.cn",
    "img.alicdn.com",
]


# ── URL 识别 ────────────────────────────────────────────────────

def detect_share_type(url: str) -> dict:
    """
    Classify a share link and extract its identifiers.

    Accepts the bare link, a link without a scheme, or a link embedded in
    surrounding share text (whitespace-separated).

    Returns:
        {"type": "qianwen_chat", "share_id": "xxx"}
        or
        {"type": "activity_share", "share_id": "xxx", "author_id": "xxx"}

    Raises:
        ValueError: if no supported share link is found in ``url``.
    """
    text = url.strip() if isinstance(url, str) else ""

    # Type 1: www.qianwen.com/share/chat/{id}
    chat = re.search(r'/share/chat/([a-f0-9]+)', text)
    if chat:
        return {"type": "qianwen_chat", "share_id": chat.group(1)}

    # Type 2: activity.qianwen.com/...?shareId=xxx&authorId=xxx
    for token in text.split():
        if "activity.qianwen.com" not in token.lower():
            continue

        # Drop text glued in front of the scheme; add a scheme when none was
        # pasted, because urlparse() leaves hostname as None without one.
        scheme = re.search(r'https?://', token, re.IGNORECASE)
        if scheme:
            candidate = token[scheme.start():]
        elif "://" in token:
            candidate = token
        else:
            candidate = "https://" + token

        try:
            parsed = urlparse(candidate)
            host = parsed.hostname or ""
        except ValueError:
            continue  # malformed netloc; report the generic error below

        if "activity.qianwen.com" not in host:
            continue

        qs = parse_qs(parsed.query)
        share_id = qs.get("shareId", [None])[0]
        author_id = qs.get("authorId", [None])[0]
        if share_id:
            return {
                "type": "activity_share",
                "share_id": share_id,
                "author_id": author_id or "",
            }

    raise ValueError(f"无法识别的链接格式: {url}")


# ── API 调用 ────────────────────────────────────────────────────

def fetch_type1(share_id: str) -> dict:
    """Fetch share info for a type-1 link from chat2-api.qianwen.com.

    Args:
        share_id: id captured from the /share/chat/{id} path.

    Returns:
        The dict produced by parse_type1_data for the reply's "data" field.

    Raises:
        requests.HTTPError: on a non-2xx status (via raise_for_status).
        ValueError: when the reply's "code" is not 0.
    """
    print(f"📡 类型1 API (chat2-api) share_id={share_id[:16]}...")

    endpoint = "https://chat2-api.qianwen.com/api/v1/share/info"
    body = {"share_id": share_id, "biz_id": "ai_qwen"}
    response = requests.post(endpoint, headers=HEADERS, json=body, timeout=30)
    response.raise_for_status()

    payload = response.json()
    if payload.get("code") == 0:
        return parse_type1_data(payload["data"])
    raise ValueError(f"API 错误: {payload.get('msg')}")


def fetch_type2(share_id: str, author_id: str) -> dict:
    """Fetch share info for a type-2 link from qwen-api.zaodian.com.

    Args:
        share_id: "shareId" query parameter of the share link.
        author_id: "authorId" query parameter of the share link.

    Returns:
        The dict produced by parse_type2_data for the reply's "data" field.

    Raises:
        requests.HTTPError: on a non-2xx status (via raise_for_status).
        ValueError: when the reply's "code" is not 0.
    """
    print(f"📡 类型2 API (zaodian) share_id={share_id[:16]}...")

    endpoint = "https://qwen-api.zaodian.com/api/v1/share/get"
    # Same defaults as the type-1 call, but with the activity site as Referer.
    request_headers = {**HEADERS, "Referer": "https://activity.qianwen.com/"}
    body = {
        "shareId": share_id,
        "authorId": author_id,
        "chid": "null",
        "product": "ai_studio",
    }
    response = requests.post(endpoint, headers=request_headers, json=body, timeout=30)
    response.raise_for_status()

    payload = response.json()
    if payload.get("code") == 0:
        return parse_type2_data(payload["data"])
    raise ValueError(f"API 错误: {payload.get('msg')}")


# ── 数据解析 ────────────────────────────────────────────────────

def parse_type1_data(api_data: dict) -> dict:
    """Parse the "data" payload of the type-1 (chat2-api) reply.

    Only the first "ai_generate_video" entry found while walking
    record -> response message -> multi_load is used. Ids in the first
    layout's "video" list are labelled "无水印", "download_video" ids "有水印",
    "cover" ids "封面"; every remaining resource is "原图" or "其他".

    JSON ``null`` values are treated like missing keys, so a null
    "meta_data", "extra_info" or "url" no longer raises AttributeError/TypeError.

    Args:
        api_data: the reply's "data" object.

    Returns:
        dict with keys source, title, model, scene, duration, resolution,
        prompt and allResources.

    Raises:
        ValueError: when no video entry is found.
    """
    api_data = api_data or {}
    title = api_data.get("title") or "未知"
    records = (api_data.get("session") or {}).get("record_list") or []

    def url_of(res: dict) -> str:
        # A null/missing url counts as an empty string for the substring tests.
        return res.get("url") or ""

    for record in records:
        for msg in (record or {}).get("response_messages") or []:
            meta = (msg or {}).get("meta_data") or {}
            for item in meta.get("multi_load") or []:
                if not item or item.get("type") != "ai_generate_video":
                    continue

                content = item.get("content") or {}
                resource_infos = [r for r in content.get("resource_infos") or [] if r]
                layout_list = content.get("layout_list") or []
                layout = (layout_list[0] if layout_list else None) or {}

                display_refs = layout.get("video") or []
                download_refs = layout.get("download_video") or []
                cover_refs = layout.get("cover") or []

                resources = {r.get("refer_id"): r for r in resource_infos}
                extra = (
                    ((content.get("extra_info") or {}).get("content") or {}).get("extra")
                    or {}
                )

                all_res = []

                # video = no watermark
                for ref_id in display_refs:
                    if ref_id in resources and ".mp4" in url_of(resources[ref_id]):
                        all_res.append({**resources[ref_id], "label": "无水印", "type": "video"})

                # download_video = watermarked
                for ref_id in download_refs:
                    if ref_id in resources and ".mp4" in url_of(resources[ref_id]):
                        all_res.append({**resources[ref_id], "label": "有水印", "type": "video"})

                # cover images
                for ref_id in cover_refs:
                    if ref_id in resources:
                        all_res.append({**resources[ref_id], "label": "封面", "type": "image"})

                # everything the layout did not reference
                known = set(list(display_refs) + list(download_refs) + list(cover_refs))
                for ref_id, res in resources.items():
                    if ref_id not in known:
                        label = "原图" if ".png" in url_of(res) else "其他"
                        all_res.append({**res, "label": label, "type": "image"})

                return {
                    "source": "qianwen_chat",
                    "title": title,
                    "model": extra.get("model_name") or content.get("generate_model_name") or "",
                    "scene": content.get("scene") or "",
                    "duration": content.get("duration") or 0,
                    "resolution": content.get("ratio") or "",
                    "prompt": extra.get("query") or content.get("prompt") or "",
                    "allResources": all_res,
                }

    raise ValueError("未找到视频资源")


def parse_type2_data(data: dict) -> dict:
    """Parse the "data" payload of the type-2 (zaodian) reply.

    playInfo.url / playUrl / downloadUrl become video resources (in that
    order, each only when present) and image.url becomes the cover.
    JSON ``null`` values are treated like missing keys, so a null "title"
    falls back to "shareTitle" and a null "content" no longer raises.

    Args:
        data: the reply's "data" object.

    Returns:
        dict with keys source, title, model, scene, duration, resolution,
        prompt and allResources.
    """
    data = data or {}
    title = data.get("title") or data.get("shareTitle") or "未知"
    prompt = (data.get("content") or {}).get("prompt") or ""
    scene = data.get("objectType") or ""

    all_res = []
    play_info = data.get("playInfo") or {}
    image_info = data.get("image") or {}

    # Video resources: (playInfo field, refer_id, label)
    video_slots = (
        ("url", "video_main", "视频(主)"),
        ("playUrl", "video_play", "视频(播放)"),
        ("downloadUrl", "video_download", "视频(下载)"),
    )
    for field, refer_id, label in video_slots:
        if play_info.get(field):
            all_res.append({
                "refer_id": refer_id,
                "url": play_info[field],
                "label": label,
                "type": "video",
            })

    # Cover image
    if image_info.get("url"):
        all_res.append({
            "refer_id": "cover",
            "url": image_info["url"],
            "width": image_info.get("width"),
            "height": image_info.get("height"),
            "label": "封面",
            "type": "image",
        })

    return {
        "source": "activity_share",
        "title": title,
        "model": "",
        "scene": scene,
        "duration": play_info.get("videoTotalTime") or 0,
        "resolution": "",
        "prompt": prompt,
        "allResources": all_res,
    }


# ── 下载 ────────────────────────────────────────────────────────

def download_file(url: str, output_path: str, desc: str = "文件") -> str:
    """Stream ``url`` to ``output_path`` while printing a progress bar.

    The response is used as a context manager so the streamed connection is
    released even when writing fails part-way. A missing or malformed
    Content-Length simply disables the progress bar.

    Args:
        url: resource to download.
        output_path: destination file path (overwritten if it exists).
        desc: label shown in the "downloading" message.

    Returns:
        ``output_path``.

    Raises:
        requests.HTTPError: on a non-2xx status (via raise_for_status).
    """
    print(f"⬇️  正在下载{desc}...")

    with requests.get(
        url,
        headers={"User-Agent": HEADERS["User-Agent"]},
        stream=True,
        timeout=120,
    ) as resp:
        resp.raise_for_status()

        try:
            total = int(resp.headers.get("content-length") or 0)
        except ValueError:
            total = 0
        downloaded = 0

        with open(output_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                f.write(chunk)
                downloaded += len(chunk)
                if total > 0:
                    # Clamp: decoded bytes can exceed Content-Length when the
                    # body is sent compressed, which would overflow the bar.
                    pct = min(downloaded / total * 100, 100.0)
                    filled = int(pct // 2)
                    bar = "█" * filled + "░" * (50 - filled)
                    print(f"\r   [{bar}] {pct:.1f}% ({downloaded:,}/{total:,} bytes)", end="", flush=True)

    print()
    size_mb = Path(output_path).stat().st_size / (1024 * 1024)
    print(f"✅ 已保存: {output_path} ({size_mb:.2f} MB)")
    return output_path


def generate_filename(info: dict, suffix: str = "", is_video: bool = True) -> str:
    """Build a default output filename: qianwen[_<model>][_<suffix>].<mp4|jpg>.

    Spaces and "/" in the model name are replaced with "_".
    """
    model = info.get("model")
    safe_model = model.replace(" ", "_").replace("/", "_") if model else None
    # Empty segments (no model / no suffix) are left out of the name.
    stem = "_".join(piece for piece in ("qianwen", safe_model, suffix) if piece)
    extension = "mp4" if is_video else "jpg"
    return f"{stem}.{extension}"


def print_info(info: dict):
    """Print the parsed share metadata followed by an indexed resource list.

    The index shown in brackets is the position in info["allResources"].
    A size is printed only when both width and height are real values, so a
    resource whose width/height keys hold None does not show "None×None".
    """
    print(f"\n{'='*60}")
    print(f"🎬 视频信息 [{info['source']}]")
    print(f"{'='*60}")
    print(f"   标题:     {info['title']}")
    if info.get("model"):
        print(f"   模型:     {info['model']}")
    if info.get("scene"):
        print(f"   场景:     {info['scene']}")
    if info.get("duration"):
        print(f"   时长:     {info['duration']}秒")
    if info.get("resolution"):
        print(f"   分辨率:   {info['resolution']}")
    if info.get("prompt"):
        prompt = info["prompt"]
        if len(prompt) > 120:
            prompt = prompt[:120] + "..."
        print(f"   提示词:   {prompt}")

    # Icon shown in front of each known label; anything else gets the paperclip.
    icons = {
        "无水印": "✅", "有水印": "⚠️", "封面": "🖼️", "原图": "🖼️",
        "视频(主)": "🎬", "视频(播放)": "▶️", "视频(下载)": "📥",
    }

    print(f"\n📦 资源列表:")
    for i, res in enumerate(info["allResources"]):
        w = res.get("width")
        h = res.get("height")
        url = res.get("url") or ""
        ext = "MP4" if (res.get("type") == "video" or ".mp4" in url) else "PNG" if ".png" in url else "JPG"
        size_str = f"{w}×{h}" if w and h else ""

        tag_class = icons.get(res["label"], "📎")
        print(f"   [{i}] {tag_class} {res['label']} · {ext} {size_str}")


# ── 主流程 ───────────────────────────────────────────────────────

def _pick_default_resource(all_res: list, want_watermark: bool = False):
    """Choose the resource to download when neither --all nor --index is given.

    Type-1 resources are matched by label ("无水印" / "有水印"); type-2
    resources by refer_id ("video_main" is the clean video, "video_download"
    the watermarked one). Falls back to the first resource, or None when the
    list is empty.
    """
    wanted_label = "有水印" if want_watermark else "无水印"
    wanted_id = "video_download" if want_watermark else "video_main"
    for res in all_res:
        if wanted_label in res.get("label", "") or res.get("refer_id") == wanted_id:
            return res
    return all_res[0] if all_res else None


def main():
    """CLI entry point: parse arguments, fetch share info, print it and download.

    Exits with status 1 on any error, on an out-of-range --index, or when the
    share contains no downloadable resource.
    """
    parser = argparse.ArgumentParser(
        description="通义千问 AI 视频无水印下载工具 (支持两种分享链接)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("url", help="千问分享链接 (支持 qianwen.com 和 activity.qianwen.com)")
    parser.add_argument("-o", "--output", help="输出文件名")
    parser.add_argument("--all", action="store_true", help="下载所有资源")
    parser.add_argument("--info", action="store_true", help="仅显示信息,不下载")
    parser.add_argument("-i", "--index", type=int, default=-1, help="下载指定索引的资源 (从 --info 列表中选)")
    parser.add_argument("--watermark", action="store_true", help="下载带水印版本 (类型1)")

    args = parser.parse_args()

    try:
        link_info = detect_share_type(args.url)

        if link_info["type"] == "qianwen_chat":
            info = fetch_type1(link_info["share_id"])
        else:
            info = fetch_type2(link_info["share_id"], link_info["author_id"])

        print_info(info)

        if args.info:
            print(f"\n(--info 模式,不下载)")
            return

        print(f"\n{'='*60}")

        all_res = info["allResources"]

        if args.all:
            for res in all_res:
                url = res.get("url") or ""
                is_vid = res.get("type") == "video" or ".mp4" in url
                ext = "mp4" if is_vid else ("png" if ".png" in url else "jpg")
                # Swap the generated extension for the detected one; the old
                # .replace(".mp4", ...) never fired for images, so PNGs were
                # saved with a .jpg name.
                stem = generate_filename(info, res["refer_id"], is_vid).rsplit(".", 1)[0]
                fname = f"{stem}.{ext}"
                download_file(url, fname, f"{res['label']}")
        elif args.index >= 0:
            if args.index >= len(all_res):
                print(f"❌ 索引 {args.index} 超出范围 (共 {len(all_res)} 个)")
                sys.exit(1)
            res = all_res[args.index]
            fname = args.output or generate_filename(info, res["refer_id"], res.get("type") == "video")
            download_file(res["url"], fname, res["label"])
        else:
            # Default: the clean video, or the watermarked one with --watermark.
            target = _pick_default_resource(all_res, args.watermark)
            if not target:
                print("❌ 没有可下载的资源")
                sys.exit(1)

            fname = args.output or generate_filename(info, is_video=target.get("type") == "video")
            download_file(target["url"], fname, target["label"])

        print(f"\n🎉 完成!")

    except Exception as e:
        print(f"\n❌ 错误: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

1 个帖子 - 1 位参与者

阅读完整话题

来源: linux.do查看原文