#!/usr/bin/env python3
"""
Automatic audio merge script for Tencent Cloud TTS cloned audio.

Mixes several cloned audio clips with a BGM track according to the supplied
audio parameters and muxes the result into a video (or writes an audio file).

Core features:
1. Per-clip audio handling strategy (pad / use directly / speed up)
2. Click and pop protection (fade in/out, compressor, limiter)
3. BGM (background music) mixing
4. Chained atempo filters (works around FFmpeg's 0.5-2.0 per-filter limit)
5. Muxing the mixed audio into the video
"""

import logging
import math
import os
import subprocess
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional

# Reuse the logger of process_worker.
logger = logging.getLogger('process_worker')

# ============================================================================
# Constants
# ============================================================================

SAFETY_MARGIN = 0.01  # safety gap between clips, in seconds
FADE_DURATION = 0.15  # fade in/out duration, in seconds
VOLUME_LEVEL = 0.95  # volume pre-attenuation applied to every clip
COMPRESSOR_THRESHOLD = -12  # compressor threshold (dB)
COMPRESSOR_RATIO = 4  # compressor ratio
LIMITER_LEVEL = 0.95  # limiter ceiling
MAX_SPEED_RATIO = 4.0  # maximum speed-up factor, guards against extreme speed-up
FFMPEG_TIMEOUT_SECS = 1800  # wall-clock limit for one FFmpeg run (30 minutes)


# ============================================================================
# Data classes
# ============================================================================

@dataclass
class AudioParam:
    """Parameters of one cloned audio clip and its slot on the timeline.

    Raises (from ``__post_init__``):
        ValueError: a field is empty or outside its allowed range.
        FileNotFoundError: ``clone_audio_path`` does not exist on disk.
    """

    start_secs: float  # start time in seconds (required)
    end_secs: float  # end time in seconds (required)
    clone_audio_path: str  # path of the cloned audio file (required)
    original_audio_length: float  # length of the original audio (required)
    clone_audio_length: float  # length of the cloned audio (required)
    audio_sort_num: int  # ordering number of the clip (required)

    def __post_init__(self):
        """Validate the fields."""
        if not self.clone_audio_path:
            raise ValueError("clone_audio_path 不能为空")
        if not os.path.exists(self.clone_audio_path):
            raise FileNotFoundError(f"音频文件不存在: {self.clone_audio_path}")
        if self.start_secs < 0:
            raise ValueError(f"start_secs 必须非负,实际值: {self.start_secs}")
        if self.end_secs <= self.start_secs:
            raise ValueError(f"end_secs 必须大于 start_secs,start_secs: {self.start_secs}, end_secs: {self.end_secs}")
        if self.original_audio_length <= 0:
            raise ValueError(f"original_audio_length 必须大于0,实际值: {self.original_audio_length}")
        if self.clone_audio_length <= 0:
            raise ValueError(f"clone_audio_length 必须大于0,实际值: {self.clone_audio_length}")
        if self.audio_sort_num < 0:
            raise ValueError(f"audio_sort_num 必须非负,实际值: {self.audio_sort_num}")


@dataclass
class AudioMerge:
    """Parameters of one merge job.

    After validation ``audio_params`` is replaced by a copy sorted by
    ``audio_sort_num``.

    Raises (from ``__post_init__``):
        ValueError: a field is empty, ``output_path`` equals ``input_path``,
            or ``speed_strategy`` / ``input_type`` has an unsupported value.
        FileNotFoundError: ``bgm_path`` or ``input_path`` does not exist.
    """

    output_path: str  # output path (required)
    bgm_path: str  # BGM audio path (required)
    input_path: str  # input path (required)
    input_type: str = "video"  # "audio" or "video"
    speed_strategy: str = "max"  # "max" (default), "mix" or "normal"
    audio_params: Optional[List[AudioParam]] = None  # list of AudioParam (required)

    def __post_init__(self):
        """Validate the fields and sort ``audio_params`` by sort number."""
        if not self.output_path:
            raise ValueError("output_path 不能为空")
        if not self.bgm_path:
            raise ValueError("bgm_path 不能为空")
        if not os.path.exists(self.bgm_path):
            raise FileNotFoundError(f"BGM文件不存在: {self.bgm_path}")
        if not self.input_path:
            raise ValueError("input_path 不能为空")
        if not os.path.exists(self.input_path):
            raise FileNotFoundError(f"输入文件不存在: {self.input_path}")

        # FFmpeg runs with -y, so writing onto the input would destroy it.
        output_abs = os.path.abspath(self.output_path)
        input_abs = os.path.abspath(self.input_path)
        if output_abs == input_abs:
            raise ValueError(f"output_path 和 input_path 不能相同: {output_abs}")

        if not self.audio_params:
            raise ValueError("audio_params 不能为空")
        if self.speed_strategy not in ("mix", "normal", "max"):
            raise ValueError(f"speed_strategy 必须是 mix/normal/max 之一,实际值: {self.speed_strategy}")
        # An unknown input type would otherwise build a video filter graph
        # whose input indices do not match the FFmpeg command line.
        if self.input_type not in ("audio", "video"):
            raise ValueError(f"input_type 必须是 audio/video 之一,实际值: {self.input_type}")

        # Sort by sort number.
        self.audio_params = sorted(self.audio_params, key=lambda x: x.audio_sort_num)


