import hmac
import os
import shutil
import threading
import time
from functools import wraps
import logging

import requests
import yaml
from flask import Flask, request, jsonify, g
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from services.logger import get_app_logger, RequestLogger, task_id_var, get_process_worker_logger, \
    get_upload_worker_logger
from services.tts_service import TTSService
from services.queue_manager import QueueManager
from services.r2_uploader import R2Uploader
from services.uvr5_service import UVR5Service
from services.merger_service import MergerService

logger = get_app_logger()


# Load configuration
def load_config(config_path='config.yaml'):
    """Parse the YAML file at *config_path* and return the loaded object."""
    with open(config_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


config = load_config()


# Auth Helpers
def check_auth(username, password):
    """Return True when the supplied credentials match the ``app`` config section.

    Both values are compared with ``hmac.compare_digest`` so the comparison
    time does not depend on how many leading characters match. A missing
    username or password (``None``) never authenticates. Configured values are
    converted with ``str()`` before comparing.

    Raises:
        KeyError: if ``api_username`` or ``api_password`` is absent from the
            ``app`` section of the configuration.
    """
    app_config = config.get('app', {})
    expected_username = str(app_config['api_username']).encode('utf-8')
    expected_password = str(app_config['api_password']).encode('utf-8')
    if username is None or password is None:
        return False
    # Evaluate both comparisons unconditionally so a wrong username and a
    # wrong password take the same code path.
    username_ok = hmac.compare_digest(str(username).encode('utf-8'), expected_username)
    password_ok = hmac.compare_digest(str(password).encode('utf-8'), expected_password)
    return username_ok and password_ok


def authenticate():
    """Build the 401 response that asks the client for HTTP Basic credentials."""
    return jsonify({'error': 'Authentication required'}), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}


def requires_auth(f):
    """Decorator: run the wrapped view only when ``check_auth`` accepts the request."""
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated


def send_hook_with_retry(url: str, data: dict, max_retries: int = 3):
    """POST *data* as JSON to *url*, retrying on 500/502/503/504 responses.

    Args:
        url: Callback endpoint.
        data: JSON-serialisable payload.
        max_retries: Total number of retries handed to ``Retry``.

    Returns:
        The ``requests.Response`` on success, or ``None`` when the request
        ultimately failed (the failure is logged, never raised).
    """
    try:
        retries = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            # POST is not in Retry's default allowed methods, so without this
            # the status_forcelist would never trigger a retry for our hooks.
            allowed_methods=frozenset({'POST'}),
        )
        # The context manager closes the session's pooled connections.
        with requests.Session() as session:
            session.mount('http://', HTTPAdapter(max_retries=retries))
            session.mount('https://', HTTPAdapter(max_retries=retries))
            response = session.post(url, json=data, timeout=10)
            response.raise_for_status()
            return response
    except Exception as e:
        # Best effort: a failed callback must not take the worker down.
        logger.error(f"Failed to send hook to {url}: {e}")
        return None


