python+html实现多步骤多项目ai漫剧电影短片生成可视化编辑框架bfwshotmaker代码

代码语言:python

所属分类:其他

代码描述:python+html实现多步骤多项目ai漫剧电影短片生成可视化编辑框架bfwshotmaker代码。这个版本主要是框架流程,具体的生图与声音视频生成需要调用API,可以调用阿里或豆包的seedance2来实现。主要流程是:主题 → 剧本 → 角色 → 音色 → 画面风格 → 分镜画面 → 视频片段 → 合成

代码标签: python html 多步骤 多项目 ai 漫剧 电影 短片 生成 可视化 编辑 框架 bfwsh

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/bin/env python3
"""
AI narrated-video generation workflow system.

Full pipeline: project management -> theme -> script (visual editing)
-> character design (history reusable) -> voice preview (history reusable)
-> storyboard shots (history reusable) -> video clips (history reusable)
-> merged MP4.
"""

import os
import json
import uuid
import time
import base64
import traceback
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_file, send_from_directory

# ============================================================
# Configuration
# ============================================================
# API credentials come from the environment; the default base URL targets the
# DashScope OpenAI-compatible endpoint, and the default key "sk-" is a
# non-functional stub that must be overridden via OPENAI_API_KEY.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "sk-")
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")
WORK_DIR = Path("workspace")  # all generated assets and project state live here
WORK_DIR.mkdir(exist_ok=True)  # created eagerly at import time (idempotent)
PROJECTS_FILE = WORK_DIR / "projects.json"  # persisted index of all projects

app = Flask(__name__)

# ============================================================
# OpenAI 调用封装
# ============================================================
def call_openai_chat(messages, model="qwen-plus", temperature=0.8, response_format=None):
    """POST a chat-completion request to the OpenAI-compatible endpoint.

    Args:
        messages: chat history in OpenAI message-dict format.
        model: model identifier passed through to the API.
        temperature: sampling temperature.
        response_format: optional response_format payload (e.g. JSON mode).

    Returns:
        The assistant message content string, or ``None`` on any failure
        (network error, non-JSON body, unexpected response shape).
    """
    import urllib.request

    body = {"model": model, "messages": messages, "temperature": temperature}
    if response_format:
        body["response_format"] = response_format

    http_request = urllib.request.Request(
        f"{OPENAI_BASE_URL}/chat/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {OPENAI_API_KEY}",
        },
        method="POST",
    )

    try:
        with urllib.request.urlopen(http_request, timeout=120) as response:
            parsed = json.loads(response.read().decode("utf-8"))
            # Key access stays inside the try-block so a malformed response
            # also degrades to None rather than raising.
            return parsed["choices"][0]["message"]["content"]
    except Exception as exc:
        print(f"OpenAI API Error: {exc}")
        traceback.print_exc()
        return None

# ============================================================
# API后端 - 预留接口(模拟实现)
# ============================================================
class AIMediaAPI:
    """AI media-generation API facade.

    Reserved integration point: every method currently delegates to a local
    placeholder generator and returns the output file path as a string.
    Swap the bodies for real image/voice/video API calls later.
    """

    @staticmethod
    def generate_character_image(character_description: str, style: str, output_dir: Path) -> str:
        """Produce a character reference image; returns the PNG path."""
        target = output_dir / f"char_{uuid.uuid4().hex[:8]}.png"
        _create_placeholder_image(target, f"[{style}] " + character_description)
        return str(target)

    @staticmethod
    def generate_voice_sample(voice_description: str, sample_text: str, output_dir: Path) -> str:
        """Produce a voice audition clip; returns the WAV path."""
        target = output_dir / f"voice_{uuid.uuid4().hex[:8]}.wav"
        _create_placeholder_audio(target)
        return str(target)

    @staticmethod
    def generate_storyboard_image(scene_description: str, style: str, character_images: dict, output_dir: Path) -> str:
        """Produce a storyboard frame; returns the PNG path."""
        target = output_dir / f"scene_{uuid.uuid4().hex[:8]}.png"
        _create_placeholder_image(target, f"[{style}] " + scene_description)
        return str(target)

    @staticmethod
    def generate_video_clip(scene_image: str, video_description: str, dialogue_text: str, voice_id: str, duration: float, output_dir: Path) -> str:
        """Produce a single video clip; returns the MP4 path."""
        target = output_dir / f"clip_{uuid.uuid4().hex[:8]}.mp4"
        _create_placeholder_video(target, duration)
        return str(target)

    @staticmethod
    def merge_videos(video_clips: list, output_path: str) -> str:
        """Concatenate clips into one file, preferring ffmpeg concat.

        Falls back to the naive byte-level merge if ffmpeg is unavailable
        or fails for any reason.
        """
        try:
            return _merge_with_ffmpeg(video_clips, output_path)
        except Exception as exc:
            print(f"FFmpeg merge failed: {exc}, using simple concatenation")
            return _simple_merge(video_clips, output_path)