# ============================================================================
# Utility functions
# ============================================================================

def get_audio_duration(audio_path: str) -> float:
    """Return the duration in seconds of ``audio_path`` as reported by ffprobe.

    Raises:
        Exception: ffprobe timed out (30 s), exited with a non-zero status,
            or printed something that is not a number.
    """
    cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        audio_path
    ]
    try:
        result = subprocess.check_output(
            cmd,
            stderr=subprocess.STDOUT,
            timeout=30  # 30 second timeout
        )
    except subprocess.TimeoutExpired as e:
        raise Exception(f"获取音频时长超时: {audio_path}") from e
    except subprocess.CalledProcessError as e:
        error_output = e.output.decode(errors="replace") if e.output else "未知错误"
        raise Exception(f"获取音频时长失败: {audio_path}\n{error_output}") from e

    raw = result.decode(errors="replace").strip()
    try:
        return float(raw)
    except ValueError as e:
        # ffprobe can exit 0 and still print a non-numeric value such as "N/A".
        raise Exception(f"无法解析音频时长: {audio_path}\n{raw}") from e


def build_atempo_chain(speed_ratio: float) -> str:
    """Build an atempo filter chain for ``speed_ratio``.

    A single atempo filter is only used here for ratios in [0.5, 2.0]; ratios
    outside that range are split into several chained stages.

    Returns:
        "" when ``speed_ratio`` is exactly 1.0, otherwise one or more
        ``atempo=...,`` segments, each with a trailing comma so the result
        can be prepended to further filters.

    Raises:
        ValueError: ``speed_ratio`` is not a positive finite number.
    """
    if not math.isfinite(speed_ratio) or speed_ratio <= 0:
        raise ValueError(f"speed_ratio must be a positive finite number, got: {speed_ratio}")
    if speed_ratio == 1.0:
        return ""
    if 0.5 <= speed_ratio <= 2.0:
        return f"atempo={speed_ratio:.6f},"

    # Use as many full-strength stages as needed, then one remainder stage.
    base = 0.5 if speed_ratio < 0.5 else 2.0
    stages = int(math.ceil(math.log(speed_ratio) / math.log(base)))
    final_ratio = speed_ratio / (base ** (stages - 1))
    return f"atempo={base}," * (stages - 1) + f"atempo={final_ratio:.6f},"


# ============================================================================
# Audio strategy calculation
# ============================================================================

