F5-TTS-pt-br / AgentF5TTSChunk.py
fuuuzzy's picture
Add files using upload-large-folder tool
cf7fa42 verified
import os
import re
import time
import logging
import subprocess
from typing import Optional, Dict, List, Tuple, Union
from f5_tts.api import F5TTS
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class AgentF5TTS:
def __init__(self, ckpt_file: str, vocoder_name: str = "vocos", delay: float = 0, device: str = "mps"):
"""
Initialize the F5-TTS Agent.
:param ckpt_file: Path to the safetensors model checkpoint.
:param vocoder_name: Name of the vocoder to use ("vocos" or "bigvgan"). (Ignored in current F5TTS version)
:param delay: Delay in seconds between audio generations.
:param device: Device to use ("cpu", "cuda", "mps").
"""
# vocoder_name is not supported in the installed version of F5TTS.__init__
self.model = F5TTS(ckpt_file=ckpt_file, device=device)
self.delay = delay # Delay in seconds
def infer(self, ref_file: str, ref_text: str, gen_text: str, file_wave: str, remove_silence: bool = False, speed: float = 1.0):
"""
Direct inference method wrapping the underlying model.
:param ref_file: Path to reference audio file.
:param ref_text: Reference text (optional).
:param gen_text: Text to generate.
:param file_wave: Output wave file path.
:param remove_silence: Whether to remove silence from generated audio.
:param speed: Speed factor for speech generation.
"""
self.model.infer(
ref_file=ref_file,
ref_text=ref_text,
gen_text=gen_text,
file_wave=file_wave,
remove_silence=remove_silence,
speed=speed,
)
def generate_emotion_speech(self, text_file: str, output_audio_file: str, speaker_emotion_refs: Dict[Tuple[str, str], str], convert_to_mp3: bool = False):
"""
Generate speech using the F5-TTS model.
:param text_file: Path to the input text file.
:param output_audio_file: Path to save the combined audio output.
:param speaker_emotion_refs: Dictionary mapping (speaker, emotion) tuples to reference audio paths.
:param convert_to_mp3: Boolean flag to convert the output to MP3.
"""
try:
with open(text_file, "r", encoding="utf-8") as file:
lines = [line.strip() for line in file if line.strip()]
except FileNotFoundError:
logging.error(f"Text file not found: {text_file}")
return
if not lines:
logging.error("Input text file is empty.")
return
temp_files = []
os.makedirs(os.path.dirname(output_audio_file), exist_ok=True)
for i, line in enumerate(lines):
speaker, emotion = self._determine_speaker_emotion(line)
ref_audio = speaker_emotion_refs.get((speaker, emotion))
line_clean = re.sub(r'\[speaker:.*?\]\s*', '', line)
if not ref_audio or not os.path.exists(ref_audio):
logging.error(f"Reference audio not found for speaker '{speaker}', emotion '{emotion}'.")
continue
ref_text = "" # Placeholder or load corresponding text
temp_file = f"{output_audio_file}_line{i + 1}.wav"
try:
logging.info(f"Generating speech for line {i + 1}: '{line_clean}' with speaker '{speaker}', emotion '{emotion}'")
self.model.infer(
ref_file=ref_audio,
ref_text=ref_text,
gen_text=line_clean,
file_wave=temp_file,
remove_silence=True,
)
temp_files.append(temp_file)
time.sleep(self.delay)
except Exception as e:
logging.error(f"Error generating speech for line {i + 1}: {e}")
self._combine_audio_files(temp_files, output_audio_file, convert_to_mp3)
def generate_speech(self, text_file: str, output_audio_file: str, ref_audio: str, convert_to_mp3: bool = False):
try:
with open(text_file, 'r', encoding='utf-8') as file:
lines = [line.strip() for line in file if line.strip()]
except FileNotFoundError:
logging.error(f"Text file not found: {text_file}")
return
if not lines:
logging.error("Input text file is empty.")
return
temp_files = []
os.makedirs(os.path.dirname(output_audio_file), exist_ok=True)
for i, line in enumerate(lines):
if not ref_audio or not os.path.exists(ref_audio):
logging.error(f"Reference audio not found for speaker.")
continue
temp_file = f"{output_audio_file}_line{i + 1}.wav"
try:
logging.info(f"Generating speech for line {i + 1}: '{line}'")
self.model.infer(
ref_file=ref_audio, # No reference audio
ref_text="", # No reference text
gen_text=line,
file_wave=temp_file,
)
temp_files.append(temp_file)
except Exception as e:
logging.error(f"Error generating speech for line {i + 1}: {e}")
# Combine temp_files into output_audio_file if needed
self._combine_audio_files(temp_files, output_audio_file, convert_to_mp3)
def _determine_speaker_emotion(self, text: str) -> Tuple[str, str]:
"""
Extract speaker and emotion from the text using regex.
Default to "speaker1" and "neutral" if not specified.
"""
speaker, emotion = "speaker1", "neutral" # Default values
# Use regex to find [speaker:speaker_name, emotion:emotion_name]
match = re.search(r"\[speaker:(.*?), emotion:(.*?)\]", text)
if match:
speaker = match.group(1).strip()
emotion = match.group(2).strip()
logging.debug(f"Determined speaker: '{speaker}', emotion: '{emotion}'")
return speaker, emotion
def _combine_audio_files(self, temp_files: List[str], output_audio_file: str, convert_to_mp3: bool):
"""Combine multiple audio files into a single file using FFmpeg."""
if not temp_files:
logging.error("No audio files to combine.")
return
list_file = "file_list.txt"
with open(list_file, "w") as f:
for temp in temp_files:
f.write(f"file '{temp}'\n")
try:
subprocess.run(["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file, "-c", "copy", output_audio_file], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logging.info(f"Combined audio saved to: {output_audio_file}")
if convert_to_mp3:
mp3_output = output_audio_file.replace(".wav", ".mp3")
subprocess.run(["ffmpeg", "-y", "-i", output_audio_file, "-codec:a", "libmp3lame", "-qscale:a", "2", mp3_output], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logging.info(f"Converted to MP3: {mp3_output}")
for temp in temp_files:
os.remove(temp)
os.remove(list_file)
except Exception as e:
logging.error(f"Error combining audio files: {e}")
# Example usage, remove from this line on to import into other agents.
# make sure to adjust the paths to yourr files.
if __name__ == "__main__":
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
model_path = "./F5-TTS/ckpts/pt-br/model_last.safetensors"
speaker_emotion_refs = {
("speaker1", "happy"): "ref_audios/speaker1_happy.wav",
("speaker1", "sad"): "ref_audios/speaker1_sad.wav",
("speaker1", "angry"): "ref_audios/speaker1_angry.wav",
}
# Note: Adjust path if needed
if os.path.exists(model_path):
agent = AgentF5TTS(ckpt_file=model_path, vocoder_name="vocos", delay=6)
# Test generate_emotion_speech
# agent.generate_emotion_speech(...)
# Test generate_speech
# agent.generate_speech(...)
else:
print(f"Model path {model_path} does not exist. Skipping example execution.")