python+agnesai的api实现免费ai短剧生成系统bfwStoryForge代码

代码语言:python

所属分类:其他

版本2(最新):python+agnesai的api实现免费ai短剧生成系统bfwStoryForge代码

版本1(旧版):python+阿里大模型api实现模拟小云雀ai短剧生成工具BfwStoryForge代码

代码描述:python+agnesai的api实现免费ai短剧生成系统bfwStoryForge代码。此版本是针对agnesai的免费文本、视频、图片生成和编辑大模型的api特别定制的,只要获得一个免费的api key就能免费生成任意ai短剧,流程包含剧本写作、人物定妆照生成、道具图片生成、场景图片生成、多图参考生成分镜头、分镜图片生成视频,最后合并成短剧。

代码标签: python agnes ai api 免费 ai 短剧 生成 系统 bfwStoryForge

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

# API key application page: https://platform.agnes-ai.com/settings/apiKeys

import os
import json
import uuid
import time
import base64
import threading
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory

# ============================================================
# Configuration
# ============================================================
# API key is read from the AGNES_API_KEY environment variable. The "sk-"
# fallback looks like a placeholder — presumably it must be replaced with a
# real key before any API call can succeed (TODO confirm).
AGNES_API_KEY = os.environ.get("AGNES_API_KEY",  "sk-")
# Root URL that the Agnes endpoint paths (/v1/...) are appended to.
BASE_URL = "https://apihub.agnes-ai.com"

# Working directory, relative to the current working directory; it is created
# at import time if it does not exist yet.
WORK_DIR = Path("workspace")
WORK_DIR.mkdir(exist_ok=True)
# JSON file path inside the working directory; presumably the persisted
# project list (its readers/writers are outside this excerpt).
PROJECTS_FILE = WORK_DIR / "projects.json"

app = Flask(__name__)

# Thread lock: intended to keep disk writes and global state updates safe
# under concurrent requests (its call sites are outside this excerpt).
projects_lock = threading.Lock()


# ============================================================
# Low-level utility and helper functions (defined before the classes that use them, to avoid NameError)
# ============================================================
def log_api_call(endpoint, req_data, res_data):
    """Print a request/response log entry for one API call to stdout.

    Args:
        endpoint: Label shown in the banner line.
        req_data: Request payload; dumped as indented JSON, or the literal
            text "None" when falsy (including an empty dict or list).
        res_data: Response payload; handled like ``req_data``, and the dumped
            text is cut to its first 3000 characters plus a truncation
            marker when longer.
    """
    banner = "=" * 20
    print(f"\n{banner}[API CALL RESULT] {endpoint} {banner}")

    print(">>> REQUEST PARAMS:")
    if req_data:
        print(json.dumps(req_data, ensure_ascii=False, indent=2))
    else:
        print("None")

    print("<<< RESPONSE PARAMS:")
    response_text = "None"
    if res_data:
        response_text = json.dumps(res_data, ensure_ascii=False, indent=2)
    # Keep huge responses (e.g. embedded media) from flooding the console.
    if len(response_text) > 3000:
        response_text = response_text[:3000] + "\n...[DATA TRUNCATED IN LOG] ..."
    print(response_text)

    print("=" * 60 + "\n")


def _get_resolution(aspect_ratio):
    mapping = {
        "16:9": (1280, 720), "9:16": (720, 1280), "4:3": (1024, 768),
        "3:4": (768, 1024), "1:1": (1024, 1024), "3:2": (1080, 720), "2.35:1": (1280, 544)
    }
    return mapping.get(aspect_ratio, (1280, 720))


def _get_resolution_str(aspect_ratio):
    mapping = {
        "16:9": "1280x720", "9:16": "720x1280", "4:3": "1024x768",
        "3:4": "768x1024", "1:1": "1024x1024", "3:2": "1080x720", "2.35:1": "1280x544"
    }
    return mapping.get(aspect_ratio, "1280x720")


def _filter_history(history_list):
    """
    历史记录清洁工具:过滤掉本地降级生成所遗留的 /workspace/ 虚拟路径,确保只向 history 链条添加原始远程 HTTP(S) 网址
    """
    if not history_list or not isinstance(history_list, list):
        return []
    return [str(url) for url in history_list if url and str(url).startswith("http")]


def _normalize_scene_characters(scene: dict):
    chars = scene.get("characters")
    char_ids = scene.get("character_ids")
    if isinstance(chars, list) and chars:
        scene["character_ids"] = chars[:]
    elif isinstance(char_ids, list) and char_ids:
        scene["characters"] = char_ids[:]
    elif scene.get("speaking_character"):
        scene["characters"] = [scene.get("speaking_character")]
        scene["character_ids"] = [scene.get("speaking_character")]
    else:
        scene["characters"] = []
        scene["character_ids"] = []
    if not isinstance(scene.get("props"), list):
        scene["props"] = []
    if not isinstance(scene.get("prop_ids"), list):
        scene["prop_ids"] = scene["props"][:]
    if not isinstance(scene.get("scenes"), list):
        scene["scenes"] = []
    if not isinstance(scene.get("scene_ids"), list):
        scene["scene_ids"] = scene["scenes"][:]
    if not isinstance(scene.get("character_looks"), list):
        scene["character_looks"] = []
    if not isinstance(scene.get("prop_variants"), list):
        scene["prop_variants"] = []
    scene.setdefault("transition_note", "")
    scene.setdefault("continuity_note", "")