def calculate_audio_strategy(
        audio_duration: float,
        srt_duration: float,
        next_gap: Optional[float],
        speed_strategy: str = 'max',
        start_time: float = 0.0,
        end_time: float = 0.0
) -> Dict:
    """Decide how one cloned clip is fitted into its subtitle slot.

    Strategies:
        mix:    never change speed; the clip keeps its own length.
        normal: speed up when the clip is longer than the slot plus
                SAFETY_MARGIN.
        max:    like normal, but the gap before the next clip may be used
                as well.
    Any other value is handled as ``normal``.

    Args:
        audio_duration: length of the cloned clip in seconds.
        srt_duration: length of the subtitle slot in seconds.
        next_gap: seconds between the end of this slot and the start of the
            next one (negative when they overlap); None means unbounded.
        speed_strategy: one of 'mix', 'normal', 'max'.
        start_time: slot start, only used in the description.
        end_time: slot end, only used in the description.

    Returns:
        Dict with keys 'strategy' ('direct' or 'speedup'), 'speed_ratio',
        'target_duration', 'actual_duration' and 'description'.
    """
    next_gap_val = next_gap if next_gap is not None else float('inf')
    clone_ratio = audio_duration / srt_duration if srt_duration > 0 else 0

    def _describe(head: str, processed: float, speed: float, suffix: str = '') -> str:
        # Shared layout of the log description used by every branch.
        text = (
            f'{head} | 原始: {srt_duration:.3f}s | 克隆: {audio_duration:.3f}s ({clone_ratio:.3f}x) | 处理后: {processed:.3f}s | '
            f'速度: {speed:.3f}x (克隆/处理后 = {audio_duration:.3f}/{processed:.3f}) | '
            f'时间轴: {start_time:.3f}s -> {end_time:.3f}s'
        )
        return f'{text} | {suffix}' if suffix else text

    def _direct(head: str, suffix: str) -> Dict:
        # The clip is used unchanged.
        return {
            'strategy': 'direct',
            'speed_ratio': 1.0,
            'target_duration': audio_duration,
            'actual_duration': audio_duration,
            'description': _describe(head, audio_duration, 1.0, suffix)
        }

    def _speedup(head: str, target_dur: float, suffix: str = '') -> Dict:
        # A non-positive target happens when the next clip overlaps this
        # slot; treat it as an unbounded speed-up so the cap below applies
        # instead of dividing by zero or producing a negative ratio.
        speed_ratio = audio_duration / target_dur if target_dur > 0 else float('inf')

        # Cap the speed-up factor at MAX_SPEED_RATIO.
        if speed_ratio > MAX_SPEED_RATIO:
            original_target_dur = target_dur
            original_speed_ratio = speed_ratio
            logger.warning(
                f'⚠️ 加速倍数超过限制 | 原始加速: {original_speed_ratio:.3f}x | '
                f'已限制为: {MAX_SPEED_RATIO}x | 音频时长: {audio_duration:.3f}s | '
                f'目标时长: {original_target_dur:.3f}s -> {audio_duration / MAX_SPEED_RATIO:.3f}s | '
                f'时间轴: {start_time:.3f}s -> {end_time:.3f}s'
            )
            speed_ratio = MAX_SPEED_RATIO
            target_dur = audio_duration / MAX_SPEED_RATIO

        return {
            'strategy': 'speedup',
            'speed_ratio': speed_ratio,
            'target_duration': target_dur,
            'actual_duration': audio_duration,
            'description': _describe(head, target_dur, speed_ratio, suffix)
        }

    if speed_strategy == 'mix':
        return _direct('[mix] 保持原音', '超出部分会混音')

    if speed_strategy == 'normal':
        target_dur = srt_duration + SAFETY_MARGIN
        if audio_duration <= target_dur:
            return _direct('[normal] 直接使用', '未超出字幕时长')
        return _speedup('[normal] 提速到结束', target_dur)

    if speed_strategy == 'max':
        gap_suffix = f'间隙: {next_gap_val:.3f}s'
        max_available_dur = srt_duration + next_gap_val
        if audio_duration <= max_available_dur:
            return _direct('[max] 直接使用', gap_suffix)
        return _speedup('[max] 提速到下个', max_available_dur - SAFETY_MARGIN, gap_suffix)

    # Unknown strategy names are handled as 'normal'.
    return calculate_audio_strategy(audio_duration, srt_duration, next_gap, 'normal', start_time, end_time)


