python+阿里通义万相大模型api实现ai电影电视剧视频无限生成Bfwshoter代码

代码语言:python

所属分类:其他

代码描述:python+阿里通义万相大模型api实现ai电影电视剧视频无限生成Bfwshoter代码,支持剧本分镜头、角色、道具、片段分镜头及视频生成,支持角色音色生成,一个python代码就能拥有多项目管理的ai电影电视剧无限生成框架。

代码标签: python 阿里 通义 万相 大模型 api ai 电影 电视剧 视频 无限 生成 Bfwshoter

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI有声视频生成工作流系统 - 阿里通义万相版本
"""

import os
import json
import uuid
import time
import base64
import traceback
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory

# ============================================================
# Configuration
# ============================================================
# API key for the OpenAI-compatible chat endpoint; read from the environment,
# falling back to the literal "sk-" (presumably a placeholder to be replaced).
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY",  "sk-")
# DashScope key used for the async task endpoints; defaults to OPENAI_API_KEY.
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", OPENAI_API_KEY)
# Base URL of the chat-completions API; defaults to DashScope's compatible mode.
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")

# Working directory (relative to the current working directory); it is created
# at import time if it does not exist yet.
WORK_DIR = Path("workspace")
WORK_DIR.mkdir(exist_ok=True)
# Path of the JSON file inside WORK_DIR (presumably the project index).
PROJECTS_FILE = WORK_DIR / "projects.json"

# Flask application object for this module.
app = Flask(__name__)

# ============================================================
# 辅助工具
# ============================================================
def log_api_call(endpoint, req_data, res_data):
    """Print the request and response parameters of an API call for debugging.

    Args:
        endpoint: Label printed in the banner line (typically the API path).
        req_data: Request parameters; ``None`` is printed as ``None``.
        res_data: Response parameters; ``None`` is printed as ``None``. The
            rendered response is cut to 3000 characters so that large
            payloads (e.g. base64 data) keep the log readable.

    Empty containers are printed as ``{}`` / ``[]`` rather than ``None`` so
    that an empty payload can be told apart from a missing one, and values
    that are not JSON serializable are rendered with ``str`` so the logger
    never raises on the caller's behalf.
    """
    def _render(data):
        if data is None:
            return "None"
        # default=str: logging must not fail on bytes, Path, datetime, ...
        return json.dumps(data, ensure_ascii=False, indent=2, default=str)

    print(f"\n{'='*20}[API CALL] {endpoint} {'='*20}")
    print(">>> REQUEST PARAMS:")
    print(_render(req_data))
    print("<<< RESPONSE PARAMS:")
    # Truncate over-long output (e.g. base64 data) to keep the log readable.
    res_str = _render(res_data)
    if len(res_str) > 3000:
        res_str = res_str[:3000] + "\n... [DATA TRUNCATED IN LOG] ..."
    print(res_str)
    print("="*60 + "\n")

def _get_resolution(aspect_ratio):
    """Return the ``(width, height)`` pixel size for an aspect-ratio string.

    Any ratio that is not in the table falls back to ``(1280, 720)``.
    """
    sizes = {
        "1:1": (1024, 1024),
        "3:2": (1080, 720),
        "3:4": (768, 1024),
        "4:3": (1024, 768),
        "9:16": (720, 1280),
        "16:9": (1280, 720),
    }
    try:
        return sizes[aspect_ratio]
    except KeyError:
        return (1280, 720)

def get_base64_image_uri(local_filepath: Path) -> str:
    """Read an image file and return it as a ``data:image/...;base64,`` URI.

    The image subtype is derived from the file suffix (case-insensitive):
    ``jpg``/``jpeg`` map to ``jpeg``, ``webp`` stays ``webp`` and every other
    suffix is reported as ``png``.
    """
    with open(local_filepath, "rb") as handle:
        raw = handle.read()
    encoded = base64.b64encode(raw).decode("utf-8")

    suffix = local_filepath.suffix.lower().lstrip(".")
    subtype = {"jpg": "jpeg", "jpeg": "jpeg", "webp": "webp"}.get(suffix, "png")
    return "data:image/" + subtype + ";base64," + encoded

def _normalize_scene_characters(scene: dict):
    """Make a scene's character and prop fields consistent, in place.

    ``characters`` and ``character_ids`` are kept as mirrors of each other.
    The first non-empty list wins, in this order: ``characters``,
    ``character_ids``, then a single-item list built from
    ``speaking_character``; otherwise both become empty lists.
    ``props`` is forced to be a list, and ``prop_ids`` defaults to a copy of
    ``props`` when it is not already a list.
    """
    def _filled_list(value):
        return isinstance(value, list) and bool(value)

    names = scene.get("characters")
    ids = scene.get("character_ids")
    speaker = scene.get("speaking_character")

    if _filled_list(names):
        scene["character_ids"] = list(names)
    elif _filled_list(ids):
        scene["characters"] = list(ids)
    elif speaker:
        scene["characters"] = [speaker]
        scene["character_ids"] = [speaker]
    else:
        scene["characters"] = []
        scene["character_ids"] = []

    props = scene.get("props")
    if not isinstance(props, list):
        props = scene["props"] = []
    if not isinstance(scene.get("prop_ids"), list):
        scene["prop_ids"] = list(props)

def _normalize_script_structure(script: dict):
    """Fill in missing top-level script fields and normalize every scene.

    ``title`` and ``synopsis`` default to ``""``; ``characters``, ``props``
    and ``scenes`` default to ``[]``. A key that is present but set to
    ``None`` (e.g. a JSON ``null``) is treated like a missing key, so the
    scene loop below never iterates over ``None``.

    Args:
        script: Script dictionary; it is modified in place.

    Returns:
        The same ``script`` object, for call chaining.
    """
    for key in ("title", "synopsis"):
        if script.get(key) is None:
            script[key] = ""
    for key in ("characters", "props", "scenes"):
        if script.get(key) is None:
            # A fresh list per key so the defaults are never shared.
            script[key] = []
    for scene in script["scenes"]:
        _normalize_scene_characters(scene)
    return script

def _collect_scene_character_ids(scene: dict):
    """Return the scene's character ids, de-duplicated in first-seen order.

    Ids are gathered from the ``characters`` list, then the ``character_ids``
    list (each only when it is a list; falsy entries are dropped), followed
    by ``speaking_character`` when it is truthy.
    """
    gathered = []
    for key in ("characters", "character_ids"):
        value = scene.get(key)
        if isinstance(value, list):
            gathered += [item for item in value if item]

    speaker = scene.get("speaking_character")
    if speaker:
        gathered.append(speaker)

    # dict keys keep insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(gathered))

def _collect_scene_prop_ids(scene: dict):
    """Return the scene's prop ids, de-duplicated in first-seen order.

    Ids come from ``props`` first and ``prop_ids`` second; a field is ignored
    unless it is a list, and falsy entries are dropped.
    """
    ordered = {}
    for field in ("props", "prop_ids"):
        entries = scene.get(field)
        if not isinstance(entries, list):
            continue
        for entry in entries:
            if entry:
                # Insertion-ordered dict doubles as an ordered set.
                ordered.setdefault(entry, None)
    return list(ordered)

def _public_or_base64_image(project_id: str, url: str):
    """Turn an image reference into something that can be sent to the API.

    * Empty/``None`` reference -> ``None``.
    * ``/workspace/...`` reference -> base64 data URI of the file with the
      same base name inside ``WORK_DIR / project_id``, or ``None`` when that
      file does not exist.
    * Anything else (``data:image/`` URIs, other URLs) is returned unchanged.
    """
    if not url:
        return None
    if not url.startswith("/workspace/"):
        # Data URIs and all other URLs pass through untouched.
        return url

    candidate = WORK_DIR / project_id / os.path.basename(url)
    return get_base64_image_uri(candidate) if candidate.exists() else None

def combine_images_to_base64(image_list, output_path: Path):
    """Stitch several images side by side into one reference image.

    Each entry of ``image_list`` is either a ``data:image...`` base64 URI or
    a path to an existing local file; entries that cannot be loaded are
    logged and skipped. The images are pasted left to right, top-aligned, on
    a white canvas. When the canvas would exceed 2048 pixels in width or
    height, everything is scaled down uniformly to fit.

    Args:
        image_list: Iterable of data URIs and/or local file paths (strings).
        output_path: Where the combined image is written. It is always saved
            as JPEG (quality 85), whatever the suffix of ``output_path`` is.

    Returns:
        The combined image as a base64 data URI (via
        ``get_base64_image_uri``), or ``None`` when no image could be loaded
        or any error occurred (including Pillow not being installed).
    """
    max_side = 2048
    try:
        from PIL import Image
        from io import BytesIO
        imgs = []
        for item in image_list:
            try:
                if item.startswith("data:image"):
                    b64_data = item.split(",", 1)[1]
                    imgs.append(Image.open(BytesIO(base64.b64decode(b64_data))).convert("RGB"))
                elif os.path.exists(item):
                    # Close the file handle as soon as the pixels have been
                    # copied; convert() returns an independent image.
                    with Image.open(item) as src:
                        imgs.append(src.convert("RGB"))
            except Exception as ex:
                print(f"Failed to load image for combining: {ex}")
                continue

        if not imgs:
            return None

        # Horizontal concatenation.
        widths, heights = zip(*(i.size for i in imgs))
        total_width = sum(widths)
        max_height = max(heights)

        scale = 1.0
        if total_width > max_side or max_height > max_side:
            scale = min(float(max_side) / total_width, float(max_side) / max_height)

        # Clamp to 1 pixel: with extreme aspect ratios the truncated size
        # could otherwise become 0, which resize() rejects.
        new_width = max(1, int(total_width * scale))
        new_height = max(1, int(max_height * scale))

        combined = Image.new('RGB', (new_width, new_height), (255, 255, 255))
        x_offset = 0
        for img in imgs:
            w, h = img.size
            nw, nh = max(1, int(w * scale)), max(1, int(h * scale))
            resized = img.resize((nw, nh), Image.Resampling.LANCZOS)
            combined.paste(resized, (x_offset, 0))
            x_offset += nw

        combined.save(str(output_path), "JPEG", quality=85)
        return get_base64_image_uri(output_path)
    except Exception as e:
        print(f"Combine images error: {e}")
        return None


# ============================================================
# OpenAI / DashScope 统一调用封装
# ============================================================
def call_openai_chat(messages, model="qwen3.5-plus", temperature=0.8, response_format=None):
    """Send a chat-completions request and return the first reply's text.

    The request is POSTed to ``{OPENAI_BASE_URL}/chat/completions`` with
    ``OPENAI_API_KEY`` as bearer token and a 120 second timeout.
    ``response_format`` is only included in the payload when it is truthy.

    Returns:
        ``choices[0].message.content`` of the response, or ``None`` when the
        request fails or the response cannot be parsed (the error is printed).
    """
    body = {"model": model, "messages": messages, "temperature": temperature}
    if response_format:
        body["response_format"] = response_format

    request_obj = urllib.request.Request(
        f"{OPENAI_BASE_URL}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {OPENAI_API_KEY}",
        },
        method="POST",
    )

    try:
        with urllib.request.urlopen(request_obj, timeout=120) as resp:
            raw = resp.read()
            parsed = json.loads(raw.decode("utf-8"))
            return parsed["choices"][0]["message"]["content"]
    except urllib.error.HTTPError as e:
        print(f"OpenAI API HTTP Error {e.code}: {e.read().decode('utf-8')}")
    except Exception as e:
        print(f"OpenAI API Error: {e}")
    return None

def _submit_and_poll_dashscope_task(url, payload, timeout=900, poll_interval=10):
    """Submit an asynchronous DashScope task and poll until it finishes.

    The payload is POSTed to ``url`` with the ``X-DashScope-Async: enable``
    header; the returned ``output.task_id`` is then polled at
    ``https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}``.

    Args:
        url: Task submission endpoint.
        payload: JSON-serializable request body.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep before each poll request.

    Returns:
        ``{"success": True, "data": <poll response>}`` when the task status
        is ``SUCCEEDED``; ``{"success": False, "data": <poll response>}``
        when it is ``FAILED``/``CANCELED``/``EXPIRED``/``UNKNOWN``; ``None``
        when submission fails or polling times out. Errors during a single
        poll are printed and polling continues.
    """
    headers = {
        "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
        "Content-Type": "application/json",
        "X-DashScope-Async": "enable"
    }
    try:
        req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=30) as resp:
            res_data = json.loads(resp.read().decode('utf-8'))
            task_id = res_data.get("output", {}).get("task_id")
            if not task_id:
                print(f"Task submit failed: {res_data}")
                return None
            print(f"Task submitted: {task_id}")
    except urllib.error.HTTPError as e:
        print(f"Task submit HTTP Error {e.code}: {e.read().decode('utf-8')}")
        return None
    except Exception as e:
        print(f"Task submit error: {e}")
        return None

    # Measure the timeout with a monotonic clock: wall-clock time can jump
    # (NTP sync, manual changes), which would cut the wait short or extend it.
    start_time = time.monotonic()
    poll_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
    poll_headers = {"Authorization": f"Bearer {DASHSCOPE_API_KEY}"}
    while time.monotonic() - start_time < timeout:
        # Sleep first: the task cannot be finished right after submission.
        time.sleep(poll_interval)
        try:
            req = urllib.request.Request(poll_url, headers=poll_headers, method="GET")
            with urllib.request.urlopen(req, timeout=30) as resp:
                poll_data = json.loads(resp.read().decode('utf-8'))
                status = poll_data.get("output", {}).get("task_status")
                if status == "SUCCEEDED":
                    return {"success": True, "data": poll_data}
                elif status in ["FAILED", "CANCELED", "EXPIRED", "UNKNOWN"]:
                    print(f"Task failed: {poll_data}")
                    return {"success": False, "data": poll_data}
                # Any other status means the task is still in progress.
        except Exception as e:
            # Transient poll failures are tolerated until the timeout expires.
            print(f"Poll error: {e}")
    print("Task timeout")
    return None

# ============================================================
# AI Media API
# ============================================================
class AIMediaAPI:
    @staticmethod
    def generate_character_image(character_description: str, style: str, output_dir: Path) -> str:
        """Generate a full-body character sheet image and return its path.

        A text-to-image task is submitted through
        ``_submit_and_poll_dashscope_task`` and the first result is downloaded
        to ``output_dir / "char_<8 hex chars>.png"``. When the task does not
        succeed or the download fails, ``_create_character_sheet_placeholder``
        is called for the same path instead.

        Returns:
            The image path as a string (real image or placeholder).
        """
        target = output_dir / f"char_{uuid.uuid4().hex[:8]}.png"
        prompt = "".join([
            f"整体画风必须严格保持为:{style}。",
            f"角色描述:{character_description}。",
            "请生成一张角色全身正面设定图。必须是全身照,正面站立。",
            "背景必须是纯白色背景,干净,无场景,无道具,无杂物。",
        ])
        endpoint = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text2image/image-synthesis"
        request_body = {
            "model": "wanx2.1-t2i-turbo",
            # The prompt is cut to its first 1000 characters.
            "input": {"prompt": prompt[:1000]},
            "parameters": {"size": "1024*1024", "n": 1},
        }

        outcome = _submit_and_poll_dashscope_task(endpoint, request_body)
        succeeded = bool(outcome) and bool(outcome.get("success"))
        if succeeded:
            try:
                remote_url = outcome["data"]["output"]["results"][0]["url"]
                urllib.request.urlretrieve(remote_url, str(target))
            except Exception as e:
                print(f"Failed to download image: {e}")
            else:
                return str(target)

        # Fallback: task failed, timed out, or the download raised.
        _create_character_sheet_placeholder(target, character_description, style)
        return str(target)

    @staticmethod
    def generate_prop_image(prop_description: str, style: str, output_dir: Path) -> str:
        """Generate a prop design image and return its path.

        A text-to-image task is submitted through
        ``_submit_and_poll_dashscope_task`` and the first result is downloaded
        to ``output_dir / "prop_<8 hex chars>.png"``. When the task does not
        succeed or the download fails, ``_create_prop_placeholder`` is called
        for the same path instead.

        Returns:
            The image path as a string (real image or placeholder).
        """
        destination = output_dir / f"prop_{uuid.uuid4().hex[:8]}.png"
        prompt_text = (
            f"整体画风必须严格保持为:{style}。"
            + f"道具描述:{prop_description}。"
            + "请生成一张专业道具设定图,主体居中,道具清晰完整。"
            + "背景纯白色,不要人物,不要场景。"
        )
        api_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text2image/image-synthesis"
        # The prompt is cut to its first 1000 characters.
        task_payload = dict(
            model="wanx2.1-t2i-turbo",
            input={"prompt": prompt_text[:1000]},
            parameters={"size": "1024*1024", "n": 1},
        )

        task = _submit_and_poll_dashscope_task(api_url, task_payload)
        if not task or not task.get("success"):
            # Task failed or timed out: fall back to a placeholder.
            _create_prop_placeholder(destination, prop_description, style)
            return str(destination)

        try:
            first_result = task["data"]["output"]["results"][0]
            urllib.request.urlretrieve(first_result["url"], str(destination))
            return str(destination)
        except Exception as e:
            print(f"Failed to download prop image: {e}")

        # Download failed: fall back to a placeholder.
        _create_prop_placeholder(destination, prop_description, style)
        return str(destination)

    @staticmethod
    def edit_image(image_url: str, edit_instruction: str, output_dir: Path, keep_layout_hint: str = "") -> str:
        img_path = output_dir / f"edit_{uuid.uuid4().hex[:8]}.png"
        url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
        payload = {
            "model": "qwen-image-2.0-pro",
            "input": {
                "messages": [
                    {
                        "role": "user",
                        "content":[
                            {"image": image_url},
                            {"text": f"请基于这张图片进行修改:{edit_instruction}。{keep_layout_hint}"}
                        ]
                    }
                ]
            },
            .........完整代码请登录后点击上方下载按钮下载查看

网友评论0