python实时识别视频mp4翻译显示字幕ai视频播放器代码

代码语言:python

所属分类:人工智能

代码描述:python实时识别视频mp4翻译显示字幕ai视频播放器代码:在播放视频的同时,实时识别视频中的语音并翻译为字幕显示,无需预先单独制作翻译字幕。

代码标签: python 实时 识别 mp4 翻译 显示 字幕 ai 视频 播放器 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*-
import time
import threading
import queue
import sys
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple, Optional

import numpy as np
import av
import cv2
import sounddevice as sd
from faster_whisper import WhisperModel

# Locate AudioResampler across PyAV versions: older releases expose it at
# av.audio.resampler.AudioResampler, while newer ones re-export it at the
# top level as av.AudioResampler. Fall back to the top-level name if the
# nested attribute path is missing.
try:
    AudioResampler = av.audio.resampler.AudioResampler  # type: ignore[attr-defined]
except Exception:
    AudioResampler = av.AudioResampler  # type: ignore[attr-defined]


# ----------------------------- utils -----------------------------
def text_wrap_by_width(
    text: str,
    font,
    font_scale: float,
    thickness: int,
    max_width: int,
    measure=None,
) -> List[str]:
    """Greedily wrap *text* into lines no wider than *max_width* pixels.

    Splits on whitespace (word-based wrapping, suited to Latin-script text)
    and measures each candidate line with OpenCV's ``getTextSize``.

    Args:
        text: Text to wrap; leading/trailing whitespace is ignored.
        font: OpenCV font constant (e.g. ``cv2.FONT_HERSHEY_SIMPLEX``).
        font_scale: Font scale factor forwarded to the measuring function.
        thickness: Stroke thickness forwarded to the measuring function.
        max_width: Maximum allowed line width in pixels.
        measure: Optional ``(text, font, font_scale, thickness) ->
            ((width, height), baseline)`` callable; defaults to
            ``cv2.getTextSize``. Injectable for testing or alternative
            text backends.

    Returns:
        List of wrapped lines. A single word wider than ``max_width`` is
        emitted on its own line rather than broken mid-word.
    """
    if measure is None:
        measure = cv2.getTextSize
    lines: List[str] = []
    line = ""
    for word in text.strip().split():
        candidate = (line + " " + word).strip()
        (width, _), _ = measure(candidate, font, font_scale, thickness)
        # Accept the candidate if it fits, or if the current line is empty
        # (an oversized single word must still be emitted somewhere).
        if width <= max_width or not line:
            line = candidate
        else:
            lines.append(line)
            line = word
    if line:
        lines.append(line)
    return lines