def analyze_audio_tracks(
        audio_params: List[AudioParam],
        speed_strategy: str = 'max',
        task_logger=None
) -> List[Dict]:
    """Compute the handling strategy of every clip.

    The timeline and the gap to the following clip are derived from each
    parameter's ``start_secs`` and ``end_secs``.

    Args:
        audio_params: clip parameters, in playback order.
        speed_strategy: strategy name passed to ``calculate_audio_strategy``.
        task_logger: optional logger; the module logger is used when omitted.

    Returns:
        One dict per clip with keys 'id', 'audio_file', 'start_time',
        'end_time', 'srt_duration', 'audio_duration', 'next_gap',
        'strategy' and 'param'.
    """
    # Use the supplied logger or fall back to the default one.
    log = task_logger or logger

    tracks = []
    last_idx = len(audio_params) - 1

    for idx, param in enumerate(audio_params):
        audio_duration = param.clone_audio_length
        # The original audio length acts as the subtitle (SRT) duration.
        srt_duration = param.original_audio_length
        start_time = param.start_secs
        end_time = param.end_secs

        # Gap to the next clip: next start minus current end.
        # 0 when contiguous, > 0 with a pause, < 0 when the clips overlap.
        next_gap = None
        if idx < last_idx:
            next_gap = audio_params[idx + 1].start_secs - end_time

        # The last clip has no following clip, so its gap is unknown; with
        # the 'max' strategy it falls back to 'normal'.
        effective_strategy = speed_strategy
        if idx == last_idx and speed_strategy == 'max':
            effective_strategy = 'normal'

        strategy = calculate_audio_strategy(
            audio_duration,
            srt_duration,
            next_gap,
            effective_strategy,
            start_time,
            end_time
        )

        tracks.append({
            'id': param.audio_sort_num,
            'audio_file': param.clone_audio_path,
            'start_time': start_time,
            'end_time': end_time,
            'srt_duration': srt_duration,
            'audio_duration': audio_duration,
            'next_gap': next_gap,
            'strategy': strategy,
            'param': param
        })

        log.info(f" → 音频 [{param.audio_sort_num:03d}]: {strategy['description']}")

    return tracks


# ============================================================================
# FFmpeg filter_complex construction
# ============================================================================

def _build_filter_complex(
        audio_tracks: List[Dict],
        has_bgm: bool,
        first_audio_input: int,
        output_label: str
) -> str:
    """Build the filter graph shared by the audio and video variants.

    Args:
        audio_tracks: tracks as produced by ``analyze_audio_tracks``.
        has_bgm: whether a BGM input follows the clip inputs.
        first_audio_input: FFmpeg input index of the first clip.
        output_label: label of the final output pad (without brackets).
    """
    filters = []

    # 1. Per clip: tempo -> trim -> reset PTS -> attenuate -> fades -> delay.
    for idx, track in enumerate(audio_tracks):
        strategy = track['strategy']
        target_duration = strategy['target_duration']
        atempo_chain = build_atempo_chain(strategy['speed_ratio'])

        # Never fade for longer than half of the clip.
        safe_fade_dur = min(FADE_DURATION, target_duration / 2.0)
        fade_out_start = max(0.0, target_duration - safe_fade_dur)

        # Round instead of truncating: 1.001 * 1000 is 1000.9999999999999.
        delay_ms = int(round(track['start_time'] * 1000))

        filters.append(
            f"[{first_audio_input + idx}:a]"
            f"{atempo_chain}"  # tempo change (if needed)
            f"atrim=start=0:end={target_duration:.3f},"  # trim to target length
            f"asetpts=PTS-STARTPTS,"  # reset timestamps
            f"volume={VOLUME_LEVEL},"  # pre-attenuate
            f"afade=t=in:st=0:d={safe_fade_dur:.3f}:curve=esin,"  # fade in
            f"afade=t=out:st={fade_out_start:.3f}:d={safe_fade_dur:.3f}:curve=esin,"  # fade out
            f"adelay={delay_ms}|{delay_ms}"  # align on the timeline (last filter, no comma)
            f"[a{idx}]"
        )

    # 2. BGM is the input right after the last clip.
    if has_bgm:
        bgm_input_idx = first_audio_input + len(audio_tracks)
        filters.append(f"[{bgm_input_idx}:a]volume=1.0[bgm]")

    # 3. Mix everything.
    audio_labels = "".join(f"[a{i}]" for i in range(len(audio_tracks)))
    mix_input_count = len(audio_tracks)
    if has_bgm:
        audio_labels += "[bgm]"
        mix_input_count += 1
    filters.append(
        f"{audio_labels}"
        f"amix=inputs={mix_input_count}:duration=longest:normalize=0[mixed]"
    )

    # 4. Dynamics: compressor followed by limiter.
    filters.append(
        f"[mixed]"
        f"acompressor=threshold={COMPRESSOR_THRESHOLD}dB:ratio={COMPRESSOR_RATIO}:attack=5:release=50,"
        f"alimiter=limit={LIMITER_LEVEL}"
        f"[{output_label}]"
    )

    # Drop empty entries so no empty filter ends up in the graph.
    filters = [f for f in filters if f and f.strip()]

    return ";".join(filters)


