python+豆包大模型api实现ai电影电视剧视频无限生成画布画板Bfwshoter代码

代码语言:python

所属分类:其他

代码描述:python+豆包大模型api实现ai电影电视剧视频无限生成画布画板Bfwshoter代码,支持剧本分镜头、角色、道具、片段分镜头九宫格图片生成及视频生成,支持角色音色生成,一个python代码就能拥有多项目管理的ai电影电视剧无限生成画布,可修改支持seedance2的api。

代码标签: python 豆包 大模型 api ai 电影 电视剧 视频 无限 画布 生成 Bfwshoter

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
bfwshoter AI voiced-video generation workflow system - Infinite Canvas edition.

Includes: node derivation, zoom and pan, placeholder loading-animation nodes,
Ctrl multi-select video merging, script editing, and full Doubao large-model
integration.
"""

import os
import math
import json
import uuid
import time
import base64
import traceback
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory

# ============================================================
# Configuration - Volcengine ARK settings for the Doubao large models
# ============================================================
# API key sent as the "Bearer" token by the request helpers below; read from
# the ARK_API_KEY environment variable and left empty when it is unset.
ARK_API_KEY = os.environ.get("ARK_API_KEY", "")
# Base URL that endpoint paths are appended to; overridable via ARK_BASE_URL.
ARK_BASE_URL = os.environ.get("ARK_BASE_URL", "https://ark.cn-beijing.volces.com/api/v3")

# Model identifiers placed in the "model" field of API payloads.
# NOTE(review): MODEL_VIDEO is not referenced in the visible part of the file;
# presumably it is used by the video-generation code - confirm.
MODEL_CHAT = "doubao-1-5-pro-32k-250115"
MODEL_IMAGE = "doubao-seedream-5-0-260128"
MODEL_VIDEO = "doubao-seedance-1-5-pro-251215"

# Working directory, relative to the process's current directory. It is
# created as an import-time side effect if it does not exist yet.
WORK_DIR = Path("workspace")
WORK_DIR.mkdir(exist_ok=True)
# NOTE(review): presumably the JSON index of all projects - confirm against
# the code that reads and writes it.
PROJECTS_FILE = WORK_DIR / "projects.json"

# Flask application object for this module.
app = Flask(__name__)

# ============================================================
# Helper utilities
# ============================================================
def log_api_call(endpoint, req_data, res_data):
    """Print one API call's request and response data to stdout.

    Each payload is rendered as indented JSON (non-ASCII characters kept
    as-is). A falsy payload is printed as the literal text ``None``. The
    rendered response is cut off after 3000 characters and followed by a
    truncation marker.

    Args:
        endpoint: Label of the called endpoint, shown in the banner line.
        req_data: JSON-serializable request data, or a falsy value.
        res_data: JSON-serializable response data, or a falsy value.
    """
    def as_text(data):
        # Falsy payloads (None, {}, [], "", 0) are all shown as "None".
        if not data:
            return "None"
        return json.dumps(data, ensure_ascii=False, indent=2)

    bar = "=" * 20
    print(f"\n{bar}[API CALL] {endpoint} {bar}")

    print(">>> REQUEST PARAMS:")
    print(as_text(req_data))

    print("<<< RESPONSE PARAMS:")
    response_text = as_text(res_data)
    if len(response_text) > 3000:
        # Keep the console readable when the response carries large blobs.
        response_text = response_text[:3000] + "\n...[DATA TRUNCATED IN LOG] ..."
    print(response_text)

    print("=" * 60 + "\n")

def _get_target_resolution(aspect_ratio, target_pixels=4687500):
    """Return a ``"WxH"`` size string for an aspect ratio and a pixel budget.

    The height is derived from ``target_pixels`` and the ratio, and the width
    from that height. Both are then rounded down to a multiple of 16, with a
    minimum of 16.

    Args:
        aspect_ratio: Ratio label such as ``"16:9"``. Labels missing from the
            table fall back to 16:9.
        target_pixels: Approximate pixel count (width * height) to aim for.

    Returns:
        The size formatted as ``"{width}x{height}"``.
    """
    known_ratios = {
        "16:9": 16.0 / 9.0,
        "9:16": 9.0 / 16.0,
        "4:3": 4.0 / 3.0,
        "3:4": 3.0 / 4.0,
        "1:1": 1.0,
        "3:2": 3.0 / 2.0,
    }
    ratio = known_ratios.get(aspect_ratio, 16.0 / 9.0)

    height = int(math.sqrt(target_pixels / ratio))
    width = int(ratio * height)

    # Integer truncation can push the effective ratio outside [1/16, 16];
    # when it does, the width is recomputed from the height at the limit.
    effective_ratio = ratio if height == 0 else width / height
    if effective_ratio < 1 / 16:
        width = int(height / 16)
    elif effective_ratio > 16:
        width = int(height * 16)

    # Snap both sides down to a multiple of 16; a side that collapses to 0
    # is raised to 16.
    width = (width // 16) * 16 or 16
    height = (height // 16) * 16 or 16
    return f"{width}x{height}"

def _get_resolution(aspect_ratio):
    """Return the preset ``(width, height)`` tuple for an aspect-ratio label.

    Labels without a preset fall back to ``(1280, 720)``. That includes
    ``"3:2"``, which has no entry in this table.
    """
    presets = {
        "16:9": (1280, 720),
        "9:16": (720, 1280),
        "4:3": (1024, 768),
        "3:4": (768, 1024),
        "1:1": (1024, 1024),
    }
    if aspect_ratio in presets:
        return presets[aspect_ratio]
    return (1280, 720)

def get_base64_image_uri(local_filepath: Path) -> str:
    """Read a local file and return it as a ``data:image/...;base64,`` URI.

    The image subtype is the file suffix, lower-cased and with dots removed.
    ``jpg`` is reported as ``jpeg``.

    Args:
        local_filepath: Path of the file to embed.

    Returns:
        The data URI holding the base64-encoded file contents.
    """
    with open(local_filepath, "rb") as handle:
        raw = handle.read()
    encoded = base64.b64encode(raw).decode("utf-8")

    subtype = local_filepath.suffix.lower().replace(".", "")
    if subtype == "jpg":
        subtype = "jpeg"

    return "data:image/" + subtype + ";base64," + encoded

def _normalize_scene_characters(scene: dict):
    """Normalize the character and prop fields of a scene dict in place.

    * A non-empty ``characters`` list is kept, and ``character_ids`` becomes
      a copy of it.
    * Otherwise a truthy ``speaking_character`` becomes the only entry of
      both ``characters`` and ``character_ids``.
    * Otherwise both are set to empty lists.
    * ``props`` is replaced by an empty list unless it is already a list.

    Args:
        scene: Scene dict to update. It is mutated and nothing is returned.
    """
    cast = scene.get("characters")
    has_cast = isinstance(cast, list) and bool(cast)

    if has_cast:
        scene["character_ids"] = list(cast)
    else:
        speaker = scene.get("speaking_character")
        if speaker:
            # Two separate lists, so later edits to one do not leak into the other.
            scene["characters"] = [speaker]
            scene["character_ids"] = [speaker]
        else:
            scene["characters"] = []
            scene["character_ids"] = []

    props = scene.get("props")
    if not isinstance(props, list):
        scene["props"] = []

def _normalize_script_structure(script: dict):
    """Fill in missing top-level script keys and normalize every scene.

    ``title`` and ``synopsis`` default to ``""``. ``characters``, ``props``
    and ``scenes`` default to empty lists. Each entry of ``scenes`` is then
    passed to ``_normalize_scene_characters``.

    Args:
        script: Script dict to update in place.

    Returns:
        The same ``script`` object.
    """
    for text_key in ("title", "synopsis"):
        script.setdefault(text_key, "")
    for list_key in ("characters", "props", "scenes"):
        # A fresh list per key, so the defaults are never shared.
        script.setdefault(list_key, [])

    for scene in script["scenes"]:
        _normalize_scene_characters(scene)
    return script

def _collect_scene_character_ids(scene: dict):
    """Return the scene's character ids, de-duplicated in first-seen order.

    The ids come from the scene's ``characters`` list, followed by its
    ``speaking_character`` when that is truthy. Falsy entries are dropped.

    A ``characters`` value that is missing, ``None`` or not a list/tuple is
    treated as empty instead of raising. This matches how
    ``_normalize_scene_characters`` handles malformed values.

    Args:
        scene: Scene dict. It is not modified.

    Returns:
        A new list of unique, truthy character ids.
    """
    listed = scene.get("characters")
    ids = list(listed) if isinstance(listed, (list, tuple)) else []

    speaker = scene.get("speaking_character")
    if speaker:
        ids.append(speaker)

    # dict.fromkeys keeps insertion order, so duplicates are removed
    # without reordering the ids.
    return list(dict.fromkeys(x for x in ids if x))

def _collect_scene_prop_ids(scene: dict):
    """Return the scene's prop ids, de-duplicated in first-seen order.

    Falsy entries are dropped. A ``props`` value that is missing, ``None``
    or not a list/tuple is treated as empty instead of raising. This matches
    ``_normalize_scene_characters``, which resets non-list props to ``[]``.

    Args:
        scene: Scene dict. It is not modified.

    Returns:
        A new list of unique, truthy prop ids.
    """
    listed = scene.get("props")
    ids = listed if isinstance(listed, (list, tuple)) else ()

    # dict.fromkeys keeps insertion order, so duplicates are removed
    # without reordering the ids.
    return list(dict.fromkeys(x for x in ids if x))

def _public_or_base64_image(project_id: str, url: str):
    """Return an image reference: a non-local URL as-is, a local file inlined.

    * A falsy ``url`` yields ``None``.
    * A ``url`` starting with ``/workspace/`` is resolved to
      ``WORK_DIR / project_id / <basename of url>`` and returned as a base64
      data URI, or ``None`` when no regular file exists at that path.
    * Any other ``url`` is returned unchanged.

    Args:
        project_id: Name of the project directory under ``WORK_DIR``.
        url: Image URL, either a ``/workspace/...`` path or something else.

    Returns:
        A data URI, the original ``url``, or ``None``.
    """
    if not url:
        return None
    if not url.startswith("/workspace/"):
        return url

    local_path = WORK_DIR / project_id / os.path.basename(url)
    # is_file() rather than exists(): a URL with an empty basename (e.g.
    # "/workspace/") resolves to the project directory itself, which exists
    # but cannot be opened as a file. Treat that like a missing image.
    if not local_path.is_file():
        return None
    return get_base64_image_uri(local_path)

# ============================================================
# API call wrappers
# ============================================================
def call_doubao_chat(messages, model=MODEL_CHAT, temperature=0.8, response_format=None):
    """POST a chat-completion request and return the first choice's content.

    The request goes to ``{ARK_BASE_URL}/chat/completions`` with
    ``ARK_API_KEY`` as the Bearer token and a 120-second timeout.

    Args:
        messages: Value sent as the ``messages`` field of the payload.
        model: Value sent as the ``model`` field.
        temperature: Value sent as the ``temperature`` field.
        response_format: Optional value for the ``response_format`` field.
            It is omitted from the payload when falsy.

    Returns:
        ``choices[0].message.content`` from the JSON response, or ``None``
        when sending the request or reading the reply raises. The error is
        printed rather than propagated.
    """
    endpoint = f"{ARK_BASE_URL}/chat/completions"

    body = {"model": model, "messages": messages, "temperature": temperature}
    if response_format:
        body["response_format"] = response_format

    http_req = urllib.request.Request(
        endpoint,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {ARK_API_KEY}",
        },
        method="POST",
    )

    try:
        with urllib.request.urlopen(http_req, timeout=120) as resp:
            reply = json.loads(resp.read().decode("utf-8"))
            return reply["choices"][0]["message"]["content"]
    except Exception as e:
        # Best effort: callers get None instead of an exception.
        print(f"Doubao Chat API Error: {e}")
        return None

def _call_doubao_image(prompt, ref_images=None, size="2160x2160"):
    url = f"{ARK_BASE_URL}/images/generations"
    payload = {"model": MODEL_IMAGE, "prompt": prompt, "size": size, "output_format": "png", "watermark": False}
    if ref_images and isinstance(ref_images, list) and len(ref_images) > 0:
        payload["image"] =[img for img in ref_images if img]
    req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", f"Bearer {ARK_API_KEY}")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = json.loads(resp.........完整代码请登录后点击上方下载按钮下载查看

网友评论0