2025年Datawhale AI夏令营AIGC语音方向学习笔记 & 2025全球AI攻防挑战赛-赛道三比赛方案

赛事链接:

https://tianchi.aliyun.com/competition/entrance/532393/introduction

本次更新是笔者的成绩在82.16。由于比赛仍在进行中,这里公开一部分基础方案,完整比赛方案将于比赛结束后更新。

F5-TTS方案 (baseline)

安装环境

1
2
pip install f5-tts
export HF_ENDPOINT=https://hf-mirror.com

测试一下是否安装成功

1
f5-tts_infer-cli --model F5TTS_v1_Base --ref_audio "./aigc_speech_generation_tasks/reference_1.wav" --gen_text "六"

运行生成脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import shutil
import os, sys
import subprocess
import pandas as pd

os.makedirs("result/", exist_ok=True)
task = pd.read_csv("aigc_speech_generation_tasks/aigc_speech_generation_tasks.csv")

for row in task.iterrows():
if row[1].utt < 10:
subprocess.check_output(
f'f5-tts_infer-cli --model F5TTS_v1_Base --ref_audio "./aigc_speech_generation_tasks/{row[1].reference_speech}" --gen_text "{row[1].text}"',
shell=True
)
shutil.copy("tests/infer_cli_basic.wav", "result/" + str(row[1].utt) + ".wav")

task['synthesized_speech'] = [str(i) + ".wav" for i in range(1, 201)]

task.to_csv("result/result.csv", index=None)

输出的result文件夹即为比赛所需提交的文件,压缩后提交即可完成baseline。

提交结果check脚本

考虑到上传一个不合法的zip会浪费一次宝贵的机会,并且可能需要等很久才能意识到结果不合规。所以在提交之前有必要自己check一下合法性。

于是我写了一个check脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import sys
import zipfile
import csv
import io
import re

def check_submission(zip_path):
if not os.path.exists(zip_path):
print(f"❌ 文件不存在: {zip_path}")
return False

# 检查 zip 文件名
base_name = os.path.basename(zip_path)
if not re.match(r"^(.*-)?result\.zip$", base_name):
print(f"⚠️ 警告: 文件名应为 'result.zip' 或 '<队伍名>-result.zip'")

with zipfile.ZipFile(zip_path, 'r') as zf:
file_list = zf.namelist()

# 查找 CSV 文件
csv_files = [f for f in file_list if f.lower().endswith("-result.csv") or f.lower().endswith("/result.csv") or f.lower() == "result.csv"]
if len(csv_files) != 1:
print("❌ ZIP 包中必须且仅有一个 result.csv 或 <队伍名>-result.csv 文件")
return False
csv_file = csv_files[0]

# 识别CSV所在目录前缀,比如 "result/result.csv" --> "result/"
csv_dir = os.path.dirname(csv_file)
if csv_dir and not csv_dir.endswith("/"):
csv_dir += "/"

# 检查 CSV 编码及表头
try:
with zf.open(csv_file, 'r') as f:
content = f.read().decode("utf-8")
except UnicodeDecodeError:
print(f"❌ {csv_file} 不是 UTF-8 编码")
return False

reader = csv.reader(io.StringIO(content))
try:
header = next(reader)
except StopIteration:
print(f"❌ {csv_file} 文件为空")
return False

required_header = {"utt", "reference_speech", "text", "synthesized_speech"}
if set(header) != required_header:
print(f"❌ CSV 表头必须是 {required_header},当前为: {header}")
return False

idx_synth = header.index("synthesized_speech")

# 检查 synthesized_speech 对应的文件存在性
all_exist = True
for i, row in enumerate(reader, start=2):
if len(row) != len(header):
print(f"❌ 第 {i} 行列数不对,应为 {len(header)} 列")
return False
synth_file = row[idx_synth].strip()
# 拼接目录前缀
synth_path_in_zip = csv_dir + synth_file if csv_dir else synth_file
if synth_path_in_zip not in file_list:
print(f"❌ 第 {i} 行: 合成语音文件 {synth_file} (zip路径: {synth_path_in_zip}) 不在 ZIP 包中")
all_exist = False
elif not synth_path_in_zip.lower().endswith(".wav"):
print(f"❌ 第 {i} 行: 合成语音文件 {synth_file} 不是 WAV 格式")
all_exist = False