def download_file(url: str, path: str):
    """Stream the body of *url* into the file at *path* in 8 KiB chunks.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    # Using the response as a context manager releases the streamed
    # connection even when writing fails part-way through.
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)


def _build_failure_payload(task_id, error):
    """Return the callback payload reporting that *task_id* failed with *error*."""
    return {
        "task_uuid": task_id,
        "status": "failed",
        "timestamp": int(time.time()),
        "error_message": str(error)
    }


# Initialize Flask
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024

# Initialize Services
queue_manager = QueueManager(config['redis'])
r2_uploader = R2Uploader(config['r2'])
tts_service = TTSService(config)
uvr5_service = UVR5Service(config)
merger_service = MergerService(config)

# Temp Dir for Videos
VIDEO_TEMP_DIR = 'data/temp_videos'
os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)


# -------------------------------------------------------------------------
# Workers
# -------------------------------------------------------------------------

def process_worker():
    """
    Main Pipeline Worker:
    1. Fetch Task
    2. Download Video
    3. Run TTS Generation
    4. Run UVR5 Separation (get BGM)
    5. Merge (Video + TTS + BGM)
    6. Push to Upload Queue

    Runs forever. On failure the task's ``hook_url`` (if any) receives a
    "failed" callback. Intermediate files are always removed; the merged
    output is removed only when the task failed, because on success it is
    handed to the upload queue.
    """
    worker_logger = get_process_worker_logger()
    worker_logger.info("Main Process Worker started")

    while True:
        try:
            task = queue_manager.get_process_task()
            if not task:
                time.sleep(1)
                continue

            task_id = task.get('task_id')
            token = task_id_var.set(task_id)

            # Context variables for cleanup
            local_video_path = None
            bgm_path = None
            vocals_path = None
            task_tts_dir = None
            final_output_path = None
            success = False

            try:
                worker_logger.info("Processing started.")

                # 1. Download Video
                video_url = task['data'].get('video_url')
                if not video_url:
                    raise ValueError("Missing video_url")

                local_video_path = os.path.join(VIDEO_TEMP_DIR, f"{task_id}_input.mp4")
                worker_logger.info(f"Downloading video from {video_url}")
                download_file(video_url, local_video_path)

                # 2. Run TTS
                worker_logger.info("Running TTS...")
                tts_result = tts_service.process_task(task)
                segments = tts_result['segments']
                task_tts_dir = tts_result['task_dir']

                # 3. Run UVR5
                worker_logger.info("Running UVR5 Separation...")
                vocals_path, bgm_path = uvr5_service.process_audio(local_video_path, task_id)
                if not bgm_path or not os.path.exists(bgm_path):
                    raise Exception("UVR5 failed to produce background music.")

                # 4. Merge
                worker_logger.info("Merging Audio and Video...")
                final_output_path = os.path.join(VIDEO_TEMP_DIR, f"{task_id}_final.mp4")
                merger_service.merge_video(
                    video_path=local_video_path,
                    bgm_path=bgm_path,
                    segments=segments,
                    output_path=final_output_path
                )

                # 5. Push to Upload
                upload_task = {
                    'task_id': task_id,
                    'file_path': final_output_path,
                    'hook_url': task['data'].get('hook_url'),
                }
                queue_manager.push_upload_task(upload_task)
                success = True

            except Exception as e:
                worker_logger.error(f"Task processing failed: {e}", exc_info=True)
                # Tolerate a missing or null 'data' entry so the error path
                # itself cannot raise while looking up the callback URL.
                hook_url = (task.get('data') or {}).get('hook_url')
                if hook_url:
                    send_hook_with_retry(hook_url, _build_failure_payload(task_id, e))

            finally:
                # Cleanup Logic
                try:
                    if local_video_path and os.path.exists(local_video_path):
                        os.remove(local_video_path)
                    if bgm_path and os.path.exists(bgm_path):
                        os.remove(bgm_path)
                    if vocals_path and os.path.exists(vocals_path):
                        os.remove(vocals_path)
                    if task_tts_dir and os.path.exists(task_tts_dir):
                        shutil.rmtree(task_tts_dir)

                    # Only delete final output if we FAILED.
                    # If success, upload worker handles it.
                    if not success and final_output_path and os.path.exists(final_output_path):
                        os.remove(final_output_path)
                except Exception as cleanup_err:
                    worker_logger.warning(f"Cleanup error: {cleanup_err}")

                task_id_var.reset(token)

        except Exception as e:
            worker_logger.error(f"Worker Loop Error: {e}")
            time.sleep(5)


def upload_worker():
    """
    Upload Worker:
    1. Upload Final Video
    2. Send Success Callback
    3. Cleanup Final Video

    Runs forever. The final video file is removed after every attempt,
    whether the upload succeeded or not.
    """
    worker_logger = get_upload_worker_logger()
    worker_logger.info("Upload Worker started")

    while True:
        try:
            result = queue_manager.get_upload_task(timeout=5)
            if not result:
                continue

            task_id = result.get('task_id')
            token = task_id_var.set(task_id)

            file_path = result.get('file_path')
            hook_url = result.get('hook_url')

            try:
                worker_logger.info(f"Uploading result: {file_path}")

                if not file_path or not os.path.exists(file_path):
                    raise FileNotFoundError(f"File to upload not found: {file_path}")

                object_key = f"{task_id}.mp4"
                file_url = r2_uploader.upload_file(file_path, object_key=object_key)

                if hook_url:
                    success_payload = {
                        "task_uuid": task_id,
                        "status": "success",
                        "timestamp": int(time.time()),
                        "result_url": file_url
                    }
                    worker_logger.info(f"Sending success callback to {hook_url}")
                    send_hook_with_retry(hook_url, success_payload)

            except Exception as e:
                worker_logger.error(f"Upload failed: {e}", exc_info=True)
                if hook_url:
                    send_hook_with_retry(hook_url, _build_failure_payload(task_id, e))

            finally:
                # Cleanup the final video file
                if file_path and os.path.exists(file_path):
                    try:
                        os.remove(file_path)
                        worker_logger.info(f"Removed final video: {file_path}")
                    except Exception as e:
                        worker_logger.warning(f"Failed to remove file: {e}")

                task_id_var.reset(token)

        except Exception as e:
            # Log through the worker's own logger, matching process_worker.
            worker_logger.error(f"Upload Loop Error: {e}")
            time.sleep(5)


# -------------------------------------------------------------------------
# Flask Routes
# -------------------------------------------------------------------------

@app.before_request
def before_request():
    """Record the request start time on ``g`` for duration logging."""
    g.start_time = time.time()


@app.after_request
def after_request(response):
    """Log the request with its duration when a start time was recorded."""
    if hasattr(g, 'start_time'):
        duration = time.time() - g.start_time
        RequestLogger.log_request(request, response, duration)
    return response


@app.route('/dubbing/character', methods=['POST'])
@requires_auth
def generate():
    """Validate a dubbing request and enqueue it.

    Returns 201 with the new task id, 400 when the body is not a JSON object,
    a required field is missing/empty or the priority is outside 1-5, and 500
    on any unexpected error.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so the client gets a clear 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        # Basic Validation
        required = ['character_voice', 'content', 'hook_url', 'video_url']
        for field in required:
            if not data.get(field):
                return jsonify({'error': f'Missing field: {field}'}), 400

        priority = data.get('priority', 3)
        if priority not in range(1, 6):
            return jsonify({'error': 'Priority must be 1-5'}), 400

        task_id = queue_manager.add_task(data, priority)
        logger.info(f"Created Task: {task_id}")

        return jsonify({
            'task_uuid': task_id,
            'status': 'queued',
            'message': 'Task queued successfully'
        }), 201

    except Exception as e:
        logger.error(f"API Error: {e}")
        return jsonify({'error': str(e)}), 500


