python+阿里大模型api实现模拟小云雀ai短剧生成工具BfwStoryForge代码

代码语言:python

所属分类:其他

版本1(最新):python+阿里大模型api实现模拟小云雀ai短剧生成工具BfwStoryForge代码

代码描述:python+阿里大模型api实现模拟小云雀ai短剧生成工具BfwStoryForge代码,从剧本生成到角色道具场景音色的确定,最后分镜的生成与分镜图片的生成和视频生成,最后合成视频输出。

代码标签: python 阿里大 模型 api 实现 模拟 小云雀 ai 短剧 生成 工具 BfwStoryFo

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI有声视频生成工作流系统 - Tailwind CSS 商业级UI版本
"""

import os
import json
import uuid
import time
import base64
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory

# ============================================================
# 配置
# ============================================================
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "sk-")
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", OPENAI_API_KEY)
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")

WORK_DIR = Path("workspace")
WORK_DIR.mkdir(exist_ok=True)
PROJECTS_FILE = WORK_DIR / "projects.json"

app = Flask(__name__)

# ============================================================
# 辅助工具
# ============================================================
def log_api_call(endpoint, req_data, res_data):
    print(f"\n{'='*20}[API CALL RESULT] {endpoint} {'='*20}")
    print(">>> REQUEST PARAMS:")
    print(json.dumps(req_data, ensure_ascii=False, indent=2) if req_data else "None")
    print("<<< RESPONSE PARAMS:")
    res_str = json.dumps(res_data, ensure_ascii=False, indent=2) if res_data else "None"
    if len(res_str) > 3000:
        res_str = res_str[:3000] + "\n...[DATA TRUNCATED IN LOG] ..."
    print(res_str)
    print("=" * 60 + "\n")


def _get_resolution(aspect_ratio):
    mapping = {
        "16:9": (1280, 720), "9:16": (720, 1280), "4:3": (1024, 768),
        "3:4": (768, 1024), "1:1": (1024, 1024), "3:2": (1080, 720), "2.35:1": (1280, 544)
    }
    return mapping.get(aspect_ratio, (1280, 720))


def get_base64_image_uri(local_filepath: Path) -> str:
    with open(local_filepath, "rb") as f:
        b64_data = base64.b64encode(f.read()).decode("utf-8")
    ext = local_filepath.suffix.lower().replace(".", "")
    if ext in ["jpg", "jpeg"]:
        ext = "jpeg"
    elif ext == "webp":
        ext = "webp"
    else:
        ext = "png"
    return f"data:image/{ext};base64,{b64_data}"


def _normalize_scene_characters(scene: dict):
    chars = scene.get("characters")
    char_ids = scene.get("character_ids")
    if isinstance(chars, list) and chars:
        scene["character_ids"] = chars[:]
    elif isinstance(char_ids, list) and char_ids:
        scene["characters"] = char_ids[:]
    elif scene.get("speaking_character"):
        scene["characters"] = [scene.get("speaking_character")]
        scene["character_ids"] = [scene.get("speaking_character")]
    else:
        scene["characters"] = []
        scene["character_ids"] = []
    if not isinstance(scene.get("props"), list):
        scene["props"] = []
    if not isinstance(scene.get("prop_ids"), list):
        scene["prop_ids"] = scene["props"][:]
    if not isinstance(scene.get("scenes"), list):
        scene["scenes"] = []
    if not isinstance(scene.get("scene_ids"), list):
        scene["scene_ids"] = scene["scenes"][:]
    if not isinstance(scene.get("character_looks"), list):
        scene["character_looks"] = []
    if not isinstance(scene.get("prop_variants"), list):
        scene["prop_variants"] = []
    scene.setdefault("transition_note", "")
    scene.setdefault("continuity_note", "")


def _normalize_script_structure(script: dict):
    script.setdefault("title", "")
    script.setdefault("synopsis", "")
    script.setdefault("characters", [])
    script.setdefault("props", [])
    script.setdefault("scenes", [])
    script.setdefault("segment_outlines", [])
    for c in script.get("characters", []):
        c.setdefault("looks", [])
        for lk in c.get("looks", []):
            lk.setdefault("id", "")
            lk.setdefault("name", "")
            lk.setdefault("description", "")
            lk.setdefault("usage", "")
            lk.setdefault("image_url", "")
            lk.setdefault("image_history", [])
    for p in script.get("props", []):
        p.setdefault("variants", [])
        for v in p.get("variants", []):
            v.setdefault("id", "")
            v.setdefault("name", "")
            v.setdefault("description", "")
            v.setdefault("usage", "")
            v.setdefault("image_url", "")
            v.setdefault("image_history", [])
    for s in script.get("scenes", []):
        s.setdefault("id", "")
        s.setdefault("name", "")
        s.setdefault("description", "")
        s.setdefault("usage", "")
        s.setdefault("image_url", "")
        s.setdefault("image_history", [])
    for seg in script.get("segment_outlines", []):
        seg.setdefault("characters_involved", [])
        seg.setdefault("props_involved", [])
        seg.setdefault("scenes_involved", [])
    return script


def _collect_scene_character_ids(scene: dict):
    ids = []
    if isinstance(scene.get("characters"), list):
        ids.extend([x for x in scene.get("characters", []) if x])
    if isinstance(scene.get("character_ids"), list):
        ids.extend([x for x in scene.get("character_ids", []) if x])
    if scene.get("speaking_character"):
        ids.append(scene.get("speaking_character"))
    result = []
    seen = set()
    for x in ids:
        if x not in seen:
            seen.add(x)
            result.append(x)
    return result


def _collect_scene_prop_ids(scene: dict):
    ids = []
    if isinstance(scene.get("props"), list):
        ids.extend([x for x in scene.get("props", []) if x])
    if isinstance(scene.get("prop_ids"), list):
        ids.extend([x for x in scene.get("prop_ids", []) if x])
    result = []
    seen = set()
    for x in ids:
        if x not in seen:
            seen.add(x)
            result.append(x)
    return result


def _collect_scene_scene_ids(scene: dict):
    ids = []
    if isinstance(scene.get("scenes"), list):
        ids.extend([x for x in scene.get("scenes", []) if x])
    if isinstance(scene.get("scene_ids"), list):
        ids.extend([x for x in scene.get("scene_ids", []) if x])
    result = []
    seen = set()
    for x in ids:
        if x not in seen:
            seen.add(x)
            result.append(x)
    return result


def _find_character(project, char_id):
    for c in project.get("characters", []):
        if str(c.get("id")) == str(char_id):
            return c
    return None


def _find_prop(project, prop_id):
    for p in project.get("props", []):
        if str(p.get("id")) == str(prop_id):
            return p
    return None


def _find_scene(project, scene_id):
    for s in project.get("scenes", []):
        if str(s.get("id")) == str(scene_id):
            return s
    return None


def _find_character_look(project, look_id):
    for c in project.get("characters", []):
        for lk in c.get("looks", []):
            if str(lk.get("id")) == str(look_id):
                return c, lk
    return None, None


def _find_prop_variant(project, variant_id):
    for p in project.get("props", []):
        for v in p.get("variants", []):
            if str(v.get("id")) == str(variant_id):
                return p, v
    return None, None


def _public_or_base64_image(project_id: str, url: str):
    if not url:
        return None
    if url.startswith("/workspace/"):
        local_path = WORK_DIR / project_id / os.path.basename(url)
        if local_path.exists():
            return get_base64_image_uri(local_path)
        return None
    if url.startswith("data:image/"):
        return url
    return url


def combine_images_to_base64(image_list, output_path: Path):
    try:
        from PIL import Image
        from io import BytesIO
        imgs = []
        for item in image_list:
            try:
                if item.startswith("data:image"):
                    b64_data = item.split(",", 1)[1]
                    imgs.append(Image.open(BytesIO(base64.b64decode(b64_data))).convert("RGB"))
                elif os.path.exists(item):
                    imgs.append(Image.open(item).convert("RGB"))
            except Exception as ex:
                print(f"Failed to load image for combining: {ex}")
                continue
        if not imgs:
            return None
        widths, heights = zip(*(i.size for i in imgs))
        total_width = sum(widths)
        max_height = max(heights)
        scale = 1.0
        if total_width > 2048 or max_height > 2048:
            scale = min(2048.0 / total_width, 2048.0 / max_height)
        new_width = int(total_width * scale)
        new_height = int(max_height * scale)
        combined = Image.new('RGB', (new_width, new_height), (255, 255, 255))
        x_offset = 0
        for img in imgs:
            w, h = img.size
            nw, nh = int(w * scale), int(h * scale)
            resized = img.resize((nw, nh), Image.Resampling.LANCZOS)
            combined.paste(resized, (x_offset, 0))
            x_offset += nw
        combined.save(str(output_path), "JPEG", quality=85)
        return get_base64_image_uri(output_path)
    except Exception as e:
        print(f"Combine images error: {e}")
        return None


# ============================================================
# OpenAI / DashScope 统一调用封装
# ============================================================
def call_openai_chat(messages, model="qwen3.5-plus", temperature=0.8, response_format=None):
    print(f"\n{'='*25} [LLM PROMPT SUBMITTED] {'='*25}")
    print(f"Model: {model} | Temperature: {temperature}")
    print("-" * 72)
    for msg in messages:
        role = msg.get("role", "unknown").upper()
        content = msg.get("content", "")
        print(f"[{role}]:\n{content}\n")
    print("=" * 72 + "\n")

    url = f"{OPENAI_BASE_URL}/chat/completions"
    payload = {"model": model, "messages": messages, "temperature": temperature}
    if response_format:
        payload["response_format"] = response_format
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", f"Bearer {OPENAI_API_KEY}")
    try:
        with urllib.request.urlopen(req, timeout=620) as resp:
            result = json.loads(resp.read().decode("utf-8"))
            return result["choices"][0]["message"]["content"]
    except urllib.error.HTTPError as e:
        print(f"OpenAI API HTTP Error {e.code}: {e.read().decode('utf-8')}")
        return None
    except Exception as e:
        print(f"OpenAI API Error: {e}")
        return None


def _submit_and_poll_dashscope_task(url, payload, timeout=900, poll_interval=10):
    headers = {
        "Authorization": f"Bearer {DASHSCOPE_API_KEY}",
        "Content-Type": "application/json",
        "X-DashScope-Async": "enable"
    }
    try:
        req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=30) as resp:
            res_data = json.loads(resp.read().decode('utf-8'))
            task_id = res_data.get("output", {}).get("task_id")
            if not task_id:
                print(f"Task submit failed: {res_data}")
                return None
            print(f"Task submitted: {task_id}")
    except urllib.error.HTTPError as e:
        print(f"Task submit HTTP Error {e.code}: {e.read().decode('utf-8')}")
        return None
    except Exception as e:
        print(f"Task submit error: {e}")
        return None

    start_time = time.time()
    poll_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
    poll_headers = {"Authorization": f"Bearer {DASHSCOPE_API_KEY}"}
    while time.time() - start_time < timeout:
        time.sleep(poll_interval)
        try:
            req = urllib.request.Request(poll_url, headers=poll_headers, method="GET")
            with urllib.request.urlopen(req, timeout=30) as resp:
                poll_data = json.loads(resp.read().decode('utf-8'))
                status = poll_data.get("output", {}).get("task_status")
                if status == "SUCCEEDED":
                    return {"success": True, "data": poll_data}
                elif status in ["FAILED", "CANCELED", "EXPIRED", "UNKNOWN"]:
                    print(f"Task failed: {poll_data}")
                    return {"success": False, "data": poll_data}
        except Exception as e:
            print(f"Poll error: {e}")
    print("Task timeout")
    return None


# ============================================================
# AI Media API
# ============================================================
class AIMediaAPI:
    @staticmethod
    def generate_character_image(character_description, style, output_dir):
        img_path = output_dir / f"char_{uuid.uuid4().hex[:8]}.png"
        prompt = (
            f"整体画风必须.........完整代码请登录后点击上方下载按钮下载查看

网友评论0