if not all_exist:
return False

print("✅ 格式检查通过")
return True


if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"用法: python {sys.argv[0]} <提交的zip文件>")
sys.exit(1)
check_submission(sys.argv[1])

baseline最终的结果呢大约是33分左右,实际听了一下生成的音频发现效果确实不咋地,其中还掺杂了不少无效音频,这个分倒是也不冤。

202508121826108.png (386×121)

Cosyvoice方案

Cosyvoice是阿里巴巴通义实验室开源的语音生成大模型,这里我选择使用的是CosyVoice2-0.5B模型。

安装环境

首先克隆仓库

1
2
3
4
git clone --recursive https://github.com/FunAudioLLM/CosyVoice.git
# 若是submodule下载失败,要先进入CosyVoice目录再执行以下命令,可以多次执行直到submodule安装成功,会有Successfully提示。
cd CosyVoice
git submodule update --init --recursive

创建虚拟环境

如果使用的是魔搭社区的notebook 初始应该是未安装conda的,可以通过以下命令进行安装

1
2
3
4
5
6
7
8
9
10
11
mkdir -p /mnt/workspace/miniconda3

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O /mnt/workspace/miniconda3/miniconda.sh

bash /mnt/workspace/miniconda3/miniconda.sh -b -u -p /mnt/workspace/miniconda3

rm /mnt/workspace/miniconda3/miniconda.sh

source /mnt/workspace/miniconda3/bin/activate

conda init --all

安装完conda后,接下来开始创建conda虚拟环境

1
2
3
4
conda create -n cosyvoice python=3.10
conda activate cosyvoice
conda install -y -c conda-forge pynini==2.1.5
pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/ --trusted-host=mirrors.aliyun.com

运行生成脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
from pathlib import Path
import shutil
import pandas as pd
import torchaudio
import torch
import sys

# 确保 third_party path 已经加入 sys.path
sys.path.append('third_party/Matcha-TTS')

from cosyvoice.cli.cosyvoice import CosyVoice2

# 配置路径
TASK_CSV = "data/aigc_speech_generation_tasks.csv"
REFERENCE_DIR = "data"
FALLBACK_WAV = "tests/infer_cli_basic.wav"
OUT_DIR = Path("result")
OUT_DIR.mkdir(parents=True, exist_ok=True)
OUT_CSV = OUT_DIR / "result.csv"

# 初始化 CosyVoice(只做一次)
cosyvoice = CosyVoice2('iic/CosyVoice2-0.5B', load_jit=False, load_trt=False, fp16=False)

# 读取任务表
task = pd.read_csv(TASK_CSV)

synthesized_files = []

for idx, row in task.iterrows():
utt = int(row['utt']) if 'utt' in row.index else idx + 1
text = str(row['text'])
ref_fname = str(row['reference_speech']) if 'reference_speech' in row.index else None

# 统一命名:synthesized_speech_1.wav, synthesized_speech_2.wav ...
out_fname = f"synthesized_speech_{utt}.wav"
out_path = OUT_DIR / out_fname

try:
# 加载并重采样参考音频
prompt_audio = None
if ref_fname:
ref_path = Path(REFERENCE_DIR) / ref_fname
if ref_path.exists():
wav, sr = torchaudio.load(str(ref_path))
target_sr = cosyvoice.sample_rate # 模型需要的采样率
if sr != target_sr:
wav = torchaudio.transforms.Resample(sr, target_sr)(wav)
prompt_audio = wav
else:
print(f"[Warning] reference not found for utt={utt}: {ref_path}")

# 生成语音
gen_iter = cosyvoice.inference_cross_lingual(text, prompt_audio, stream=False)
generated = None
for item in gen_iter:
generated = item.get('tts_speech', None)
if generated is not None:
break

if generated is None:
raise RuntimeError("CosyVoice did not return any tts_speech.")

if isinstance(generated, torch.Tensor):
gen = generated.detach().cpu()
if gen.dim() == 1:
gen = gen.unsqueeze(0)
else:
import numpy as np
gen = torch.from_numpy(np.asarray(generated))
if gen.dim() == 1:
gen = gen.unsqueeze(0)

torchaudio.save(str(out_path), gen, cosyvoice.sample_rate)
synthesized_files.append(out_fname)
print(f"[OK] synthesized utt={utt} -> {out_path}")