def _create_placeholder_image(path, text="placeholder"):
    """Write a placeholder PNG to *path*.

    Uses Pillow, when installed, to render *text* onto a 768x512 dark canvas;
    otherwise falls back to hand-assembling a minimal valid 100x60 solid-color
    PNG using only the standard library (struct + zlib).
    """
    try:
        # ImageFont was imported but never used; dropped.
        from PIL import Image, ImageDraw
        img = Image.new('RGB', (768, 512), color=(40, 40, 60))
        draw = ImageDraw.Draw(img)
        # Slicing already caps the length; the original length-check ternary
        # was redundant.
        short_text = text[:100]
        # Wrap into 30-character lines for the default bitmap font.
        lines = [short_text[i:i + 30] for i in range(0, len(short_text), 30)]
        y = 200
        for line in lines:
            draw.text((50, y), line, fill=(200, 200, 220))
            y += 25
        draw.rectangle([(10, 10), (758, 502)], outline=(100, 100, 140), width=2)
        draw.text((300, 30), "AI Generated Image", fill=(150, 150, 180))
        img.save(str(path), "PNG")
    except ImportError:
        import struct
        import zlib

        def _chunk(tag, payload):
            # PNG chunk layout: 4-byte big-endian length, tag, payload,
            # CRC32 over tag+payload.
            crc = struct.pack('>I', zlib.crc32(tag + payload) & 0xffffffff)
            return struct.pack('>I', len(payload)) + tag + payload + crc

        width, height = 100, 60
        # IHDR: 8-bit depth, color type 2 (truecolor), default compression/
        # filter/interlace.
        ihdr = struct.pack('>IIBBBBB', width, height, 8, 2, 0, 0, 0)
        # One filter byte (0 = None) per scanline, then RGB pixels 0x28283c.
        raw = b''.join(b'\x00' + b'\x28\x28\x3c' * width for _ in range(height))
        with open(str(path), 'wb') as f:
            f.write(b'\x89PNG\r\n\x1a\n')
            f.write(_chunk(b'IHDR', ihdr))
            f.write(_chunk(b'IDAT', zlib.compress(raw)))
            f.write(_chunk(b'IEND', b''))

def _create_placeholder_audio(path, duration=3.0):
    """Write a placeholder WAV to *path*: a mono, 16-bit, 22.05 kHz
    440 Hz sine tone that fades linearly to silence over *duration* seconds.
    """
    import math
    import struct

    rate = 22050
    total_samples = int(rate * duration)
    pcm = bytearray()
    for n in range(total_samples):
        t = n / rate
        # Sine carrier scaled by a linear fade-out envelope, clamped to
        # the signed 16-bit range.
        amplitude = int(16000 * math.sin(2 * math.pi * 440 * t) * max(0, 1 - t / duration))
        pcm += struct.pack('<h', max(-32768, min(32767, amplitude)))

    with open(str(path), 'wb') as fh:
        # RIFF/WAVE header: PCM, 1 channel, 16 bits per sample.
        fh.write(b'RIFF')
        fh.write(struct.pack('<I', 36 + len(pcm)))
        fh.write(b'WAVE')
        fh.write(b'fmt ')
        fh.write(struct.pack('<I', 16))
        fh.write(struct.pack('<HHIIHH', 1, 1, rate, rate * 2, 2, 16))
        fh.write(b'data')
        fh.write(struct.pack('<I', len(pcm)))
        fh.write(pcm)

def _create_placeholder_video(path, duration=5.0):
    """Write a placeholder MP4 to *path*.

    First tries ffmpeg (solid dark color video + 440 Hz sine audio for
    *duration* seconds). If ffmpeg is missing, times out, or does not
    produce the file, falls back to a stub file whose first bytes are a
    valid 'ftyp isom' box so downstream code sees an MP4-like header.
    """
    try:
        import subprocess
        cmd = [
            'ffmpeg', '-y', '-f', 'lavfi', '-i',
            f'color=c=0x28283c:s=768x512:d={duration}:r=24',
            '-f', 'lavfi', '-i', f'sine=frequency=440:duration={duration}',
            '-c:v', 'libx264', '-preset', 'ultrafast', '-pix_fmt', 'yuv420p',
            '-c:a', 'aac', '-b:a', '128k',
            '-shortest', str(path)
        ]
        # Best-effort: a non-zero exit simply leaves the output file absent
        # and we drop through to the stub writer below.
        subprocess.run(cmd, capture_output=True, timeout=30)
        if Path(path).exists():
            return
    # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate instead of being silently swallowed.
    except Exception:
        pass
    with open(str(path), 'wb') as f:
        # Minimal MP4 'ftyp' box (brand 'isom') followed by zero padding.
        f.write(b'\x00\x00\x00\x1c\x66\x74\x79\x70\x69\x73\x6f\x6d')
        f.write(b'\x00' * 500)

def _merge_with_ffmpeg(video_clips, output_path):
    import subprocess
    list_file = WORK_DIR / f"concat_{uuid.uuid4().hex[:8]}.txt"
    with open(str(list_file), 'w') as f:
        for clip in video_clips:
            f.write(f"file '{os.path.abspath(clip)}'\n")
    cmd =[
        'ffmpeg', '-y', '-f', 'concat', '-safe', '0',
        '-i', str(list_file),
        '-c', 'copy', output_path
    ]
    result = subprocess.run(cmd, capture_output=True, timeout=120)
    if result.returncode != 0:
        cmd =[
            'ffmpeg', '-y', '-f', 'concat', '-safe', '0',
            &.........完整代码请登录后点击上方下载按钮下载查看

网友评论0