python+阿里通义api实现无限ai短剧电视剧电影生成工具bfwshoter1.2版本代码

代码语言:python

所属分类:其他

代码描述:python+阿里通义api实现无限ai短剧电视剧电影生成工具bfwshoter1.2版本代码,从剧本到角色场景道具到分镜画面和分镜视频最后合成,角色支持同一个人不同场景下的服饰与造型,支持道具的不同状态下的变种,支持多版本多项目管理多片段。

代码标签: python 阿里 通义 api 无限 ai 短剧 电视剧 电影 生成 工具 bfwshoter 1

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI voiced-video generation workflow system - Alibaba Tongyi Wanxiang edition.
Supports:
1. Base definitions for characters / props / scenes
2. Character looks (look) and prop variants (variant)
3. When a storyboard shot uses a scene reference, scene_description only
   describes "the main subject of the frame on top of that scene"
4. Shot generation automatically carries in the previous shot's information
   to strengthen continuity between shots
"""

import os
import json
import uuid
import time
import base64
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory

# ============================================================
# Configuration
# ============================================================
# Key sent as the Bearer token by call_openai_chat; falls back to the literal
# "sk-" stub when the OPENAI_API_KEY environment variable is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "sk-")
# Key used for DashScope task submission/polling; defaults to OPENAI_API_KEY
# when DASHSCOPE_API_KEY is not set in the environment.
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", OPENAI_API_KEY)
# Base URL that "/chat/completions" is appended to (DashScope's
# OpenAI-compatible endpoint unless overridden via the environment).
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")

# Working directory, relative to the process's current directory.
# NOTE: it is created as an import-time side effect.
WORK_DIR = Path("workspace")
WORK_DIR.mkdir(exist_ok=True)
# Path of projects.json inside WORK_DIR (its readers/writers are outside this view).
PROJECTS_FILE = WORK_DIR / "projects.json"

# Flask application object for this module.
app = Flask(__name__)

# ============================================================
# 辅助工具
# ============================================================
def log_api_call(endpoint, req_data, res_data):
    """Print a banner-delimited dump of one API call to stdout.

    Args:
        endpoint: Label shown in the banner line.
        req_data: Request parameters; dumped as indented JSON, or the text
            "None" when falsy (this includes empty containers).
        res_data: Response parameters; rendered the same way and cut to the
            first 3000 characters followed by a truncation marker when longer.
    """

    def render(payload):
        # Falsy payloads (None, {}, [], "") are all shown as the text "None".
        if not payload:
            return "None"
        return json.dumps(payload, ensure_ascii=False, indent=2)

    rule = "=" * 20
    print(f"\n{rule}[API CALL] {endpoint} {rule}")
    print(">>> REQUEST PARAMS:")
    print(render(req_data))
    print("<<< RESPONSE PARAMS:")

    response_text = render(res_data)
    limit = 3000
    if len(response_text) > limit:
        # Keep the log readable when responses embed large blobs.
        response_text = f"{response_text[:limit]}\n... [DATA TRUNCATED IN LOG] ..."
    print(response_text)
    print("=" * 60 + "\n")


def _get_resolution(aspect_ratio):
    """Return the (width, height) pixel pair for an aspect-ratio string.

    Unrecognised ratios fall back to the 16:9 resolution (1280, 720).
    """
    known_sizes = {
        "16:9": (1280, 720),
        "9:16": (720, 1280),
        "4:3": (1024, 768),
        "3:4": (768, 1024),
        "1:1": (1024, 1024),
        "3:2": (1080, 720),
    }
    if aspect_ratio in known_sizes:
        return known_sizes[aspect_ratio]
    return (1280, 720)


def get_base64_image_uri(local_filepath: Path) -> str:
    """Read an image file and return it as a ``data:image/...;base64,`` URI.

    The image subtype comes from the file suffix only: ``.jpg``/``.jpeg``
    map to ``jpeg``, ``.webp`` to ``webp``, and every other suffix
    (including none) is labelled ``png``. The file content is not inspected.
    """
    with open(local_filepath, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes).decode("utf-8")

    suffix = local_filepath.suffix.lower().replace(".", "")
    subtype_by_suffix = {"jpg": "jpeg", "jpeg": "jpeg", "webp": "webp"}
    subtype = subtype_by_suffix.get(suffix, "png")
    return f"data:image/{subtype};base64,{encoded}"


def _normalize_scene_characters(scene: dict):
    """Fill in the list/alias fields of a shot dict in place.

    - ``characters`` / ``character_ids``: a non-empty ``characters`` list is
      copied to ``character_ids``; otherwise a non-empty ``character_ids``
      list is copied to ``characters``; otherwise both become a one-element
      list holding ``speaking_character``, or empty lists.
    - ``props`` / ``scenes`` are reset to ``[]`` when not lists; their
      ``*_ids`` aliases are set to a copy of them when not already lists.
    - ``character_looks`` / ``prop_variants`` are reset to ``[]`` when not lists.
    - ``transition_note`` / ``continuity_note`` default to ``""``.

    Returns None; the dict is mutated.
    """
    primary = scene.get("characters")
    alias = scene.get("character_ids")
    speaker = scene.get("speaking_character")

    if isinstance(primary, list) and primary:
        scene["character_ids"] = list(primary)
    elif isinstance(alias, list) and alias:
        scene["characters"] = list(alias)
    elif speaker:
        scene["characters"] = [speaker]
        scene["character_ids"] = [speaker]
    else:
        scene["characters"] = []
        scene["character_ids"] = []

    # Each main list is validated first; its *_ids alias falls back to a copy of it.
    for main_key, alias_key in (("props", "prop_ids"), ("scenes", "scene_ids")):
        if not isinstance(scene.get(main_key), list):
            scene[main_key] = []
        if not isinstance(scene.get(alias_key), list):
            scene[alias_key] = list(scene[main_key])

    for list_key in ("character_looks", "prop_variants"):
        if not isinstance(scene.get(list_key), list):
            scene[list_key] = []

    for note_key in ("transition_note", "continuity_note"):
        scene.setdefault(note_key, "")


def _script_list_field(container: dict, key: str):
    """Return ``container[key]``, replacing a missing or ``None`` value with a new list."""
    value = container.get(key)
    if value is None:
        value = []
        container[key] = value
    return value


def _script_asset_defaults(entry: dict) -> None:
    """Fill the shared id/name/description/usage/image fields of an asset dict."""
    for text_key in ("id", "name", "description", "usage", "image_url"):
        entry.setdefault(text_key, "")
    _script_list_field(entry, "image_history")


def _normalize_script_structure(script: dict):
    """Fill in the expected keys of a script dict in place and return it.

    Top-level ``title``/``synopsis`` default to ``""``; ``characters``,
    ``props``, ``scenes`` and ``segment_outlines`` default to ``[]``.
    Every character gets a ``looks`` list and every prop a ``variants``
    list, and each look / variant / scene entry receives default
    ``id``, ``name``, ``description``, ``usage``, ``image_url`` and
    ``image_history`` fields. Segment outlines get empty
    ``characters_involved`` / ``props_involved`` / ``scenes_involved`` lists.

    Unlike plain ``setdefault``, list fields that are present but ``None``
    (for example JSON ``null``) are replaced by ``[]`` instead of making the
    following loops raise ``TypeError``. List entries that are not dicts are
    left untouched and skipped rather than raising ``AttributeError``.

    Args:
        script: The script dict to normalise (mutated).

    Returns:
        The same ``script`` object.
    """
    script.setdefault("title", "")
    script.setdefault("synopsis", "")
    for list_key in ("characters", "props", "scenes", "segment_outlines"):
        _script_list_field(script, list_key)

    for character in script["characters"]:
        if not isinstance(character, dict):
            continue
        for look in _script_list_field(character, "looks"):
            if isinstance(look, dict):
                _script_asset_defaults(look)

    for prop in script["props"]:
        if not isinstance(prop, dict):
            continue
        for variant in _script_list_field(prop, "variants"):
            if isinstance(variant, dict):
                _script_asset_defaults(variant)

    for scene_entry in script["scenes"]:
        if isinstance(scene_entry, dict):
            _script_asset_defaults(scene_entry)

    for segment in script["segment_outlines"]:
        if not isinstance(segment, dict):
            continue
        for involved_key in ("characters_involved", "props_involved", "scenes_involved"):
            _script_list_field(segment, involved_key)

    return script


def _collect_scene_character_ids(scene: dict):
    """Return the de-duplicated character ids referenced by a shot.

    Truthy entries are gathered from ``characters`` and ``character_ids``
    (each only when it is a list), followed by ``speaking_character`` when
    truthy. First-seen order is preserved.
    """
    candidates = []
    for key in ("characters", "character_ids"):
        values = scene.get(key)
        if isinstance(values, list):
            candidates.extend(item for item in values if item)

    speaker = scene.get("speaking_character")
    if speaker:
        candidates.append(speaker)

    # dict keys keep insertion order, giving an order-preserving de-duplication.
    return list(dict.fromkeys(candidates))


def _collect_scene_prop_ids(scene: dict):
    """Return the de-duplicated prop ids referenced by a shot.

    Truthy entries of ``props`` are taken first, then those of ``prop_ids``;
    either field is ignored unless it is a list. First-seen order is kept.
    """
    gathered = []
    primary = scene.get("props")
    if isinstance(primary, list):
        gathered += [entry for entry in primary if entry]
    secondary = scene.get("prop_ids")
    if isinstance(secondary, list):
        gathered += [entry for entry in secondary if entry]

    unique = []
    already_added = set()
    for entry in gathered:
        if entry in already_added:
            continue
        already_added.add(entry)
        unique.append(entry)
    return unique


def _collect_scene_scene_ids(scene: dict):
    """Return the de-duplicated scene ids referenced by a shot.

    Reads ``scenes`` and then ``scene_ids`` (each only when it is a list),
    drops falsy entries, and keeps the first occurrence of every id.
    """
    ordered = {}
    for key in ("scenes", "scene_ids"):
        values = scene.get(key)
        if not isinstance(values, list):
            continue
        for value in values:
            if value:
                # setdefault keeps the position of the first occurrence.
                ordered.setdefault(value, None)
    return list(ordered)


def _find_character(project, char_id):
    """Return the first entry of ``project["characters"]`` whose id matches.

    Ids are compared by their ``str()`` form, so ``1`` matches ``"1"``.
    Returns None when nothing matches or the project has no characters.
    """
    wanted = str(char_id)
    candidates = (
        entry for entry in project.get("characters", [])
        if str(entry.get("id")) == wanted
    )
    return next(candidates, None)


def _find_prop(project, prop_id):
    """Look up a prop dict in ``project["props"]`` by id.

    The comparison uses the ``str()`` form of both ids. The first match is
    returned; None when there is no match.
    """
    target = str(prop_id)
    matches = [item for item in project.get("props", []) if str(item.get("id")) == target]
    return matches[0] if matches else None


def _find_scene(project, scene_id):
    """Return the scene dict in ``project["scenes"]`` with the given id, or None.

    Ids are matched on their string representation; the first hit wins.
    """
    needle = str(scene_id)
    found = None
    for candidate in project.get("scenes", []):
        if str(candidate.get("id")) != needle:
            continue
        found = candidate
        break
    return found


def _find_character_look(project, look_id):
    """Find a look by id across every character of the project.

    Returns:
        ``(character, look)`` for the first look whose ``str(id)`` equals
        ``str(look_id)``, scanning characters and their ``looks`` in order;
        ``(None, None)`` when no look matches.
    """
    wanted = str(look_id)
    pairs = (
        (owner, look)
        for owner in project.get("characters", [])
        for look in owner.get("looks", [])
        if str(look.get("id")) == wanted
    )
    return next(pairs, (None, None))


def _find_prop_variant(project, variant_id):
    """Find a prop variant by id across every prop of the project.

    Returns:
        ``(prop, variant)`` for the first variant whose id matches
        ``variant_id`` (both compared via ``str()``), or ``(None, None)``.
    """
    target = str(variant_id)
    for owner in project.get("props", []):
        hit = next(
            (variant for variant in owner.get("variants", []) if str(variant.get("id")) == target),
            None,
        )
        if hit is not None:
            return owner, hit
    return None, None


def _public_or_base64_image(project_id: str, url: str):
    """Turn a stored image reference into something usable as an image input.

    - Falsy ``url`` -> None.
    - ``/workspace/...`` paths are resolved to
      ``WORK_DIR / project_id / <basename of url>`` and returned as a base64
      data URI, or None when that file does not exist.
    - Anything else (``data:image/`` URIs, other URLs) is returned unchanged.
    """
    if not url:
        return None
    if not url.startswith("/workspace/"):
        return url

    # Only the file name is used; intermediate path components of url are ignored.
    candidate = WORK_DIR / project_id / os.path.basename(url)
    return get_base64_image_uri(candidate) if candidate.exists() else None


def combine_images_to_base64(image_list, output_path: Path):
    """Paste several images side by side and return the result as a data URI.

    Args:
        image_list: Items that are either ``data:image...`` URIs or local
            file paths. Items that fail to load are logged and skipped;
            paths that do not exist are skipped silently.
        output_path: Where the combined image is written (always saved in
            JPEG format, quality 85).

    Returns:
        The data URI produced by ``get_base64_image_uri(output_path)``, or
        None when no image could be loaded or any error occurs (the error is
        printed, not raised). Pillow is imported lazily, so a missing Pillow
        install also results in None.
    """
    try:
        from PIL import Image
        from io import BytesIO

        loaded = []
        for source in image_list:
            try:
                if source.startswith("data:image"):
                    encoded = source.split(",", 1)[1]
                    raw = base64.b64decode(encoded)
                    loaded.append(Image.open(BytesIO(raw)).convert("RGB"))
                elif os.path.exists(source):
                    loaded.append(Image.open(source).convert("RGB"))
            except Exception as ex:
                print(f"Failed to load image for combining: {ex}")
                continue

        if not loaded:
            return None

        total_width = sum(picture.size[0] for picture in loaded)
        max_height = max(picture.size[1] for picture in loaded)

        # Shrink uniformly so neither side of the strip exceeds 2048 px.
        if total_width > 2048 or max_height > 2048:
            scale = min(2048.0 / total_width, 2048.0 / max_height)
        else:
            scale = 1.0

        canvas_size = (int(total_width * scale), int(max_height * scale))
        canvas = Image.new("RGB", canvas_size, (255, 255, 255))

        cursor_x = 0
        for picture in loaded:
            src_w, src_h = picture.size
            scaled_w = int(src_w * scale)
            scaled_h = int(src_h * scale)
            shrunk = picture.resize((scaled_w, scaled_h), Image.Resampling.LANCZOS)
            # Images are top-aligned; shorter ones leave white space below.
            canvas.paste(shrunk, (cursor_x, 0))
            cursor_x += scaled_w

        canvas.save(str(output_path), "JPEG", quality=85)
        return get_base64_image_uri(output_path)
    except Exception as e:
        print(f"Combine images error: {e}")
        return None


# ============================================================
# OpenAI / DashScope 统一调用封装
# ============================================================
def call_openai_chat(messages, model="qwen3.5-plus", temperature=0.8, response_format=None):
    """POST a chat-completions request and return the first choice's text.

    Args:
        messages: Chat message list placed verbatim in the payload.
        model: Model name sent in the payload.
        temperature: Sampling temperature sent in the payload.
        response_format: Optional value for the ``response_format`` field;
            omitted from the payload when falsy.

    Returns:
        ``choices[0].message.content`` of the JSON response, or None when the
        request fails or the response cannot be parsed (the error is printed).
    """
    body = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
    }
    if response_format:
        body["response_format"] = response_format

    request_obj = urllib.request.Request(
        f"{OPENAI_BASE_URL}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {OPENAI_API_KEY}",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(request_obj, timeout=120) as resp:
            parsed = json.loads(resp.read().decode("utf-8"))
        return parsed["choices"][0]["message"]["content"]
    except urllib.error.HTTPError as e:
        # HTTPError carries the server's response body; log it for diagnosis.
        print(f"OpenAI API HTTP Error {e.code}: {e.read().decode('utf-8')}")
        return None
    except Exception as e:
        print(f"OpenAI API Error: {e}")
        return None


def _submit_and_poll_dashscope_task(url, payload, timeout=900, poll_interval=10):
    """Submit an asynchronous DashScope task and poll it until it finishes.

    Args:
        url: Endpoint the task is POSTed to.
        payload: JSON-serialisable request body.
        timeout: Maximum number of seconds to keep polling after submission.
        poll_interval: Seconds slept before every poll (including the first).

    Returns:
        ``{"success": True, "data": <poll response>}`` when the task status is
        SUCCEEDED; ``{"success": False, "data": <poll response>}`` when it is
        FAILED, CANCELED, EXPIRED or UNKNOWN; None when the submission fails
        (HTTP error, other exception, or no ``task_id`` in the response) or
        when polling runs past ``timeout``. Errors are printed, never raised.
    """
    headers = {
        "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
        "Content-Type": "application/json",
        # Request header that asks for asynchronous task submission.
        "X-DashScope-Async": "enable"
    }
    try:
        req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=30) as resp:
            res_data = json.loads(resp.read().decode('utf-8'))
            task_id = res_data.get("output", {}).get("task_id")
            if not task_id:
                print(f"Task submit failed: {res_data}")
                return None
            print(f"Task submitted: {task_id}")
    except urllib.error.HTTPError as e:
        print(f"Task submit HTTP Error {e.code}: {e.read().decode('utf-8')}")
        return None
    except Exception as e:
        print(f"Task submit error: {e}")
        return None

    start_time = time.time()
    # NOTE: the polling host is hard-coded and does not depend on `url`.
    poll_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
    poll_headers = {"Authorization": f"Bearer {DASHSCOPE_API_KEY}"}
    while time.time() - start_time < timeout:
        # Sleep first, so the earliest status check happens after one interval.
        time.sleep(poll_interval)
        try:
            req = urllib.request.Request(poll_url, headers=poll_headers, method="GET")
            with urllib.request.urlopen(req, timeout=30) as resp:
                poll_data = json.loads(resp.read().decode('utf-8'))
                status = poll_data.get("output", {}).get("task_status")
                if status == "SUCCEEDED":
                    return {"success": True, "data": poll_data}
                elif status in ["FAILED", "CANCELED", "EXPIRED", "UNKNOWN"]:
                    print(f"Task failed: {poll_data}")
                    return {"success": False, "data": poll_data}
                # Any other status falls through and is polled again.
        except Exception as e:
            # A failed poll is logged and retried until the timeout elapses.
            print(f"Poll error: {e}")
    print("Task timeout")
    return None


# ============================================================
# AI Media API
# ============================================================
class AIMediaAPI:
    @staticmethod
    def generate_character_image(character_description: str, style: str, output_dir: Path) -> str:
        """Generate a full-body character sheet image and return its local path.

        A text-to-image task is submitted through
        ``_submit_and_poll_dashscope_task`` and the first result URL is
        downloaded to ``output_dir / "char_<8 hex chars>.png"``. If the task
        does not succeed or the download fails,
        ``_create_character_sheet_placeholder`` is called with the same path
        (defined outside this view; presumably it writes a stand-in image).

        Returns:
            The target path as a string in both the success and fallback cases.
        """
        target = output_dir / f"char_{uuid.uuid4().hex[:8]}.png"
        prompt = (
            f"整体画风必须严格保持为:{style}。"
            f"角色/造型描述:{character_description}。"
            "请生成一张角色全身正面设定图。必须是全身照,正面站立。"
            "背景必须是纯白色背景,干净,无场景,无道具,无杂物。"
        )
        endpoint = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text2image/image-synthesis"
        request_body = {
            "model": "wanx2.1-t2i-turbo",
            # The prompt is cut to its first 1000 characters before sending.
            "input": {"prompt": prompt[:1000]},
            "parameters": {"size": "1024*1024", "n": 1}
        }
        outcome = _submit_and_poll_dashscope_task(endpoint, request_body)
        if outcome and outcome.get("success"):
            try:
                remote_url = outcome["data"]["output"]["results"][0]["url"]
                urllib.request.urlretrieve(remote_url, str(target))
            except Exception as e:
                print(f"Failed to download image: {e}")
            else:
                return str(target)
        _create_character_sheet_placeholder(target, character_description, style)
        return str(target)

    @staticmethod
    def generate_prop_image(prop_description: str, style: str, output_dir: Path) -> str:
        """Generate a prop design image and return its local path.

        Submits a text-to-image task via ``_submit_and_poll_dashscope_task``
        and downloads the first result to
        ``output_dir / "prop_<8 hex chars>.png"``. When the task does not
        succeed or the download raises, ``_create_prop_placeholder`` is
        called for the same path (defined outside this view; presumably it
        writes a stand-in image).

        Returns:
            The target path as a string, whether generated or placeholder.
        """
        destination = output_dir / f"prop_{uuid.uuid4().hex[:8]}.png"
        prompt = (
            f"整体画风必须严格保持为:{style}。"
            f"道具/变体描述:{prop_description}。"
            "请生成一张专业道具设定图,主体居中,道具清晰完整。"
            "背景纯白色,不要人物,不要场景。"
        )
        api_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text2image/image-synthesis"
        task_payload = {
            "model": "wanx2.1-t2i-turbo",
            # Only the first 1000 characters of the prompt are sent.
            "input": {"prompt": prompt[:1000]},
            "parameters": {"size": "1024*1024", "n": 1}
        }
        task_result = _submit_and_poll_dashscope_task(api_url, task_payload)
        generated = bool(task_result and task_result.get("success"))
        if generated:
            try:
                result_url = task_result["data"]["output"]["results"][0]["url"]
                urllib.request.urlretrieve(result_url, str(destination))
                return str(destination)
            except Exception as e:
                print(f"Failed to download prop image: {e}")
        # Reached when generation failed or the download raised.
        _create_prop_placeholder(destination, prop_description, style)
        return str(destination)

    @staticmethod
    def generate_scene_image(scene_description: str, style: str, output_dir: Path, aspect_ratio="16:9") -> str:
        """Generate a character-free environment image and return its local path.

        The resolution comes from ``_get_resolution(aspect_ratio)``; the
        requested size string is ``"<w>*<h>"`` when the width is one of the
        listed values and ``"1280*720"`` otherwise. The first result of the
        text-to-image task is downloaded to
        ``output_dir / "scene_<8 hex chars>.png"``. If the task does not
        succeed or the download fails, ``_create_placeholder_image`` is
        called for the same path with the resolved width and height (defined
        outside this view; presumably it writes a stand-in image).

        Returns:
            The target path as a string in both cases.
        """
        target = output_dir / f"scene_{uuid.uuid4().hex[:8]}.png"
        width, height = _get_resolution(aspect_ratio)
        if width in (1024, 1280, 720, 768, 1080):
            requested_size = f"{width}*{height}"
        else:
            requested_size = "1280*720"
        prompt = (
            f"整体画风必须严格保持为:{style}。"
            f"场景描述:{scene_description}。"
            "请生成一张纯场景环境设定图。"
            "不要出现人物,不要出现近景主体人物,不要出现对白气泡。"
            "重点体现空间结构、环境材质、光线、氛围、色调、陈设布局。"
        )
        endpoint = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text2image/image-synthesis"
        request_body = {
            "model": "wanx2.1-t2i-turbo",
            # The prompt is cut to its first 1000 characters before sending.
            "input": {"prompt": prompt[:1000]},
            "parameters": {"size": requested_size, "n": 1}
        }
        outcome = _submit_and_poll_dashscope_task(endpoint, request_body)
        if outcome and outcome.get("success"):
            try:
                remote_url = outcome["data"]["output"]["results"][0]["url"]
                urllib.request.urlretrieve(remote_url, str(target))
            except Exception as e:
                print(f"Failed to download scene image: {e}")
            else:
                return str(target)
        _create_placeholder_image(target, f"场景设定图\n{scene_description}", width, height, bg=(235, 240, 245), fg=(20, 20, 20))
        return str(target)

    @staticmethod
    def edit_image(image_url: str, edit_instruction: str, output_dir: Path, keep_layout_hint: str = "") -> str:
        img_path = output_dir / f"edit_{uuid.uuid4().hex[:8]}.png"
        url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
        payload = {
    .........完整代码请登录后点击上方下载按钮下载查看

网友评论0