except Exception as e:
print(f"[Error] generation failed for utt={utt}: {e}. Using fallback audio.")
if Path(FALLBACK_WAV).exists():
shutil.copy(FALLBACK_WAV, out_path)
synthesized_files.append(out_fname)
else:
print(f"[Error] fallback file {FALLBACK_WAV} not found. Skipping utt={utt}.")
synthesized_files.append("")

# 保存结果 CSV(第四列为相对路径)
task['synthesized_speech'] = synthesized_files
task.to_csv(OUT_CSV, index=False, encoding='utf-8')
print(f"✅ Done. Results written to {OUT_DIR} and {OUT_CSV}")

运行结束之后,压缩一下提交即可。

1
zip -r result.zip result/

check脚本

同样,我们可以对生成的zip先check一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import sys
import zipfile
import csv
import io
import re

def check_submission(zip_path):
if not os.path.exists(zip_path):
print(f"❌ 文件不存在: {zip_path}")
return False

# 检查 zip 文件名
base_name = os.path.basename(zip_path)
if not re.match(r"^(.*-)?result\.zip$", base_name):
print(f"⚠️ 警告: 文件名应为 'result.zip' 或 '<队伍名>-result.zip'")

with zipfile.ZipFile(zip_path, 'r') as zf:
file_list = zf.namelist()

# 查找 CSV 文件
csv_files = [f for f in file_list if f.lower().endswith("-result.csv") or f.lower().endswith("/result.csv") or f.lower() == "result.csv"]
if len(csv_files) != 1:
print("❌ ZIP 包中必须且仅有一个 result.csv 或 <队伍名>-result.csv 文件")
return False
csv_file = csv_files[0]

# 识别CSV所在目录前缀,比如 "result/result.csv" --> "result/"
csv_dir = os.path.dirname(csv_file)
if csv_dir and not csv_dir.endswith("/"):
csv_dir += "/"

# 检查 CSV 编码及表头
try:
with zf.open(csv_file, 'r') as f:
content = f.read().decode("utf-8")
except UnicodeDecodeError:
print(f"❌ {csv_file} 不是 UTF-8 编码")
return False

reader = csv.reader(io.StringIO(content))
try:
header = next(reader)
except StopIteration:
print(f"❌ {csv_file} 文件为空")
return False

required_header = {"utt", "reference_speech", "text", "synthesized_speech"}
if set(header) != required_header:
print(f"❌ CSV 表头必须是 {required_header},当前为: {header}")
return False

idx_synth = header.index("synthesized_speech")

# 检查 synthesized_speech 对应的文件存在性
all_exist = True
for i, row in enumerate(reader, start=2):
if len(row) != len(header):
print(f"❌ 第 {i} 行列数不对,应为 {len(header)} 列")
return False
synth_file = row[idx_synth].strip()
# 拼接目录前缀
synth_path_in_zip = csv_dir + synth_file if csv_dir else synth_file
if synth_path_in_zip not in file_list:
print(f"❌ 第 {i} 行: 合成语音文件 {synth_file} (zip路径: {synth_path_in_zip}) 不在 ZIP 包中")
all_exist = False
elif not synth_path_in_zip.lower().endswith(".wav"):
print(f"❌ 第 {i} 行: 合成语音文件 {synth_file} 不是 WAV 格式")
all_exist = False

if not all_exist:
return False

print("✅ 格式检查通过")
return True


if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"用法: python {sys.argv[0]} <提交的zip文件>")
sys.exit(1)
check_submission(sys.argv[1])

本方案生成的成绩如下:

优先级递减的级联调用方案(新baseline)