def _normalize_script_structure(script: dict):
    script.setdefault("title", "")
    script.setdefault("synopsis", "")
    script.setdefault("characters", [])
    script.setdefault("props", [])
    script.setdefault("scenes", [])
    script.setdefault("segment_outlines", [])
    for c in script.get("characters", []):
        c.setdefault("looks", [])
        for lk in c.get("looks", []):
            lk.setdefault("id", "")
            lk.setdefault("name", "")
            lk.setdefault("description", "")
            lk.setdefault("usage", "")
            lk.setdefault("image_url", "")
            lk.setdefault("image_history", [])
    for p in script.get("props", []):
        p.setdefault("variants", [])
        for v in p.get("variants", []):
            v.setdefault("id", "")
            v.setdefault("name", "")
            v.setdefault("description", "")
            v.setdefault("usage", "")
            v.setdefault("image_url", "")
            v.setdefault("image_history", [])
    for s in script.get("scenes", []):
        s.setdefault("id", "")
        s.setdefault("name", "")
        s.setdefault("description", "")
        s.setdefault("usage", "")
        s.setdefault("image_url", "")
        s.setdefault("image_history", [])
    for seg in script.get("segment_outlines", []):
        seg.setdefault("characters_involved", [])
        seg.setdefault("props_involved", [])
        seg.setdefault("scenes_involved", [])
    return script


def _collect_scene_character_ids(scene: dict):
    ids = []
    if isinstance(scene.get("characters"), list):
        ids.extend([x for x in scene.get("characters", []) if x])
    if isinstance(scene.get("character_ids"), list):
        ids.extend([x for x in scene.get("character_ids", []) if x])
    if scene.get("speaking_character"):
        ids.append(scene.get("speaking_character"))
    result = []
    seen = set()
    for x in ids:
        if x not in seen:
            seen.add(x)
            result.append(x)
    return result


def _collect_scene_prop_ids(scene: dict):
    ids = []
    if isinstance(scene.get("props"), list):
        ids.extend([x for x in scene.get("props", []) if x])
    if isinstance(scene.get("prop_ids"), list):
        ids.extend([x for x in scene.get("prop_ids", []) if x])
    result = []
    seen = set()
    for x in ids:
        if x not in seen:
            seen.add(x)
            result.append(x)
    return result


def _collect_scene_scene_ids(scene: dict):
    ids = []
    if isinstance(scene.get("scenes"), list):
        ids.extend([x for x in scene.get("scenes", []) if x])
    if isinstance(scene.get("scene_ids"), list):
        ids.extend([x for x in scene.get("scene_ids", []) if x])
    result = []
    seen = set()
    for x in ids:
        if x not in seen:
            seen.add(x)
            result.append(x)
    return result


def _find_character(project, char_id):
    for c in project.get("characters", []):
        if str(c.get("id")) == str(char_id):
            return c
    return None


def _find_prop(project, prop_id):
    for p in project.get("props", []):
        if str(p.get("id")) == str(prop_id):
            return p
    return None


def _find_scene(project, scene_id):
    for s in project.get("scenes", []):
        if str(s.get("id")) == str(scene_id):
            return s
    return None


def _find_character_look(project, look_id):
    for c in project.get("characters", []):
        for lk in c.get("looks", []):
            if str(lk.get("id")) == str(look_id):
                return c, lk
    return None, None


def _find_prop_variant(project, variant_id):
    for p in project.get("props", []):
        for v in p.get("variants", []):
            if str(v.get("id")) == str(variant_id):
                return p, v
    return None, None


def _ensure_local_video(url, project_id, output_dir):
    """
    延迟下载:只有在真正需要拼接合并视频时,才从远程 Agnes 链接中缓存至本地。
    """
    if not url:
        return None
    if url.startswith("/workspace/"):
        local_path = WORK_DIR / project_id / os.path.basename(url)
        if local_path.exists():
            return str(local_path)
        return None
    # 外部高清晰度有声 HTTPS 链接下载
    local_name = f"downloaded_{uuid.uuid4().hex[:8]}.mp4"
    local_path = output_dir / local_name
    try:
        print(f"Downloading remote video clip: {url} -> {local_path}")
        urllib.request.urlretrieve(url, str(local_path))
        return str(local_path)
    except Exception as e:
        print(f"Failed to retrieve remote clip {url}: {e}")
        return None


