import os
import re
import time
import logging
import subprocess
from typing import Dict, List, Tuple

from f5_tts.api import F5TTS

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class AgentF5TTS:
    def __init__(self, ckpt_file: str, vocoder_name: str = "vocos", delay: float = 0, device: str = "mps"):
        """
        Initialize the F5-TTS Agent.

        :param ckpt_file: Path to the safetensors model checkpoint.
        :param vocoder_name: Name of the vocoder to use ("vocos" or "bigvgan").
            (Ignored in the current F5TTS version.)
        :param delay: Delay in seconds between audio generations.
        :param device: Device to use ("cpu", "cuda", "mps").
        """
        # vocoder_name is not supported in the installed version of F5TTS.__init__,
        # so it is accepted here for API compatibility but not passed through.
        self.model = F5TTS(ckpt_file=ckpt_file, device=device)
        self.delay = delay  # Delay in seconds between generations

    def infer(
        self,
        ref_file: str,
        ref_text: str,
        gen_text: str,
        file_wave: str,
        remove_silence: bool = False,
        speed: float = 1.0,
    ):
        """
        Direct inference method wrapping the underlying model.

        :param ref_file: Path to the reference audio file.
        :param ref_text: Reference text (optional).
        :param gen_text: Text to generate.
        :param file_wave: Output wave file path.
        :param remove_silence: Whether to remove silence from the generated audio.
        :param speed: Speed factor for speech generation.
        """
        self.model.infer(
            ref_file=ref_file,
            ref_text=ref_text,
            gen_text=gen_text,
            file_wave=file_wave,
            remove_silence=remove_silence,
            speed=speed,
        )

    def generate_emotion_speech(
        self,
        text_file: str,
        output_audio_file: str,
        speaker_emotion_refs: Dict[Tuple[str, str], str],
        convert_to_mp3: bool = False,
    ):
        """
        Generate speech using the F5-TTS model, one line of input text at a time.

        :param text_file: Path to the input text file.
        :param output_audio_file: Path to save the combined audio output.
        :param speaker_emotion_refs: Dictionary mapping (speaker, emotion) tuples
            to reference audio paths.
        :param convert_to_mp3: Boolean flag to convert the output to MP3.
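
        Input-line format (a sketch: parsing is done by ``_determine_speaker_emotion``
        below, and the sample text here is illustrative, not from the repo)::

            [speaker:speaker1, emotion:happy] Hello, how are you today?

        Lines without a tag fall back to ("speaker1", "neutral"), and the tag is
        stripped from the text before synthesis.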
""" try: with open(text_file, "r", encoding="utf-8") as file: lines = [line.strip() for line in file if line.strip()] except FileNotFoundError: logging.error(f"Text file not found: {text_file}") return if not lines: logging.error("Input text file is empty.") return temp_files = [] os.makedirs(os.path.dirname(output_audio_file), exist_ok=True) for i, line in enumerate(lines): speaker, emotion = self._determine_speaker_emotion(line) ref_audio = speaker_emotion_refs.get((speaker, emotion)) line_clean = re.sub(r'\[speaker:.*?\]\s*', '', line) if not ref_audio or not os.path.exists(ref_audio): logging.error(f"Reference audio not found for speaker '{speaker}', emotion '{emotion}'.") continue ref_text = "" # Placeholder or load corresponding text temp_file = f"{output_audio_file}_line{i + 1}.wav" try: logging.info(f"Generating speech for line {i + 1}: '{line_clean}' with speaker '{speaker}', emotion '{emotion}'") self.model.infer( ref_file=ref_audio, ref_text=ref_text, gen_text=line_clean, file_wave=temp_file, remove_silence=True, ) temp_files.append(temp_file) time.sleep(self.delay) except Exception as e: logging.error(f"Error generating speech for line {i + 1}: {e}") self._combine_audio_files(temp_files, output_audio_file, convert_to_mp3) def generate_speech(self, text_file: str, output_audio_file: str, ref_audio: str, convert_to_mp3: bool = False): try: with open(text_file, 'r', encoding='utf-8') as file: lines = [line.strip() for line in file if line.strip()] except FileNotFoundError: logging.error(f"Text file not found: {text_file}") return if not lines: logging.error("Input text file is empty.") return temp_files = [] os.makedirs(os.path.dirname(output_audio_file), exist_ok=True) for i, line in enumerate(lines): if not ref_audio or not os.path.exists(ref_audio): logging.error(f"Reference audio not found for speaker.") continue temp_file = f"{output_audio_file}_line{i + 1}.wav" try: logging.info(f"Generating speech for line {i + 1}: '{line}'") self.model.infer( ref_file=ref_audio, # No reference audio ref_text="", # No reference text gen_text=line, file_wave=temp_file, ) temp_files.append(temp_file) except Exception as e: logging.error(f"Error generating speech for line {i + 1}: {e}") # Combine temp_files into output_audio_file if needed self._combine_audio_files(temp_files, output_audio_file, convert_to_mp3) def _determine_speaker_emotion(self, text: str) -> Tuple[str, str]: """ Extract speaker and emotion from the text using regex. Default to "speaker1" and "neutral" if not specified. 
""" speaker, emotion = "speaker1", "neutral" # Default values # Use regex to find [speaker:speaker_name, emotion:emotion_name] match = re.search(r"\[speaker:(.*?), emotion:(.*?)\]", text) if match: speaker = match.group(1).strip() emotion = match.group(2).strip() logging.debug(f"Determined speaker: '{speaker}', emotion: '{emotion}'") return speaker, emotion def _combine_audio_files(self, temp_files: List[str], output_audio_file: str, convert_to_mp3: bool): """Combine multiple audio files into a single file using FFmpeg.""" if not temp_files: logging.error("No audio files to combine.") return list_file = "file_list.txt" with open(list_file, "w") as f: for temp in temp_files: f.write(f"file '{temp}'\n") try: subprocess.run(["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", output_audio_file], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) logging.info(f"Combined audio saved to: {output_audio_file}") if convert_to_mp3: mp3_output = output_audio_file.replace(".wav", ".mp3") subprocess.run(["ffmpeg", "-y", "-i", output_audio_file, "-codec:a", "libmp3lame", "-qscale:a", "2", mp3_output], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) logging.info(f"Converted to MP3: {mp3_output}") for temp in temp_files: os.remove(temp) os.remove(list_file) except Exception as e: logging.error(f"Error combining audio files: {e}") # Example usage, remove from this line on to import into other agents. # make sure to adjust the paths to yourr files. if __name__ == "__main__": env = os.environ.copy() env["PYTHONUNBUFFERED"] = "1" model_path = "./F5-TTS/ckpts/pt-br/model_last.safetensors" speaker_emotion_refs = { ("speaker1", "happy"): "ref_audios/speaker1_happy.wav", ("speaker1", "sad"): "ref_audios/speaker1_sad.wav", ("speaker1", "angry"): "ref_audios/speaker1_angry.wav", } # Note: Adjust path if needed if os.path.exists(model_path): agent = AgentF5TTS(ckpt_file=model_path, vocoder_name="vocos", delay=6) # Test generate_emotion_speech # agent.generate_emotion_speech(...) # Test generate_speech # agent.generate_speech(...) else: print(f"Model path {model_path} does not exist. Skipping example execution.")