本方案为社区大佬慷慨分析,作为比赛的新baseline,有许多我们可以借鉴之处,这里附上原文地址

  1. 多模型级联策略
    我们的方案按以下优先级顺序尝试生成语音:
  • 第一梯队(核心模型):Coqui-AI XTTS - 实现高质量声音克隆
    • 为什么是它? XTTS是一个强大的多语言零样本语音克隆模型。它能直接利用参考音频来驱动TTS(文本到语音)合成,完美契合赛题要求。这是我们冲击“语音相似度”得分的主力。
    • 优势:音色相似度高,支持中英文混合。
    • 挑战:模型较大,对环境有一定要求,且在某些极端情况下可能生成失败。
  • 第二梯队(备用方案):Microsoft Edge TTS - 保证高质量与自然度
    • 为什么是它? 当XTTS失败时,我们不能提交一个错误的音频。Edge-TTS是微软提供的一项高质量在线TTS服务,其合成的语音在自然度和清晰度上表现都非常出色。
    • 优势:合成质量极高,非常稳定,处理长文本能力强。
    • 劣势:它不具备声音克隆能力。这是一个“保底”方案,牺牲“相似度”分数,但能稳拿“可懂度”和“自然度”的分数,避免在某个任务上得零分。
  • 第三梯队(终极保障):pyttsx3 - 离线运行,永不失败
    • 为什么是它? 在极端情况下(如网络中断导致Edge-TTS失败),我们需要一个万无一失的方案。pyttsx3是一个纯本地的TTS库。
    • 优势:完全离线,100%能生成结果。
    • 劣势:音质和自然度较差,是所有方案中的下下策。它的存在只为一个目的:确保每个任务都有对应的输出文件,保证程序的完整性。
  1. 音频处理流程
    无论是参考音频还是生成的音频,我们都设计了处理环节来提升最终效果。
  • 预处理 (preprocess_audio):对输入的参考音频进行标准化。无论原始音频是何种采样率、通道数,我们都统一转换为16kHz单声道,并进行归一化。这能确保送入模型的音频格式是一致的,避免兼容性问题。
  • 后处理 (enhance_audio_quality):对模型生成的音频进行增强。我们使用预加重(Pre-emphasis)滤波器来提升高频成分,这能有效增强语音的清晰度,降低低频噪音的干扰。

安装环境

1
2
3
4
# 建议使用torch 2.3.1版本
pip install torch==2.3.1 torchaudio
# 安装其他依赖
pip install pandas numpy librosa soundfile tqdm pydub TTS edge-tts pyttsx3

注意:如果你在中国大陆,访问Hugging Face(TTS模型库的来源)可能会很慢。可以设置环境变量来使用镜像。

1
export HF_ENDPOINT=https://hf-mirror.com

代码拆解

核心函数

音频预处理 preprocess_audio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def preprocess_audio(audio_path, target_sr=16000):
"""Preprocess audio to ensure consistent format"""
try:
# 使用librosa加载,它能更好地处理各种格式
audio, sr = librosa.load(audio_path, sr=None)

# 如果采样率不是16kHz,则重采样
if sr != target_sr:
audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)

# 如果是立体声,则转为单声道
if len(audio.shape) > 1:
audio = librosa.to_mono(audio)

# 归一化,将振幅调整到[-0.9, 0.9]之间
audio = audio / np.max(np.abs(audio)) * 0.9

return audio, target_sr
except Exception as e:
print(f"Error processing {audio_path}: {e}")
return None, None

音频质量增强 enhance_audio_quality

1
2
3
4
5
6
7
8
9
10
11
12
13
def enhance_audio_quality(audio, sr):
"""Apply basic audio enhancement techniques"""
try:
# 应用预加重滤波器,提升语音清晰度
audio_filtered = librosa.effects.preemphasis(audio, coef=0.95)

# 再次归一化,确保音量均衡
audio_normalized = librosa.util.normalize(audio_filtered)

return audio_normalized
except Exception as e:
print(f"Error enhancing audio: {e}")
return audio

模型加载逻辑

代码使用try-except结构来动态加载模型,如果某个库不存在,它会尝试自动安装。这增强了代码的可移植性。

1
2
3
4
5
6
7
8
9
10
11
# 以XTTS为例
vits_available = False
try:
from TTS.api import TTS
# 注意:XTTS v2模型较大,首次运行会下载
tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=torch.cuda.is_available())
vits_available = True
print("XTTS model loaded successfully")
except Exception as e:
print(f"XTTS model not available: {e}")
# 可以在这里加入自动安装逻辑

其他模型(edge-tts, pyttsx3)的加载逻辑与此类似。

主处理循环与级联调用

这是整个脚本的核心,它遍历所有任务并按优先级调用TTS模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
for idx, row in tqdm(task.iterrows(), total=len(task)):
# ... 获取任务信息 ...

# 预处理参考音频
ref_audio, sr = preprocess_audio(ref_audio_path)
if ref_audio is None:
# 如果参考音频处理失败,直接跳到备用方案
# ...
continue

success = False

