Files
Podcast-Generator/podcast_generator.py
hex2077 719eb14927 feat: 添加播客生成器Web应用基础架构
实现基于Next.js的播客生成器Web应用,包含以下主要功能:
- 完整的Next.js项目结构配置
- 播客生成API接口
- 音频文件服务API
- TTS配置管理
- 响应式UI组件
- 本地存储和状态管理
- 音频可视化组件
- 全局样式和主题配置

新增配置文件包括:
- Next.js、Tailwind CSS、ESLint等工具配置
- 环境变量示例文件
- 启动脚本和构建检查脚本
- 类型定义和工具函数库
2025-08-14 23:44:18 +08:00

622 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# podcast_generator.py
import argparse # Import argparse for command-line arguments
import os
import json
import time
import glob
import sys
import subprocess # For calling external commands like ffmpeg
import requests # For making HTTP requests to TTS API
import uuid # For generating unique filenames for temporary audio files
from datetime import datetime
from openai_cli import OpenAICli # Moved to top for proper import
import urllib.parse # For URL encoding
import re # For regular expression operations
from typing import Optional, Tuple
from tts_adapters import TTSAdapter, IndexTTSAdapter, EdgeTTSAdapter, FishAudioAdapter, MinimaxAdapter, DoubaoTTSAdapter, GeminiTTSAdapter # Import TTS adapters
# Global configuration
output_dir = "output"
file_list_path = os.path.join(output_dir, "file_list.txt")
tts_providers_config_path = 'config/tts_providers.json'
def read_file_content(filepath):
"""Reads content from a given file path."""
try:
with open(filepath, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
raise FileNotFoundError(f"Error: File not found at {filepath}")
def _load_json_config(file_path: str) -> dict:
"""Loads a JSON configuration file."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
raise FileNotFoundError(f"Error: Configuration file not found at {file_path}")
except json.JSONDecodeError as e:
raise ValueError(f"Error decoding JSON from {file_path}: {e}")
def select_json_config(config_dir='config', return_file_path=False):
"""
Reads JSON files from the specified directory and allows the user to select one.
Returns the content of the selected JSON file.
If return_file_path is True, returns a tuple of (file_path, content).
"""
json_files = glob.glob(os.path.join(config_dir, '*.json'))
if not json_files:
raise FileNotFoundError(f"Error: No JSON files found in {config_dir}")
valid_json_files = []
print(f"Found JSON configuration files in '{config_dir}':")
for i, file_path in enumerate(json_files):
file_name = os.path.basename(file_path)
if file_name != os.path.basename(tts_providers_config_path):
valid_json_files.append(file_path)
print(f"{len(valid_json_files)}. {file_name}")
if not valid_json_files:
raise FileNotFoundError(f"Error: No valid JSON files (excluding tts_providers.json) found in {config_dir}")
while True:
try:
choice_str = input("Enter the number of the configuration file to use: ")
if not choice_str: # Allow empty input to raise an error
raise ValueError("No input provided. Please enter a number.")
choice = int(choice_str)
if 1 <= choice <= len(valid_json_files):
selected_file = valid_json_files[choice - 1]
print(f"Selected: {os.path.basename(selected_file)}")
with open(selected_file, 'r', encoding='utf-8') as f:
content = json.load(f)
if return_file_path:
return selected_file, content
else:
return content
else:
raise ValueError("Invalid choice. Please enter a number within the range.")
except FileNotFoundError as e:
raise FileNotFoundError(f"Error loading selected JSON file: {e}")
except json.JSONDecodeError as e:
raise ValueError(f"Error decoding JSON from selected file: {e}")
except ValueError as e:
print(f"Invalid input: {e}. Please enter a number.")
def generate_speaker_id_text(pod_users, voices_list):
"""
Generates a text string mapping speaker IDs to their names/aliases based on podUsers and voices.
Optimized by converting voices_list to a dictionary for faster lookups.
"""
voice_map = {voice.get("code"): voice for voice in voices_list if voice.get("code")}
speaker_info = []
for speaker_id, pod_user in enumerate(pod_users):
pod_user_code = pod_user.get("code")
role = pod_user.get("role", "") # Default to "未知角色" if role is not provided
found_name = None
voice = voice_map.get(pod_user_code)
if voice:
found_name = voice.get("usedname") or voice.get("alias") or voice.get("name")
if found_name:
if role:
speaker_info.append(f"speaker_id={speaker_id}的名叫{found_name},是一个{role}")
else:
speaker_info.append(f"speaker_id={speaker_id}的名叫{found_name}")
else:
raise ValueError(f"语音code '{pod_user_code}' (speaker_id={speaker_id}) 未找到对应名称或alias。请检查 config/edge-tts.json 中的 voices 配置。")
return "".join(speaker_info) + ""
def merge_audio_files():
output_audio_filename = f"podcast_{int(time.time())}.wav"
output_audio_filepath = "/".join([output_dir, output_audio_filename])
# Use ffmpeg to concatenate audio files
# Check if ffmpeg is available
try:
subprocess.run(["ffmpeg", "-version"], check=True, capture_output=True)
except FileNotFoundError:
raise RuntimeError("FFmpeg is not installed or not in your PATH. Please install FFmpeg to merge audio files. You can download FFmpeg from: https://ffmpeg.org/download.html")
print(f"\nMerging audio files into {output_audio_filename}...")
try:
command = [
"ffmpeg",
"-f", "concat",
"-safe", "0",
"-i", os.path.basename(file_list_path),
"-acodec", "pcm_s16le",
"-ar", "44100",
"-ac", "2",
output_audio_filename
]
# Execute ffmpeg from the output_dir to correctly resolve file paths in file_list.txt
process = subprocess.run(command, check=True, cwd=output_dir, capture_output=True, text=True)
print(f"Audio files merged successfully into {output_audio_filepath}!")
print("FFmpeg stdout:\n", process.stdout)
print("FFmpeg stderr:\n", process.stderr)
return output_audio_filename
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Error merging audio files with FFmpeg: {e.stderr}")
finally:
# Clean up temporary audio files and the file list
for item in os.listdir(output_dir):
if item.startswith("temp_audio"):
try:
os.remove(os.path.join(output_dir, item))
except OSError as e:
print(f"Error removing temporary audio file {item}: {e}") # This should not stop the process
try:
os.remove(file_list_path)
except OSError as e:
print(f"Error removing file list {file_list_path}: {e}") # This should not stop the process
print("Cleaned up temporary files.")
def get_audio_duration(filepath: str) -> Optional[float]:
"""
Uses ffprobe to get the duration of an audio file in seconds.
Returns None if duration cannot be determined.
"""
try:
# Check if ffprobe is available
subprocess.run(["ffprobe", "-version"], check=True, capture_output=True, text=True)
except FileNotFoundError:
print("Error: ffprobe is not installed or not in your PATH. Please install FFmpeg (which includes ffprobe) to get audio duration.")
return None
try:
command = [
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
filepath
]
result = subprocess.run(command, check=True, capture_output=True, text=True)
duration = float(result.stdout.strip())
return duration
except subprocess.CalledProcessError as e:
print(f"Error calling ffprobe for {filepath}: {e.stderr}")
return None
except ValueError:
print(f"Could not parse duration from ffprobe output for {filepath}.")
return None
except Exception as e:
print(f"An unexpected error occurred while getting audio duration for {filepath}: {e}")
return None
def _parse_arguments():
"""Parses command-line arguments."""
parser = argparse.ArgumentParser(description="Generate podcast script and audio using OpenAI and local TTS.")
parser.add_argument("--api-key", help="OpenAI API key.")
parser.add_argument("--base-url", default="https://api.openai.com/v1", help="OpenAI API base URL (default: https://api.openai.com/v1).")
parser.add_argument("--model", default="gpt-3.5-turbo", help="OpenAI model to use (default: gpt-3.5-turbo).")
parser.add_argument("--threads", type=int, default=1, help="Number of threads to use for audio generation (default: 1).")
parser.add_argument("--output-language", type=str, default="Chinese", help="Language for the podcast overview and script (default: Chinese).")
parser.add_argument("--usetime", type=str, default=None, help="Specific time to be mentioned in the podcast script, e.g., '今天', '昨天'.")
return parser.parse_args()
def _load_configuration():
"""Selects and loads JSON configuration, and infers tts_provider from the selected file name."""
print("Podcast Generation Script")
selected_file_path, config_data = select_json_config(return_file_path=True)
# 从文件名中提取 tts_provider
# 假设文件名格式为 'provider-name.json'
file_name = os.path.basename(selected_file_path)
tts_provider = os.path.splitext(file_name)[0] # 移除 .json 扩展名
config_data["tts_provider"] = tts_provider # 将 tts_provider 添加到配置数据中
print("\nLoaded Configuration: " + tts_provider)
return config_data
def _load_configuration_path(config_path: str) -> dict:
"""Loads JSON configuration from a specified path and infers tts_provider from the file name."""
config_data = _load_json_config(config_path)
# 从文件名中提取 tts_provider
file_name = os.path.basename(config_path)
tts_provider = os.path.splitext(file_name)[0] # 移除 .json 扩展名
config_data["tts_provider"] = tts_provider # 将 tts_provider 添加到配置数据中
print(f"\nLoaded Configuration: {tts_provider} from {config_path}")
return config_data
def _prepare_openai_settings(args, config_data):
"""Determines final OpenAI API key, base URL, and model based on priority."""
api_key = args.api_key or config_data.get("api_key") or os.getenv("OPENAI_API_KEY")
base_url = args.base_url or config_data.get("base_url") or os.getenv("OPENAI_BASE_URL")
model = args.model or config_data.get("model") # Allow model to be None if not provided anywhere
if not model:
model = "gpt-3.5-turbo"
print(f"Using default model: {model} as it was not specified via command-line, config, or environment variables.")
if not api_key:
raise ValueError("Error: OpenAI API key is not set. Please provide it via --api-key, in your config file, or as an environment variable (OPENAI_API_KEY).")
return api_key, base_url, model
def _read_prompt_files():
"""Reads content from input, overview, and podcast script prompt files."""
input_prompt = read_file_content('input.txt')
overview_prompt = read_file_content('prompt/prompt-overview.txt')
original_podscript_prompt = read_file_content('prompt/prompt-podscript.txt')
return input_prompt, overview_prompt, original_podscript_prompt
def _extract_custom_content(input_prompt_content):
"""Extracts custom content from the input prompt."""
custom_content = ""
custom_begin_tag = '```custom-begin'
custom_end_tag = '```custom-end'
start_index = input_prompt_content.find(custom_begin_tag)
if start_index != -1:
end_index = input_prompt_content.find(custom_end_tag, start_index + len(custom_begin_tag))
if end_index != -1:
custom_content = input_prompt_content[start_index + len(custom_begin_tag):end_index].strip()
input_prompt_content = input_prompt_content[end_index + len(custom_end_tag):].strip()
return custom_content, input_prompt_content
def _prepare_podcast_prompts(config_data, original_podscript_prompt, custom_content, usetime: Optional[str] = None, output_language: Optional[str] = None):
"""Prepares the podcast script prompts with speaker info and placeholders."""
pod_users = config_data.get("podUsers", [])
voices = config_data.get("voices", [])
turn_pattern = config_data.get("turnPattern", "random")
original_podscript_prompt = original_podscript_prompt.replace("{{numSpeakers}}", str(len(pod_users)))
original_podscript_prompt = original_podscript_prompt.replace("{{turnPattern}}", turn_pattern)
original_podscript_prompt = original_podscript_prompt.replace("{{usetime}}", usetime if usetime is not None else "5-6 minutes")
original_podscript_prompt = original_podscript_prompt.replace("{{outlang}}", output_language if output_language is not None else "Make sure the input language is set as the output language")
speaker_id_info = generate_speaker_id_text(pod_users, voices)
podscript_prompt = speaker_id_info + "\n\n" + custom_content + "\n\n" + original_podscript_prompt
return podscript_prompt, pod_users, voices, turn_pattern # Return voices for potential future use or consistency
def _generate_overview_content(api_key, base_url, model, overview_prompt, input_prompt, output_language: Optional[str] = None) -> Tuple[str, str, str]:
"""Generates overview content using OpenAI CLI, and extracts title and tags."""
print(f"\nGenerating overview with OpenAI CLI (Output Language: {output_language})...")
try:
# Replace the placeholder with the actual output language
formatted_overview_prompt = overview_prompt.replace("{{outlang}}", output_language if output_language is not None else "Make sure the input language is set as the output language")
openai_client_overview = OpenAICli(api_key=api_key, base_url=base_url, model=model, system_message=formatted_overview_prompt)
overview_response_generator = openai_client_overview.chat_completion(messages=[{"role": "user", "content": input_prompt}])
overview_content = "".join([chunk.choices[0].delta.content for chunk in overview_response_generator if chunk.choices and chunk.choices[0].delta.content])
print("Generated Overview:")
print(overview_content[:100])
# Extract title (first line) and tags (second line)
lines = overview_content.strip().split('\n')
title = lines[0].strip() if len(lines) > 0 else ""
tags = lines[1].strip() if len(lines) > 1 else ""
print(f"Extracted Title: {title}")
print(f"Extracted Tags: {tags}")
return overview_content, title, tags
except Exception as e:
raise RuntimeError(f"Error generating overview: {e}")
def _generate_podcast_script(api_key, base_url, model, podscript_prompt, overview_content):
"""Generates and parses podcast script JSON using OpenAI CLI."""
print("\nGenerating podcast script with OpenAI CLI...")
# Initialize podscript_json_str outside try block to ensure it's always defined
podscript_json_str = ""
try:
openai_client_podscript = OpenAICli(api_key=api_key, base_url=base_url, model=model, system_message=podscript_prompt)
# Generate the response string first
podscript_json_str = "".join([chunk.choices[0].delta.content for chunk in openai_client_podscript.chat_completion(messages=[{"role": "user", "content": overview_content}]) if chunk.choices and chunk.choices[0].delta.content])
podcast_script = None
decoder = json.JSONDecoder()
idx = 0
valid_json_str = ""
while idx < len(podscript_json_str):
try:
obj, end = decoder.raw_decode(podscript_json_str[idx:])
if isinstance(obj, dict) and "podcast_transcripts" in obj:
podcast_script = obj
valid_json_str = podscript_json_str[idx : idx + end]
break
idx += end
except json.JSONDecodeError:
idx += 1
next_brace = podscript_json_str.find('{', idx)
if next_brace != -1:
idx = next_brace
else:
break
if podcast_script is None:
raise ValueError(f"Error: Could not find a valid podcast script JSON object with 'podcast_transcripts' key in response. Raw response: {podscript_json_str}")
print("\nGenerated Podcast Script Length:"+ str(len(podcast_script.get("podcast_transcripts") or [])))
print(valid_json_str[:100] + "...")
if not podcast_script.get("podcast_transcripts"):
raise ValueError("Error: 'podcast_transcripts' array is empty or not found in the generated script. Nothing to convert to audio.")
return podcast_script
except json.JSONDecodeError as e:
raise ValueError(f"Error decoding JSON from podcast script response: {e}. Raw response: {podscript_json_str}")
except Exception as e:
raise RuntimeError(f"Error generating podcast script: {e}")
def generate_audio_for_item(item, config_data, tts_adapter: TTSAdapter, max_retries: int = 3):
"""Generate audio for a single podcast transcript item using the provided TTS adapter."""
speaker_id = item.get("speaker_id")
dialog = item.get("dialog")
voice_code = None
volume_adjustment = 0.0 # 默认值为 0.0
speed_adjustment = 0.0 # 默认值为 0.0
if config_data and "podUsers" in config_data and 0 <= speaker_id < len(config_data["podUsers"]):
pod_user_entry = config_data["podUsers"][speaker_id]
voice_code = pod_user_entry.get("code")
# 从 voices 列表中获取对应的 volume_adjustment
voice_map = {voice.get("code"): voice for voice in config_data.get("voices", []) if voice.get("code")}
volume_adjustment = voice_map.get(voice_code, {}).get("volume_adjustment", 0.0)
speed_adjustment = voice_map.get(voice_code, {}).get("speed_adjustment", 0.0)
if not voice_code:
raise ValueError(f"No voice code found for speaker_id {speaker_id}. Cannot generate audio for this dialog.")
# print(f"dialog-before: {dialog}")
dialog = re.sub(r'[^\w\s\-,.。?!\u4e00-\u9fa5]', '', dialog)
print(f"dialog: {dialog}")
for attempt in range(max_retries):
try:
print(f"Calling TTS API for speaker {speaker_id} ({voice_code}) (Attempt {attempt + 1}/{max_retries})...")
temp_audio_file = tts_adapter.generate_audio(
text=dialog,
voice_code=voice_code,
output_dir=output_dir,
volume_adjustment=volume_adjustment, # 传递音量调整参数
speed_adjustment=speed_adjustment # 传递速度调整参数
)
return temp_audio_file
except RuntimeError as e: # Catch specific RuntimeError from TTS adapters
print(f"Error generating audio for speaker {speaker_id} ({voice_code}) on attempt {attempt + 1}: {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
raise RuntimeError(f"Max retries ({max_retries}) reached for speaker {speaker_id} ({voice_code}). Audio generation failed.")
except Exception as e: # Catch other unexpected errors
raise RuntimeError(f"An unexpected error occurred for speaker {speaker_id} ({voice_code}) on attempt {attempt + 1}: {e}")
def _generate_all_audio_files(podcast_script, config_data, tts_adapter: TTSAdapter, threads):
"""Orchestrates the generation of individual audio files."""
os.makedirs(output_dir, exist_ok=True)
print("\nGenerating audio files...")
# test script
# podcast_script = json.loads("{\"podcast_transcripts\":[{\"speaker_id\":0,\"dialog\":\"欢迎收听来生小酒馆客官不进来喝点吗今天咱们来唠唠AI。 小希,你有什么新鲜事来分享吗?\"},{\"speaker_id\":1,\"dialog\":\"当然了, AI 编程工具 Cursor 给开发者送上了一份大礼,付费用户现在可以限时免费体验 GPT 5 的强大编码能力\"}]}")
transcripts = podcast_script.get("podcast_transcripts", [])
max_retries = config_data.get("tts_max_retries", 3) # 从配置中获取最大重试次数默认3次
from concurrent.futures import ThreadPoolExecutor, as_completed
audio_files_dict = {}
with ThreadPoolExecutor(max_workers=threads) as executor:
future_to_index = {
executor.submit(generate_audio_for_item, item, config_data, tts_adapter, max_retries): i
for i, item in enumerate(transcripts)
}
exception_caught = None
for future in as_completed(future_to_index):
index = future_to_index[future]
try:
result = future.result()
if result:
audio_files_dict[index] = result
except Exception as e:
exception_caught = RuntimeError(f"Error generating audio for item {index}: {e}")
# An error occurred, we should stop.
break
# If we broke out of the loop due to an exception, cancel other futures.
if exception_caught:
print(f"An error occurred: {exception_caught}. Cancelling outstanding tasks.")
for f in future_to_index:
if not f.done():
f.cancel()
raise exception_caught
audio_files = [audio_files_dict[i] for i in sorted(audio_files_dict.keys())]
print(f"\nFinished generating individual audio files. Total files: {len(audio_files)}")
return audio_files
def _create_ffmpeg_file_list(audio_files):
"""Creates the file list for FFmpeg concatenation."""
if not audio_files:
raise ValueError("No audio files were generated to merge.")
print(f"Creating file list for ffmpeg at: {file_list_path}")
with open(file_list_path, 'w', encoding='utf-8') as f:
for audio_file in audio_files:
f.write(f"file '{os.path.basename(audio_file)}'\n")
print("Content of file_list.txt:")
with open(file_list_path, 'r', encoding='utf-8') as f:
print(f.read())
from typing import cast # Add import for cast
def _initialize_tts_adapter(config_data: dict, tts_providers_config_content: Optional[str] = None) -> TTSAdapter:
"""
根据配置数据初始化并返回相应的 TTS 适配器。
"""
tts_provider = config_data.get("tts_provider")
if not tts_provider:
raise ValueError("TTS provider is not specified in the configuration.")
tts_providers_config = {}
try:
if tts_providers_config_content:
tts_providers_config = json.loads(tts_providers_config_content)
else:
tts_providers_config_content = read_file_content(tts_providers_config_path)
tts_providers_config = json.loads(tts_providers_config_content)
except Exception as e:
print(f"Warning: Could not load tts_providers.json: {e}")
# 获取当前 tts_provider 的额外参数
current_tts_extra_params = tts_providers_config.get(tts_provider.split('-')[0], {}) # 例如 'doubao-tts' -> 'doubao'
if tts_provider == "index-tts":
api_url = config_data.get("apiUrl")
if not api_url:
raise ValueError("IndexTTS apiUrl is not configured.")
return IndexTTSAdapter(api_url_template=cast(str, api_url), tts_extra_params=cast(dict, current_tts_extra_params))
elif tts_provider == "edge-tts":
api_url = config_data.get("apiUrl")
if not api_url:
raise ValueError("EdgeTTS apiUrl is not configured.")
return EdgeTTSAdapter(api_url_template=cast(str, api_url), tts_extra_params=cast(dict, current_tts_extra_params))
elif tts_provider == "fish-audio":
api_url = config_data.get("apiUrl")
headers = config_data.get("headers")
request_payload = config_data.get("request_payload")
if not all([api_url, headers, request_payload]):
raise ValueError("FishAudio requires apiUrl, headers, and request_payload configuration.")
return FishAudioAdapter(api_url=cast(str, api_url), headers=cast(dict, headers), request_payload_template=cast(dict, request_payload), tts_extra_params=cast(dict, current_tts_extra_params))
elif tts_provider == "minimax":
api_url = config_data.get("apiUrl")
headers = config_data.get("headers")
request_payload = config_data.get("request_payload")
if not all([api_url, headers, request_payload]):
raise ValueError("Minimax requires apiUrl, headers, and request_payload configuration.")
return MinimaxAdapter(api_url=cast(str, api_url), headers=cast(dict, headers), request_payload_template=cast(dict, request_payload), tts_extra_params=cast(dict, current_tts_extra_params))
elif tts_provider == "doubao-tts":
api_url = config_data.get("apiUrl")
headers = config_data.get("headers")
request_payload = config_data.get("request_payload")
if not all([api_url, headers, request_payload]):
raise ValueError("DoubaoTTS requires apiUrl, headers, and request_payload configuration.")
return DoubaoTTSAdapter(api_url=cast(str, api_url), headers=cast(dict, headers), request_payload_template=cast(dict, request_payload), tts_extra_params=cast(dict, current_tts_extra_params))
elif tts_provider == "gemini-tts":
api_url = config_data.get("apiUrl")
headers = config_data.get("headers")
request_payload = config_data.get("request_payload")
if not all([api_url, headers, request_payload]):
raise ValueError("GeminiTTS requires apiUrl, headers, and request_payload configuration.")
return GeminiTTSAdapter(api_url=cast(str, api_url), headers=cast(dict, headers), request_payload_template=cast(dict, request_payload), tts_extra_params=cast(dict, current_tts_extra_params))
else:
raise ValueError(f"Unsupported TTS provider: {tts_provider}")
def generate_podcast_audio():
args = _parse_arguments()
config_data = _load_configuration()
api_key, base_url, model = _prepare_openai_settings(args, config_data)
input_prompt_content, overview_prompt, original_podscript_prompt = _read_prompt_files()
custom_content, input_prompt = _extract_custom_content(input_prompt_content)
podscript_prompt, pod_users, voices, turn_pattern = _prepare_podcast_prompts(config_data, original_podscript_prompt, custom_content, args.usetime, args.output_language)
print(f"\nInput Prompt (input.txt):\n{input_prompt[:100]}...")
print(f"\nOverview Prompt (prompt-overview.txt):\n{overview_prompt[:100]}...")
print(f"\nPodscript Prompt (prompt-podscript.txt):\n{podscript_prompt[:1000]}...")
overview_content, title, tags = _generate_overview_content(api_key, base_url, model, overview_prompt, input_prompt, args.output_language)
podcast_script = _generate_podcast_script(api_key, base_url, model, podscript_prompt, overview_content)
tts_adapter = _initialize_tts_adapter(config_data) # 初始化 TTS 适配器
audio_files = _generate_all_audio_files(podcast_script, config_data, tts_adapter, args.threads)
_create_ffmpeg_file_list(audio_files)
output_audio_filepath = merge_audio_files()
return {
"output_audio_filepath": output_audio_filepath,
"overview_content": overview_content,
"podcast_script": podcast_script,
"podUsers": pod_users,
}
def generate_podcast_audio_api(args, config_path: str, input_txt_content: str, tts_providers_config_content: str, podUsers_json_content: str) -> dict:
"""
Generates a podcast audio file based on the provided parameters.
Args:
api_key (str): OpenAI API key.
base_url (str): OpenAI API base URL.
model (str): OpenAI model to use.
threads (int): Number of threads for audio generation.
config_path (str): Path to the configuration JSON file.
input_txt_content (str): Content of the input prompt.
output_language (str): Language for the podcast overview and script (default: Chinese).
Returns:
str: The path to the generated audio file.
"""
print("Starting podcast audio generation...")
config_data = _load_configuration_path(config_path)
podUsers = json.loads(podUsers_json_content)
config_data["podUsers"] = podUsers
final_api_key, final_base_url, final_model = _prepare_openai_settings(args, config_data)
input_prompt, overview_prompt, original_podscript_prompt = _read_prompt_files()
custom_content, input_prompt = _extract_custom_content(input_txt_content)
# Assuming `output_language` is passed directly to the function
podscript_prompt, pod_users, voices, turn_pattern = _prepare_podcast_prompts(config_data, original_podscript_prompt, custom_content, args.usetime, args.output_language)
print(f"\nInput Prompt (from provided content):\n{input_prompt[:100]}...")
print(f"\nOverview Prompt (prompt-overview.txt):\n{overview_prompt[:100]}...")
print(f"\nPodscript Prompt (prompt-podscript.txt):\n{podscript_prompt[:1000]}...")
overview_content, title, tags = _generate_overview_content(final_api_key, final_base_url, final_model, overview_prompt, input_prompt, args.output_language)
podcast_script = _generate_podcast_script(final_api_key, final_base_url, final_model, podscript_prompt, overview_content)
tts_adapter = _initialize_tts_adapter(config_data, tts_providers_config_content) # 初始化 TTS 适配器
audio_files = _generate_all_audio_files(podcast_script, config_data, tts_adapter, args.threads)
_create_ffmpeg_file_list(audio_files)
output_audio_filepath = merge_audio_files()
audio_duration_seconds = get_audio_duration(os.path.join(output_dir, output_audio_filepath))
formatted_duration = "00:00"
if audio_duration_seconds is not None:
minutes = int(audio_duration_seconds // 60)
seconds = int(audio_duration_seconds % 60)
formatted_duration = f"{minutes:02}:{seconds:02}"
task_results = {
"output_audio_filepath": output_audio_filepath,
"overview_content": overview_content,
"podcast_script": podcast_script,
"podUsers": podUsers,
"audio_duration": formatted_duration,
"title": title,
"tags": tags,
}
return task_results
if __name__ == "__main__":
start_time = time.time()
try:
generate_podcast_audio()
except Exception as e:
print(f"\nError: An unexpected error occurred during podcast generation: {e}", file=sys.stderr)
sys.exit(1)
finally:
end_time = time.time()
execution_time = end_time - start_time
print(f"\nTotal execution time: {execution_time:.2f} seconds")