def build_filter_complex_for_video(
        audio_tracks: List[Dict],
        has_bgm: bool
) -> str:
    """Build the filter_complex string for muxing into a video.

    Input 0 is the video, so the clips start at input index 1. The video
    stream itself is not filtered; the command line maps it with
    ``-map 0:v``. The mixed audio is labelled ``[mixout]``.

    Args:
        audio_tracks: prepared audio tracks.
        has_bgm: whether a BGM input is present.

    Returns:
        The filter_complex string.
    """
    return _build_filter_complex(audio_tracks, has_bgm, 1, "mixout")


def build_filter_complex_for_audio(
        audio_tracks: List[Dict],
        has_bgm: bool
) -> str:
    """Build the filter_complex string for audio-only output.

    Processing steps:
    1. Each clip: tempo change (if needed) -> trim -> reset timestamps ->
       attenuate -> fade in/out -> delay
    2. BGM: volume adjustment
    3. Mixing: amix
    4. Dynamics: compressor + limiter

    There is no video input, so the clips start at input index 0. The mixed
    audio is labelled ``[out]``.

    Args:
        audio_tracks: prepared audio tracks.
        has_bgm: whether a BGM input is present.

    Returns:
        The filter_complex string.
    """
    return _build_filter_complex(audio_tracks, has_bgm, 0, "out")


# ============================================================================
# Main function
# ============================================================================

def _build_ffmpeg_command(
        audio_merge: AudioMerge,
        audio_tracks: List[Dict],
        filter_complex: str,
        audio_only: bool
) -> List[str]:
    """Assemble the FFmpeg argument list: inputs, filter graph, output options."""
    cmd = ['ffmpeg', '-nostdin']

    # Inputs: [video] + clips + BGM. The order must match the input indices
    # used in the filter graph.
    if not audio_only:
        cmd.extend(['-i', audio_merge.input_path])
    for track in audio_tracks:
        cmd.extend(['-i', track['audio_file']])
    cmd.extend(['-i', audio_merge.bgm_path])

    cmd.extend(['-filter_complex', filter_complex])

    if audio_only:
        cmd.extend([
            '-map', '[out]',
            '-c:a', 'pcm_s16le',  # PCM encoding for WAV output
            '-ar', '44100',  # 44.1 kHz sample rate
            '-ac', '2',  # stereo
            '-y',
            audio_merge.output_path
        ])
    else:
        cmd.extend([
            '-map', '0:v',  # map the original video stream (not filtered)
            '-map', '[mixout]',  # map the mixed audio
            '-c:v', 'copy',  # copy the video stream without re-encoding
            '-movflags', '+faststart',
            '-c:a', 'aac',  # encode audio as AAC
            '-b:a', '128k',  # audio bitrate
            '-avoid_negative_ts', '1',
            '-f', 'mp4',
            '-y',
            audio_merge.output_path
        ])
    return cmd


def _run_ffmpeg(ffmpeg_cmd: List[str], log) -> None:
    """Run FFmpeg, stream its output to ``log.debug`` and enforce the timeout.

    Raises:
        subprocess.CalledProcessError: FFmpeg exited with a non-zero status.
        Exception: FFmpeg ran longer than FFMPEG_TIMEOUT_SECS and was killed.
    """
    # FFmpeg logs to stderr; merge it into stdout so one pipe carries it all.
    process = subprocess.Popen(
        ffmpeg_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        errors="replace",
        bufsize=1
    )

    timed_out = threading.Event()

    def _kill_on_timeout():
        if process.poll() is None:
            timed_out.set()
            process.kill()

    # Reading stdout blocks until FFmpeg closes the pipe, so a timeout on
    # wait() after the read loop would never fire for a hung process. The
    # watchdog kills the process, which also ends the read loop.
    watchdog = threading.Timer(FFMPEG_TIMEOUT_SECS, _kill_on_timeout)
    watchdog.daemon = True
    watchdog.start()
    try:
        for line in process.stdout:
            log.debug(f"FFmpeg: {line.rstrip()}")
        process.wait()
    finally:
        watchdog.cancel()
        # Make sure no FFmpeg process is left behind if reading failed.
        if process.poll() is None:
            log.warning("清理残留 FFmpeg 进程...")
            try:
                process.kill()
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                log.error("FFmpeg 进程无法终止,可能需要手动清理")
            except Exception as cleanup_error:
                log.error(f" ⚠️ 清理进程时出错: {cleanup_error}")
        if process.stdout and not process.stdout.closed:
            try:
                process.stdout.close()
            except OSError:
                # Best-effort close; the process is already finished.
                pass

    if timed_out.is_set():
        log.error("FFmpeg 执行超时(30分钟),强制终止进程")
        raise Exception("FFmpeg 执行超时(30分钟)")

    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, ffmpeg_cmd)