# 1. 尝试使用XTTS
if vits_available:
try:
# ... 生成逻辑 ...
# XTTS需要参考音频路径,我们使用tempfile创建一个临时文件
tts.tts_to_file(
text=text,
file_path=output_path,
speaker_wav=temp_ref_path, # 临时参考音频文件
language="zh" if any('\u4e00' <= c <= '\u9fff' for c in text) else "en"
)
success = True
except Exception as e:
print(f"XTTS failed for task {utt_id}: {e}")

# 2. 如果XTTS失败,尝试Edge-TTS
if not success and edge_tts_available:
try:
# Edge-TTS使用asyncio,需要特别处理
import asyncio

async def edge_tts_generate():
# ...
communicate = edge_tts.Communicate(text, voice)
await communicate.save(temp_output) # 保存为mp3

asyncio.run(edge_tts_generate())

# 将mp3转换为wav
audio = AudioSegment.from_mp3(temp_output)
audio.export(output_path, format="wav")

success = True
except Exception as e:
print(f"Edge TTS failed for task {utt_id}: {e}")

# 3. 如果全部失败,使用pyttsx3作为最后保障
if not success and pyttsx3_available:
# ... pyttsx3生成逻辑 ...
success = True

# 4. 如果连pyttsx3都失败了(极罕见),生成一个静音文件
if not success:
print(f"All TTS models failed for task {utt_id}, using fallback")
# 创建一个2秒的静音音频,采样率为16kHz
silence = np.zeros(16000 * 2)
sf.write(output_path, silence, 16000)

结果打包

最后,脚本会更新pandas的DataFrame,添加生成的文件名,保存为result.csv,并调用zip命令打包整个result文件夹,生成最终的提交文件。

1
zip -r result.zip result/

完整脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import os
import sys
import subprocess
import pandas as pd
import shutil
import numpy as np
import torch
import torchaudio
from tqdm import tqdm
import librosa
import soundfile as sf
from pydub import AudioSegment
import tempfile
import time
import warnings
warnings.filterwarnings("ignore")

# 检查PyTorch版本
print(f"PyTorch版本: {torch.__version__}")

# 创建必要目录
os.makedirs("result/", exist_ok=True)

# 读取任务数据
print("正在读取任务数据...")
task = pd.read_csv("aigc_speech_generation_tasks/aigc_speech_generation_tasks.csv")

# 预处理音频的函数定义
def preprocess_audio(audio_path, target_sr=16000):
"""预处理音频以确保格式一致"""
try:
# 使用librosa加载音频处理不同格式
audio, sr = librosa.load(audio_path, sr=None)

# 重新采样
if sr != target_sr:
audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)

# 转换为单声道
if len(audio.shape) > 1:
audio = librosa.to_mono(audio)

# 归一化音频
audio = audio / np.max(np.abs(audio)) * 0.9

return audio, target_sr
except Exception as e:
print(f"处理音频{audio_path}时出错: {e}")
return None, None

# 音频质量增强函数
def enhance_audio_quality(audio, sr):
"""应用基础音频增强技术"""
try:
# 应用高通滤波器减少低频噪声
audio_filtered = librosa.effects.preemphasis(audio, coef=0.95)

# 标准化音频电平
audio_normalized = librosa.util.normalize(audio_filtered)

return audio_normalized
except Exception as e:
print(f"音频增强出错: {e}")
return audio

# 尝试加载TTS模型 - 使用VITS因其广泛可用性
print("正在加载TTS模型...")

vits_available = False
try:
from TTS.api import TTS
tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=torch.cuda.is_available())
vits_available = True
print("XTTS模型加载成功")
except Exception as e:
print(f"XTTS模型不可用: {e}")
print("正在尝试安装TTS...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "TTS"])
from TTS.api import TTS
tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2", gpu=torch.cuda.is_available())
vits_available = True
print("安装后XTTS模型加载成功")
except Exception as e:
print(f"安装和加载XTTS失败: {e}")

# 尝试加载edge-tts作为备用方案(Microsoft Edge TTS)
edge_tts_available = False
try:
import edge_tts
edge_tts_available = True
print("Edge TTS可用")
except ImportError:
print("Edge TTS不可用,正在尝试安装...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "edge-tts"])
import edge_tts
edge_tts_available = True
print("Edge TTS安装成功")
except Exception as e:
print(f"安装Edge TTS失败: {e}")

# 尝试加载pyttsx3作为最后方案
pyttsx3_available = False
try:
import pyttsx3
pyttsx3_available = True
print("pyttsx3可用")
except ImportError:
print("pyttsx3不可用,正在尝试安装...")
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "pyttsx3"])
import pyttsx3
pyttsx3_available = True
print("pyttsx3安装成功")
except Exception as e:
print(f"安装pyttsx3失败: {e}")