# ============================================================
# Unified wrappers for Agnes-AI API calls
# ============================================================
def call_agnes_chat(messages, model="agnes-2.0-flash", temperature=0.8, response_format=None):
    """Send a chat-completion request to Agnes and return the reply text.

    The full prompt is echoed to stdout before the request is sent. The
    request is a JSON POST to ``{BASE_URL}/v1/chat/completions`` carrying a
    bearer token built from ``AGNES_API_KEY``.

    Args:
        messages: List of message dicts with ``role`` and ``content`` keys.
        model: Model name placed in the request payload.
        temperature: Sampling temperature placed in the request payload.
        response_format: Optional value forwarded as ``response_format``;
            omitted from the payload when falsy.

    Returns:
        ``choices[0].message.content`` from the response, or ``None`` when
        the request fails or the response lacks that field (the error is
        printed).
    """
    print(f"\n{'='*25} [LLM PROMPT SUBMITTED TO AGNES] {'='*25}")
    print(f"Model: {model} | Temperature: {temperature}")
    print("-" * 72)
    for msg in messages:
        role = msg.get("role", "unknown").upper()
        content = msg.get("content", "")
        print(f"[{role}]:\n{content}\n")
    print("=" * 72 + "\n")

    url = f"{BASE_URL}/v1/chat/completions"
    payload = {"model": model, "messages": messages, "temperature": temperature}
    if response_format:
        payload["response_format"] = response_format
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", f"Bearer {AGNES_API_KEY}")
    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            result = json.loads(resp.read().decode("utf-8"))
            return result["choices"][0]["message"]["content"]
    except urllib.error.HTTPError as e:
        # Decode leniently: an error body that is not valid UTF-8 must not
        # raise from inside this handler, since callers rely on getting None.
        error_body = e.read().decode("utf-8", errors="replace")
        print(f"Agnes Chat API HTTP Error {e.code}: {error_body}")
        return None
    except Exception as e:
        print(f"Agnes Chat API Error: {e}")
        return None


def call_agnes_image_gen(prompt, size="1024x1024", ref_images=None):
    """Request one generated image from Agnes and return its URL.

    The request is a JSON POST to ``{BASE_URL}/v1/images/generations`` using
    the ``agnes-image-2.1-flash`` model and a bearer token built from
    ``AGNES_API_KEY``.

    Args:
        prompt: Text prompt describing the image.
        size: Output size string such as ``"1024x1024"``.
        ref_images: Optional reference image(s); when truthy they are sent
            under ``extra_body.image`` together with
            ``response_format: "url"``.

    Returns:
        ``data[0].url`` from the response, or ``None`` when the request fails
        or the response lacks that field (the error is printed).
    """
    url = f"{BASE_URL}/v1/images/generations"
    headers = {
        "Authorization": f"Bearer {AGNES_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "agnes-image-2.1-flash",
        "prompt": prompt,
        "size": size
    }
    if ref_images:
        # NOTE(review): the fields are nested under a literal "extra_body"
        # key in the JSON body — confirm against the Agnes API docs that the
        # server expects this nesting rather than top-level fields.
        payload["extra_body"] = {
            "image": ref_images,
            "response_format": "url"
        }
    try:
        req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=120) as resp:
            res_data = json.loads(resp.read().decode('utf-8'))
            return res_data["data"][0]["url"]
    except urllib.error.HTTPError as e:
        # Decode leniently: an error body that is not valid UTF-8 must not
        # raise from inside this handler, since callers rely on getting None.
        error_body = e.read().decode('utf-8', errors='replace')
        print(f"Agnes Image API HTTP Error {e.code}: {error_body}")
        return None
    except Exception as e:
        print(f"Agnes Image API Error: {e}")
        return None


def _submit_and_poll_agnes_video_task(payload, timeout=300, poll_interval=4):
    headers = {
        "Authorization": f"Bearer {AGNES_API_KEY}",
        "Content-Type": "application/json"
    }
    url = f"{BASE_URL}/v1/videos"
    try:
        req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=30) as resp:
            res_data = json.loads(resp.read().decode('utf-8'))
            task_id = res_data.get("id") or res_data.get("task_id")
            if not task_id:
                print(f"Agnes Video Task submit failed: {res_data}")
                return None
            print(f"Agnes Video Task submitted: {task_id}")
    except urllib.error.HTTPError as e:
        print(f"Agnes Video submit HTTP Error {e.code}: {e.read().decode('utf-8')}")
        return None
    except Exception as e:
        print(f"Agnes Video submit error: {e}")
        return None

    start_time = time.time()
    poll_url = f"{BASE_URL}/v1/videos/{task_id}"
    poll_headers = {"Authorization": f"Bearer {AGNES_API_KEY}"}
    while time.tim.........完整代码请登录后点击上方下载按钮下载查看

网友评论0