def draw_subtitle(frame: np.ndarray, text: str, margin=18, font_scale=1.0, thickness=2) -> np.ndarray:
    """Render *text* as a centered subtitle over a translucent black band.

    The band sits at the bottom of *frame*; text is wrapped to at most 90%
    of the frame width and drawn in white, line by line.

    Returns the frame with the subtitle composited in (a new array when a
    subtitle was drawn; the input frame unchanged when *text* is empty or
    wraps to nothing).
    """
    if not text:
        return frame

    height, width = frame.shape[:2]
    font = cv2.FONT_HERSHEY_SIMPLEX
    wrapped = text_wrap_by_width(text, font, font_scale, thickness, int(width * 0.9))
    if not wrapped:
        return frame

    # Per-line vertical advance; 24 px is the nominal glyph height at scale 1.
    line_step = int(24 * font_scale) + 4
    band_top = height - (len(wrapped) * line_step + margin * 2)

    # Blend a 50% black rectangle under the text area for readability.
    shaded = frame.copy()
    cv2.rectangle(shaded, (0, band_top), (width, height), (0, 0, 0), -1)
    frame = cv2.addWeighted(shaded, 0.5, frame, 0.5, 0)

    # Draw each wrapped line horizontally centered.
    baseline_y = band_top + margin + line_step
    for row in wrapped:
        (row_w, _), _ = cv2.getTextSize(row, font, font_scale, thickness)
        cv2.putText(frame, row, ((width - row_w) // 2, baseline_y),
                    font, font_scale, (255, 255, 255), thickness, cv2.LINE_AA)
        baseline_y += line_step

    return frame


def probe_media_duration_seconds(path: str) -> Optional[float]:
    """Best-effort probe of a media file's total duration in seconds.

    Tries, in order: the container-level duration, the longest stream-level
    duration, a frame-count/fps estimate from the first video stream, and a
    sample-count/sample-rate estimate from the first audio stream.

    Args:
        path: Path or URL accepted by ``av.open``.

    Returns:
        The duration in seconds, or ``None`` when the file cannot be opened
        or no probing strategy yields a positive duration.
    """
    try:
        container = av.open(path)
    except OSError:
        # PyAV error classes subclass OSError; this also covers the legacy
        # av.AVError name, which no longer exists in recent PyAV releases
        # (the old `except av.AVError` would itself raise AttributeError).
        return None

    best: Optional[float] = None
    try:
        # 1) Container-level duration, expressed in av.time_base units
        #    (Fraction(1, 1_000_000) in PyAV; the fallback matches that).
        try:
            dur = getattr(container, "duration", None)
            if dur and isinstance(dur, (int, np.integer)) and dur > 0:
                tb = getattr(av, "time_base", 1.0 / 1000000.0)
                best = float(dur * tb)
        except Exception:
            pass

        # 2) Stream-level durations; keep the longest one observed.
        try:
            for s in container.streams:
                d = getattr(s, "duration", None)
                tb = getattr(s, "time_base", None)
                if d is not None and tb:
                    try:
                        sec = float(d * tb)
                        if sec > 0 and (best is None or sec > best):
                            best = sec
                    except Exception:
                        pass
        except Exception:
            pass

        # 3) Fallback: frames / fps from the first video stream.
        try:
            if best is None:
                v = next((s for s in container.streams if s.type == "video"), None)
                if v and getattr(v, "average_rate", None) and getattr(v, "frames", 0):
                    fps = float(v.average_rate)
                    frames = int(v.frames)
                    if fps > 0 and frames > 0:
                        best = frames / fps
        except Exception:
            pass

        # 4) Fallback: samples / sample-rate from the first audio stream.
        try:
            if best is None:
                a = next((s for s in container.streams if s.type == "audio"), None)
                if a and getattr(a, "rate", None) and getattr(a, "frames", 0):
                    sr = float(a.rate)
                    frames = int(a.frames)
                    if sr > 0 and frames > 0:
                        best = frames / sr
        except Exception:
            pass
    finally:
        # Guarantee the demuxer is released even if probing raises.
        try:
            container.close()
        except Exception:
            pass
    return best


# ------------------------ audio buffer for ASR --------------------
class ASRAudioBuffer:
    """
    Ring buffer for 16k mono float32 audio aligned to media time.
    支持按绝对媒体时间获取任意窗口(避免仅看尾窗导致漏识别)。
    """
    def __init__(self, sr=16000, keep_seconds=240):
        self.sr = sr
        self.keep_seconds = keep_seconds
        self.chunks: deque[np.ndarray] = deque()  # 每个为 float32 (N,)
        self.total_samples_accum = 0  # 自开始累计的总样本数
        self.first_sample_time_media: Optional[float] = None
        self.lock = threading.Lock()

    def append(self, samples: np.ndarray, start_time_media_if_first: Optional[float] = None):
        assert samples.ndim == 1
        samples = samples.astype(np.float32, copy=False)
        with self.lock:
            if self.first_sample_time_media is None and start_time_media_if_first is not None:
                self.first_sample_time_media = float(start_time_media_if_first)
            self.chunks.append(samples)
            self.total_samples_accum += len(samples)
            self._trim_old_locked()

    def _trim_old_locked(self):
        # 保留最近 keep_seconds 的音频
        want = int(self.keep_seconds * self.sr)
        kept = sum(len(c) for c in self.chunks)
        while kept > want and self.chunks:
            drop = len(self.chunks[0])
       .........完整代码请登录后点击上方下载按钮下载查看

网友评论0