# The <task_id> URL variable is required: cancel_task takes it as its only
# argument, and Flask passes URL variables to the view as keyword arguments.
@app.route('/dubbing/character/tasks/<task_id>/cancel', methods=['DELETE'])
@requires_auth
def cancel_task(task_id: str):
    """Remove *task_id* from the processing queue.

    Returns 200 when the queue manager reports the task was deleted, 404
    otherwise, and 500 when the queue manager raises.
    """
    try:
        if queue_manager.delete_process_task(task_id):
            return jsonify({'message': 'Task canceled'}), 200
        return jsonify({'message': 'Task not found or already processed'}), 404
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.errorhandler(500)
def internal_error(error):
    """Log an unhandled server error and return a generic JSON 500 body."""
    logger.error(f"500 Error: {error}")
    return jsonify({'error': 'Internal server error'}), 500


def main():
    """Create working directories, start both worker threads and run Flask."""
    logger.info("Starting Service...")

    # Directories
    os.makedirs(config['tts']['output_dir'], exist_ok=True)
    os.makedirs(config['tts']['voices_dir'], exist_ok=True)
    os.makedirs(VIDEO_TEMP_DIR, exist_ok=True)

    # Threads (daemon threads, so they do not block interpreter shutdown)
    threading.Thread(target=process_worker, daemon=True).start()
    threading.Thread(target=upload_worker, daemon=True).start()

    app.run(
        host=config['app']['host'],
        port=config['app']['port'],
        debug=config['app']['debug']
    )


if __name__ == '__main__':
    main()