python+agnesai的api实现免费ai短剧生成系统bfwStoryForge代码
代码语言:python
所属分类:其他
版本2(最新):python+agnesai的api实现免费ai短剧生成系统bfwStoryForge代码
版本1(旧版):python+阿里大模型api实现模拟小云雀ai短剧生成工具BfwStoryForge代码
代码描述:python+agnesai的api实现免费ai短剧生成系统bfwStoryForge代码,此版本是针对agnesai的免费文本、视频、图片生成和编辑大模型的api特别 定制的,只要获得一个免费的api就能免费生成 任意ai短剧,包含剧本写作、人物定妆照、道具图片生成、场景图片生成,多图参考生成分镜头、分镜图片生成视频、最后合并成短剧。
代码标签: python agnes ai api 免费 ai 短剧 生成 系统 bfwStoryForge 代
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI有声视频生成工作流系统 - Agnes-AI 商业级UI版本
"""
#apikey申请地址;https://platform.agnes-ai.com/settings/apiKeys
import os
import json
import uuid
import time
import base64
import urllib.request
import urllib.error
from datetime import datetime
from pathlib import Path
from flask import Flask, request, jsonify, send_from_directory
# ============================================================
# 配置
# ============================================================
# 请设置您的 AGNES_API_KEY 环境变量,或在下方直接替换
AGNES_API_KEY = os.environ.get("AGNES_API_KEY", "sk-")
BASE_URL = "https://apihub.agnes-ai.com"
WORK_DIR = Path("workspace")
WORK_DIR.mkdir(exist_ok=True)
PROJECTS_FILE = WORK_DIR / "projects.json"
app = Flask(__name__)
# ============================================================
# 辅助工具
# ============================================================
def log_api_call(endpoint, req_data, res_data):
print(f"\n{'='*20}[API CALL RESULT] {endpoint} {'='*20}")
print(">>> REQUEST PARAMS:")
print(json.dumps(req_data, ensure_ascii=False, indent=2) if req_data else "None")
print("<<< RESPONSE PARAMS:")
res_str = json.dumps(res_data, ensure_ascii=False, indent=2) if res_data else "None"
if len(res_str) > 3000:
res_str = res_str[:3000] + "\n...[DATA TRUNCATED IN LOG] ..."
print(res_str)
print("=" * 60 + "\n")
def _get_resolution_str(aspect_ratio):
mapping = {
"16:9": "1280x720", "9:16": "720x1280", "4:3": "1024x768",
"3:4": "768x1024", "1:1": "1024x1024", "3:2": "1080x720", "2.35:1": "1280x544"
}
return mapping.get(aspect_ratio, "1280x720")
def get_base64_image_uri(local_filepath: Path) -> str:
with open(local_filepath, "rb") as f:
b64_data = base64.b64encode(f.read()).decode("utf-8")
ext = local_filepath.suffix.lower().replace(".", "")
if ext in ["jpg", "jpeg"]:
ext = "jpeg"
elif ext == "webp":
ext = "webp"
else:
ext = "png"
return f"data:image/{ext};base64,{b64_data}"
def _normalize_scene_characters(scene: dict):
chars = scene.get("characters")
char_ids = scene.get("character_ids")
if isinstance(chars, list) and chars:
scene["character_ids"] = chars[:]
elif isinstance(char_ids, list) and char_ids:
scene["characters"] = char_ids[:]
elif scene.get("speaking_character"):
scene["characters"] = [scene.get("speaking_character")]
scene["character_ids"] = [scene.get("speaking_character")]
else:
scene["characters"] = []
scene["character_ids"] = []
if not isinstance(scene.get("props"), list):
scene["props"] = []
if not isinstance(scene.get("prop_ids"), list):
scene["prop_ids"] = scene["props"][:]
if not isinstance(scene.get("scenes"), list):
scene["scenes"] = []
if not isinstance(scene.get("scene_ids"), list):
scene["scene_ids"] = scene["scenes"][:]
if not isinstance(scene.get("character_looks"), list):
scene["character_looks"] = []
if not isinstance(scene.get("prop_variants"), list):
scene["prop_variants"] = []
scene.setdefault("transition_note", "")
scene.setdefault("continuity_note", "")
def _normalize_script_structure(script: dict):
script.setdefault("title", "")
script.setdefault("synopsis", "")
script.setdefault("characters", [])
script.setdefault("props", [])
script.setdefault("scenes", [])
script.setdefault("segment_outlines", [])
for c in script.get("characters", []):
c.setdefault("looks", [])
for lk in c.get("looks", []):
lk.setdefault("id", "")
lk.setdefault("name", "")
lk.setdefault("description", "")
lk.setdefault("usage", "")
lk.setdefault("image_url", "")
lk.setdefault("image_history", [])
for p in script.get("props", []):
p.setdefault("variants", [])
for v in p.get("variants", []):
v.setdefault("id", "")
v.setdefault("name", "")
v.setdefault("description", "")
v.setdefault("usage", "")
v.setdefault("image_url", "")
v.setdefault("image_history", [])
for s in script.get("scenes", []):
s.setdefault("id", "")
s.setdefault("name", "")
s.setdefault("description", "")
s.setdefault("usage", "")
s.setdefault("image_url", "")
s.setdefault("image_history", [])
for seg in script.get("segment_outlines", []):
seg.setdefault("characters_involved", [])
seg.setdefault("props_involved", [])
seg.setdefault("scenes_involved", [])
return script
def _collect_scene_character_ids(scene: dict):
ids = []
if isinstance(scene.get("characters"), list):
ids.extend([x for x in scene.get("characters", []) if x])
if isinstance(scene.get("character_ids"), list):
ids.extend([x for x in scene.get("character_ids", []) if x])
if scene.get("speaking_character"):
ids.append(scene.get("speaking_character"))
result = []
seen = set()
for x in ids:
if x not in seen:
seen.add(x)
result.append(x)
return result
def _collect_scene_prop_ids(scene: dict):
ids = []
if isinstance(scene.get("props"), list):
ids.extend([x for x in scene.get("props", []) if x])
if isinstance(scene.get("prop_ids"), list):
ids.extend([x for x in scene.get("prop_ids", []) if x])
result = []
seen = set()
for x in ids:
if x not in seen:
seen.add(x)
result.append(x)
return result
def _collect_scene_scene_ids(scene: dict):
ids = []
if isinstance(scene.get("scenes"), list):
ids.extend([x for x in scene.get("scenes", []) if x])
if isinstance(scene.get("scene_ids"), list):
ids.extend([x for x in scene.get("scene_ids", []) if x])
result = []
seen = set()
for x in ids:
if x not in seen:
seen.add(x)
result.append(x)
return result
def _find_character(project, char_id):
for c in project.get("characters", []):
if str(c.get("id")) == str(char_id):
return c
return None
def _find_prop(project, prop_id):
for p in project.get("props", []):
if str(p.get("id")) == str(prop_id):
return p
return None
def _find_scene(project, scene_id):
for s in project.get("scenes", []):
if str(s.get("id")) == str(scene_id):
return s
return None
def _find_character_look(project, look_id):
for c in project.get("characters", []):
for lk in c.get("looks", []):
if str(lk.get("id")) == str(look_id):
return c, lk
return None, None
def _find_prop_variant(project, variant_id):
for p in project.get("props", []):
for v in p.get("variants", []):
if str(v.get("id")) == str(variant_id):
return p, v
return None, None
def _public_or_base64_image(project_id: str, url: str):
if not url:
return None
if url.startswith("/workspace/"):
local_path = WORK_DIR / project_id / os.path.basename(url)
if local_path.exists():
return get_base64_image_uri(local_path)
return None
if url.startswith("data:image/"):
return url
return url
def combine_images_to_base64(image_list, output_path: Path):
try:
from PIL import Image
from io import BytesIO
imgs = []
for item in image_list:
try:
if item.startswith("data:image"):
b64_data = item.split(",", 1)[1]
imgs.append(Image.open(BytesIO(base64.b64decode(b64_data))).convert("RGB"))
elif os.path.exists(item):
imgs.append(Image.open(item).convert("RGB"))
except Exception as ex:
print(f"Failed to load image for combining: {ex}")
continue
if not imgs:
return None
widths, heights = zip(*(i.size for i in imgs))
total_width = sum(widths)
max_height = max(heights)
scale = 1.0
if total_width > 2048 or max_height > 2048:
scale = min(2048.0 / total_width, 2048.0 / max_height)
new_width = int(total_width * scale)
new_height = int(max_height * scale)
combined = Image.new('RGB', (new_width, new_height), (255, 255, 255))
x_offset = 0
for img in imgs:
w, h = img.size
nw, nh = int(w * scale), int(h * scale)
resized = img.resize((nw, nh), Image.Resampling.LANCZOS)
combined.paste(resized, (x_offset, 0))
x_offset += nw
combined.save(str(output_path), "JPEG", quality=85)
return get_base64_image_uri(output_path)
except Exception as e:
print(f"Combine images error: {e}")
return None
# ============================================================
# Agnes-AI 统一调用封装
# ============================================================
def call_agnes_chat(messages, model="agnes-2.0-flash", temperature=0.8, response_format=None):
print(f"\n{'='*25} [LLM PROMPT SUBMITTED TO AGNES] {'='*25}")
print(f"Model: {model} | Temperature: {temperature}")
print("-" * 72)
for msg in messages:
role = msg.get("role", "unknown").upper()
content = msg.get("content", "")
print(f"[{role}]:\n{content}\n")
print("=" * 72 + "\n")
url = f"{BASE_URL}/v1/chat/completions"
payload = {"model": model, "messages": messages, "temperature": temperature}
if response_format:
payload["response_format"] = response_format
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("Authorization", f"Bearer {AGNES_API_KEY}")
try:
with urllib.request.urlopen(req, timeout=120) as resp:
result = json.loads(resp.read().decode("utf-8"))
return result["choices"][0]["message"]["content"]
except urllib.error.HTTPError as e:
print(f"Agnes Chat API HTTP Error {e.code}: {e.read().decode('utf-8')}")
return None
except Exception as e:
print(f"Agnes Chat API Error: {e}")
return None
def call_agnes_image_gen(prompt, size="1024x1024", ref_images=None):
url = f"{BASE_URL}/v1/images/generations"
headers = {
"Authorization": f"Bearer {AGNES_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": "agnes-image-2.1-flash",
"prompt": prompt,
"size": size
}
if ref_images:
# 图生图或多图合成时提供图像
payload["extra_body"] = {
"image": ref_images,
"response_format": "url"
}
try:
req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=120) as resp:
res_data = json.loads(resp.read().decode('utf-8'))
return res_data["data"][0]["url"]
except urllib.error.HTTPError as e:
print(f"Agnes Image API HTTP Error {e.code}: {e.read().decode('utf-8')}")
return None
except Exception as e:
print(f"Agnes Image API Error: {e}")
return None
def _submit_and_poll_agnes_video_task(payload, timeout=300, poll_interval=4):
headers = {
"Authorization": f"Bearer {AGNES_API_KEY}",
"Content-Type": "application/json"
}
url = f"{BASE_URL}/v1/videos"
try:
req = urllib.request.Request(url, data=json.dumps(payload).encode('utf-8'), headers=headers, method="POST")
with urllib.reque.........完整代码请登录后点击上方下载按钮下载查看














网友评论0