def audio_auto_merge(audio_merge: AudioMerge, task_logger=None) -> Dict:
    """Mix the cloned clips with the BGM and write the result.

    For ``input_type == 'audio'`` the mix is written as PCM audio; otherwise
    it is muxed as AAC into an MP4 together with the copied video stream of
    ``input_path``.

    Args:
        audio_merge: merge job parameters.
        task_logger: optional logger (e.g. one carrying a task id); the
            module logger is used when omitted.

    Returns:
        Dict with keys 'output_file', 'file_size', 'track_count', 'has_bgm'.

    Raises:
        FileNotFoundError: the input or BGM file does not exist.
        Exception: ffprobe or FFmpeg failed or timed out, or the output file
            is missing or smaller than 1024 bytes.
    """
    # Use the supplied logger or fall back to the default one.
    log = task_logger or logger

    log.info(f"开始音频合并 (策略: {audio_merge.speed_strategy})")

    # Re-check the files; they may have disappeared since construction.
    if not os.path.exists(audio_merge.input_path):
        raise FileNotFoundError(f"输入文件不存在: {audio_merge.input_path}")
    if not os.path.exists(audio_merge.bgm_path):
        raise FileNotFoundError(f"BGM文件不存在: {audio_merge.bgm_path}")

    bgm_duration = get_audio_duration(audio_merge.bgm_path)
    log.debug(f"BGM 时长: {bgm_duration:.2f}s")

    # Analyse the audio tracks.
    log.info(f"分析 {len(audio_merge.audio_params)} 个音频轨道...")
    audio_tracks = analyze_audio_tracks(audio_merge.audio_params, audio_merge.speed_strategy, log)

    # Build the filter graph; a BGM input is always present. The same flag
    # drives the command line so graph and inputs cannot disagree.
    log.debug("构建 FFmpeg 滤镜...")
    audio_only = audio_merge.input_type == 'audio'
    if audio_only:
        filter_complex = build_filter_complex_for_audio(audio_tracks, True)
    else:
        filter_complex = build_filter_complex_for_video(audio_tracks, True)
    log.debug(f"滤镜长度: {len(filter_complex)} 字符")

    ffmpeg_cmd = _build_ffmpeg_command(audio_merge, audio_tracks, filter_complex, audio_only)

    # Run FFmpeg.
    log.info("执行音频混合和视频合成...")
    log.debug(f"FFmpeg 命令: {' '.join(ffmpeg_cmd)}")

    try:
        _run_ffmpeg(ffmpeg_cmd, log)

        # Verify the output.
        if not os.path.exists(audio_merge.output_path):
            raise Exception("输出文件未生成")

        file_size = os.path.getsize(audio_merge.output_path)
        if file_size < 1024:
            raise Exception(f"输出文件异常(大小: {file_size} bytes)")

        log.info(
            f"✓ 音频合并完成: {os.path.basename(audio_merge.output_path)} ({file_size / 1024 / 1024:.2f} MB, {len(audio_tracks)} 轨道)")

        return {
            'output_file': audio_merge.output_path,
            'file_size': file_size,
            'track_count': len(audio_tracks),
            'has_bgm': True
        }

    except subprocess.CalledProcessError as e:
        error_msg = f"FFmpeg 执行失败,返回码: {e.returncode}"
        log.error(f"❌ {error_msg}")
        raise Exception(error_msg) from e
    except Exception as e:
        log.error(f"❌ 音频合并失败: {e}")
        raise