# 处理每个任务
print(f"正在处理{len(task)}个任务...")
for idx, row in tqdm(task.iterrows(), total=len(task)):
utt_id = row['utt']
ref_audio_path = os.path.join("aigc_speech_generation_tasks", row['reference_speech'])
text = row['text']
output_path = os.path.join("result", f"{utt_id}.wav")

# 处理参考音频
ref_audio, sr = preprocess_audio(ref_audio_path)
if ref_audio is None:
print(f"警告:无法处理任务{utt_id}的参考音频,使用备用方案")
if pyttsx3_available:
try:
engine = pyttsx3.init()
engine.save_to_file(text, output_path)
engine.runAndWait()
continue
except:
pass

if os.path.exists("tests/infer_cli_basic.wav"):
shutil.copy("tests/infer_cli_basic.wav", output_path)
continue

# 尝试不同TTS模型按优先级顺序
success = False

# 优先尝试XTTS
if vits_available:
try:
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_ref:
temp_ref_path = temp_ref.name
sf.write(temp_ref_path, ref_audio, sr)

# 生成语音
tts.tts_to_file(
text=text,
file_path=output_path,
speaker_wav=temp_ref_path,
language="zh" if any('\u4e00' <= c <= '\u9fff' for c in text) else "en"
)

os.unlink(temp_ref_path)

if os.path.exists(output_path):
gen_audio, gen_sr = preprocess_audio(output_path)
if gen_audio is not None:
enhanced_audio = enhance_audio_quality(gen_audio, gen_sr)
sf.write(output_path, enhanced_audio, gen_sr)
success = True
except Exception as e:
print(f"任务{utt_id}的XTTS生成失败: {e}")

# Edge TTS作为第二选择
if not success and edge_tts_available:
try:
temp_output = f"temp_edge_{utt_id}.mp3"

# 使用asyncio运行Edge TTS
import asyncio

async def edge_tts_generate():
voice = "zh-CN-XiaoxiaoNeural" # 默认中文发音
if not any('\u4e00' <= c <= '\u9fff' for c in text): # 非中文
voice = "en-US-AriaNeural" # 英文发音

communicate = edge_tts.Communicate(text, voice)
await communicate.save(temp_output)

asyncio.run(edge_tts_generate())

if os.path.exists(temp_output):
audio = AudioSegment.from_mp3(temp_output)
audio.export(output_path, format="wav")
os.remove(temp_output)

gen_audio, gen_sr = preprocess_audio(output_path)
if gen_audio is not None:
enhanced_audio = enhance_audio_quality(gen_audio, gen_sr)
sf.write(output_path, enhanced_audio, gen_sr)
success = True
except Exception as e:
print(f"任务{utt_id}的Edge TTS生成失败: {e}")

# 最后使用pyttsx3
if not success and pyttsx3_available:
try:
engine = pyttsx3.init()
engine.save_to_file(text, output_path)
engine.runAndWait()
success = os.path.exists(output_path)
except Exception as e:
print(f"任务{utt_id}的pyttsx3生成失败: {e}")

# 全部失败时使用备用方案
if not success:
print(f"任务{utt_id}所有TTS模型均失败,使用备用方案")
if os.path.exists("tests/infer_cli_basic.wav"):
shutil.copy("tests/infer_cli_basic.wav", output_path)
else:
silence = np.zeros(sr * 2) # 创建2秒静音
sf.write(output_path, silence, sr)

time.sleep(0.1)

# 更新任务数据表中的合成语音文件名
task['synthesized_speech'] = [f"{i}.wav" for i in task['utt']]

# 保存结果CSV文件
task.to_csv("result/result.csv", index=None, encoding="utf-8")

# 打包结果文件夹
print("正在创建提交的压缩包...")
subprocess.run("zip -r result.zip result/", shell=True)

print("完成!提交文件已生成:result.zip")