27 Commits

Author SHA1 Message Date
刘鑫
dc6b6d1d1c Fx: capture compile error on Windows 2025-09-18 19:23:13 +08:00
刘鑫
cef6aefb3d remove \n from input text 2025-09-18 14:57:45 +08:00
周逸轩
1a46c5d1ad update README 2025-09-18 14:53:37 +08:00
周逸轩
5257ec3dc5 FX: noise point 2025-09-18 14:50:01 +08:00
刘鑫
bdd516b579 remove target text anotation 2025-09-18 13:07:43 +08:00
刘鑫
11568f0776 remove target text anotation 2025-09-18 12:58:27 +08:00
刘鑫
e5bcb735f0 Remove segment text logic 2025-09-18 12:02:37 +08:00
周逸轩
1fa9e2ca02 update README 2025-09-18 01:21:45 +08:00
周逸轩
10f48ba330 update README 2025-09-17 19:36:32 +08:00
周逸轩
639b2272ab update README 2025-09-17 19:34:08 +08:00
周逸轩
7e8f754ba1 update README 2025-09-17 19:33:37 +08:00
刘鑫
032c7fe403 capture torch compile error 2025-09-17 18:09:09 +08:00
刘鑫
5390a47862 Merge branch 'dev'; Replace the text normalization library 2025-09-16 22:17:30 +08:00
刘鑫
e7012f1a94 Replace the text normalization library 2025-09-16 22:17:14 +08:00
刘鑫
82332cfc99 Replace the text normalization library 2025-09-16 22:17:14 +08:00
刘鑫
605ac2d8e4 Replace the text normalization library 2025-09-16 22:16:40 +08:00
周逸轩
0fa8d894d1 update README 2025-09-16 21:33:57 +08:00
周逸轩
776c0d19fb FX: typo 2025-09-16 19:40:27 +08:00
周逸轩
ed6e6b4dac FX: typo 2025-09-16 19:37:55 +08:00
周逸轩
e3108d4a12 FX: typo 2025-09-16 19:36:17 +08:00
周逸轩
59fe3f30a1 update README 2025-09-16 19:05:00 +08:00
周逸轩
6f2fb45756 ModelScope 2025-09-16 17:12:52 +08:00
周逸轩
91128d823d ModelScope 2025-09-16 17:12:52 +08:00
刘鑫
436e8cd6e5 set default repo id 2025-09-16 16:52:42 +08:00
刘鑫
11574ae93d surport load model from local path 2025-09-16 16:46:44 +08:00
zengguoyang
706403187e update requirements for zipenhancer 2025-09-16 16:15:10 +08:00
zengguoyang
38a76704ee update requirements for zipenhancer 2025-09-16 16:06:03 +08:00
8 changed files with 180 additions and 145 deletions

View File

@@ -1,13 +1,20 @@
## 🎙️ VoxCPM: Tokenizer-Free TTS for Context-Aware Speech Generation and True-to-Life Voice Cloning
[![Project Page](https://img.shields.io/badge/Project%20Page-GitHub-blue)](https://github.com/OpenBMB/VoxCPM/) [![Hugging Face](https://img.shields.io/badge/%F0%9F%A4%97%20Hugging%20Face-OpenBMB-yellow)](https://huggingface.co/openbmb/VoxCPM-0.5B) [![Live Playground](https://img.shields.io/badge/Live%20PlayGround-Demo-orange)](https://huggingface.co/spaces/OpenBMB/VoxCPM-Demo) [![Samples](https://img.shields.io/badge/Page-Samples-red)](https://thuhcsi.github.io/VoxCPM/)
[![Project Page](https://img.shields.io/badge/Project%20Page-GitHub-blue)](https://github.com/OpenBMB/VoxCPM/) [![Hugging Face](https://img.shields.io/badge/%F0%9F%A4%97%20Hugging%20Face-OpenBMB-yellow)](https://huggingface.co/openbmb/VoxCPM-0.5B) [![ModelScope](https://img.shields.io/badge/ModelScope-OpenBMB-purple)](https://modelscope.cn/models/OpenBMB/VoxCPM-0.5B) [![Live Playground](https://img.shields.io/badge/Live%20PlayGround-Demo-orange)](https://huggingface.co/spaces/OpenBMB/VoxCPM-Demo) [![Samples](https://img.shields.io/badge/Page-Samples-red)](https://openbmb.github.io/VoxCPM-demopage)
<div align="center">
<img src="assets/voxcpm_logo.png" alt="VoxCPM Logo" width="40%">
</div>
<div align="center">
👋 Contact us on [WeChat](assets/wechat.png)
</div>
## News
* [2025.09.16] 🔥 🔥 🔥 We Open Source the VoxCPM-0.5B [weights](https://huggingface.co/openbmb/VoxCPM-0.5B)!
* [2025.09.16] 🎉 🎉 🎉 We Provide the [Gradio PlayGround](https://huggingface.co/spaces/OpenBMB/VoxCPM-Demo) for VoxCPM-0.5B, try it now!
@@ -32,11 +39,6 @@ Unlike mainstream approaches that convert speech to discrete tokens, VoxCPM uses
## Quick Start
### 🔧 Install from PyPI
@@ -87,10 +89,10 @@ After installation, the entry point is `voxcpm` (or use `python -m voxcpm.cli`).
```bash
# 1) Direct synthesis (single text)
voxcpm --text "Hello VoxCPM" --output out.wav
voxcpm --text "VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly expressive speech." --output out.wav
# 2) Voice cloning (reference audio + transcript)
voxcpm --text "Hello" \
voxcpm --text "VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly expressive speech." \
--prompt-audio path/to/voice.wav \
--prompt-text "reference transcript" \
--output out.wav \
@@ -238,6 +240,13 @@ VoxCPM achieves competitive results on public zero-shot TTS benchmarks:
## 📝TO-DO List
Please stay tuned for updates!
- [ ] Release the VoxCPM technical report.
- [ ] Support higher sampling rate (next version).
## 📄 License
The VoxCPM model weights and code are open-sourced under the [Apache-2.0](LICENSE) license.
@@ -258,10 +267,14 @@ This project is developed by the following institutions:
- <img src="assets/thuhcsi_logo.png" width="28px"> [THUHCSI](https://github.com/thuhcsi)
## ⭐ Star History
[![Star History Chart](https://api.star-history.com/svg?repos=OpenBMB/VoxCPM&type=Date)](https://star-history.com/#OpenBMB/VoxCPM&Date)
## 📚 Citation
The techical report is coming soon, please wait for the release 😊
If you find our model helpful, please consider citing our projects 📝 and staring us ⭐️!
```bib

11
app.py
View File

@@ -170,7 +170,7 @@ def create_demo_interface(demo: VoxCPMDemo):
# Pro Tips
with gr.Accordion("💡 Pro Tips |使用建议", open=False, elem_id="acc_tips"):
gr.Markdown(f"""
gr.Markdown("""
### Prompt Speech Enhancement参考语音降噪
- **Enable** to remove background noise for a clean, studio-like voice, with an external ZipEnhancer component.
**启用**:通过 ZipEnhancer 组件消除背景噪音,获得更好的音质。
@@ -194,10 +194,6 @@ def create_demo_interface(demo: VoxCPMDemo):
**调低**:合成速度更快。
- **Higher** for better synthesis quality.
**调高**:合成质量更佳。
### Long Text (e.g., >5 min speech)|长文本 (如 >5分钟的合成语音)
While VoxCPM can handle long texts directly, we recommend using empty lines to break very long content into paragraphs; the model will then synthesize each paragraph individually.
虽然 VoxCPM 支持直接生成长文本,但如果目标文本过长,我们建议使用换行符将内容分段;模型将对每个段落分别合成。
""")
# Main controls
@@ -206,7 +202,7 @@ def create_demo_interface(demo: VoxCPMDemo):
prompt_wav = gr.Audio(
sources=["upload", 'microphone'],
type="filepath",
label="Prompt Speech",
label="Prompt Speech (Optional, or let VoxCPM improvise)",
value="./examples/example.wav",
)
DoDenoisePromptAudio = gr.Checkbox(
@@ -244,14 +240,13 @@ def create_demo_interface(demo: VoxCPMDemo):
text = gr.Textbox(
value="VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly realistic speech.",
label="Target Text",
info="Default processing splits text on \\n into paragraphs; each is synthesized as a chunk and then concatenated into the final audio."
)
with gr.Row():
DoNormalizeText = gr.Checkbox(
value=False,
label="Text Normalization",
elem_id="chk_normalize",
info="We use WeTextPorcessing library to normalize the input text."
info="We use wetext library to normalize the input text."
)
audio_output = gr.Audio(label="Output Audio")

BIN
assets/wechat.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.5 KiB

View File

@@ -34,11 +34,14 @@ dependencies = [
"gradio",
"inflect",
"addict",
"WeTextProcessing",
"modelscope",
"wetext",
"modelscope>=1.22.0",
"datasets>=3,<4",
"huggingface-hub",
"pydantic",
"tqdm",
"simplejson",
"sortedcontainers",
"soundfile",
"funasr",
"spaces"

View File

@@ -1,13 +1,10 @@
import torch
import torchaudio
import os
import re
import tempfile
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from huggingface_hub import snapshot_download
from .model.voxcpm import VoxCPMModel
from .utils.text_normalize import TextNormalizer
class VoxCPM:
def __init__(self,
@@ -27,16 +24,16 @@ class VoxCPM:
"""
print(f"voxcpm_model_path: {voxcpm_model_path}, zipenhancer_model_path: {zipenhancer_model_path}, enable_denoiser: {enable_denoiser}")
self.tts_model = VoxCPMModel.from_local(voxcpm_model_path)
self.text_normalizer = TextNormalizer()
self.text_normalizer = None
if enable_denoiser and zipenhancer_model_path is not None:
self.denoiser = pipeline(
Tasks.acoustic_noise_suppression,
model=zipenhancer_model_path)
from .zipenhancer import ZipEnhancer
self.denoiser = ZipEnhancer(zipenhancer_model_path)
else:
self.denoiser = None
print("Warm up VoxCPMModel...")
self.tts_model.generate(
target_text="Hello, this is the first test sentence."
target_text="Hello, this is the first test sentence.",
max_len=10,
)
@classmethod
@@ -50,7 +47,7 @@ class VoxCPM:
"""Instantiate ``VoxCPM`` from a Hugging Face Hub snapshot.
Args:
hf_model_id: Explicit Hugging Face repository id (e.g. "org/repo").
hf_model_id: Explicit Hugging Face repository id (e.g. "org/repo") or local path.
load_denoiser: Whether to initialize the denoiser pipeline.
zipenhancer_model_id: Denoiser model id or path for ModelScope
acoustic noise suppression.
@@ -67,14 +64,19 @@ class VoxCPM:
``hf_model_id`` is provided.
"""
repo_id = hf_model_id
if not repo_id or repo_id.strip() == "":
raise ValueError("You must provide a valid hf_model_id")
if not repo_id:
raise ValueError("You must provide hf_model_id")
local_path = snapshot_download(
repo_id=repo_id,
cache_dir=cache_dir,
local_files_only=local_files_only,
)
# Load from local path if provided
if os.path.isdir(repo_id):
local_path = repo_id
else:
# Otherwise, try from_pretrained (Hub); exit on failure
local_path = snapshot_download(
repo_id=repo_id,
cache_dir=cache_dir,
local_files_only=local_files_only,
)
return cls(
voxcpm_model_path=local_path,
@@ -82,12 +84,6 @@ class VoxCPM:
enable_denoiser=load_denoiser,
)
def _normalize_loudness(self, wav_path: str):
audio, sr = torchaudio.load(wav_path)
loudness = torchaudio.functional.loudness(audio, sr)
normalized_audio = torchaudio.functional.gain(audio, -20-loudness)
torchaudio.save(wav_path, normalized_audio, sr)
def generate(self,
text : str,
prompt_wav_path : str = None,
@@ -125,9 +121,18 @@ class VoxCPM:
Returns:
numpy.ndarray: 1D waveform array (float32) on CPU.
"""
texts = text.split("\n")
texts = [t.strip() for t in texts if t.strip()]
final_wav = []
if not text.strip() or not isinstance(text, str):
raise ValueError("target text must be a non-empty string")
if prompt_wav_path is not None:
if not os.path.exists(prompt_wav_path):
raise FileNotFoundError(f"prompt_wav_path does not exist: {prompt_wav_path}")
if (prompt_wav_path is None) != (prompt_text is None):
raise ValueError("prompt_wav_path and prompt_text must both be provided or both be None")
text = text.replace("\n", " ")
text = re.sub(r'\s+', ' ', text)
temp_prompt_wav_path = None
try:
@@ -135,9 +140,7 @@ class VoxCPM:
if denoise and self.denoiser is not None:
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
temp_prompt_wav_path = tmp_file.name
self.denoiser(prompt_wav_path, output_path=temp_prompt_wav_path)
self._normalize_loudness(temp_prompt_wav_path)
self.denoiser.enhance(prompt_wav_path, output_path=temp_prompt_wav_path)
prompt_wav_path = temp_prompt_wav_path
fixed_prompt_cache = self.tts_model.build_prompt_cache(
prompt_wav_path=prompt_wav_path,
@@ -146,32 +149,25 @@ class VoxCPM:
else:
fixed_prompt_cache = None # will be built from the first inference
for sub_text in texts:
if sub_text.strip() == "":
continue
print("sub_text:", sub_text)
if normalize:
sub_text = self.text_normalizer.normalize(sub_text)
wav, target_text_token, generated_audio_feat = self.tts_model.generate_with_prompt_cache(
target_text=sub_text,
prompt_cache=fixed_prompt_cache,
min_len=2,
max_len=max_length,
inference_timesteps=inference_timesteps,
cfg_value=cfg_value,
retry_badcase=retry_badcase,
retry_badcase_max_times=retry_badcase_max_times,
retry_badcase_ratio_threshold=retry_badcase_ratio_threshold,
)
if fixed_prompt_cache is None:
fixed_prompt_cache = self.tts_model.merge_prompt_cache(
original_cache=None,
new_text_token=target_text_token,
new_audio_feat=generated_audio_feat
)
final_wav.append(wav)
if normalize:
if self.text_normalizer is None:
from .utils.text_normalize import TextNormalizer
self.text_normalizer = TextNormalizer()
text = self.text_normalizer.normalize(text)
return torch.cat(final_wav, dim=1).squeeze(0).cpu().numpy()
wav, target_text_token, generated_audio_feat = self.tts_model.generate_with_prompt_cache(
target_text=text,
prompt_cache=fixed_prompt_cache,
min_len=2,
max_len=max_length,
inference_timesteps=inference_timesteps,
cfg_value=cfg_value,
retry_badcase=retry_badcase,
retry_badcase_max_times=retry_badcase_max_times,
retry_badcase_ratio_threshold=retry_badcase_ratio_threshold,
)
return wav.squeeze(0).cpu().numpy()
finally:
if temp_prompt_wav_path and os.path.exists(temp_prompt_wav_path):

View File

@@ -148,12 +148,20 @@ class VoxCPMModel(nn.Module):
def optimize(self):
if self.device == "cuda":
try:
if self.device != "cuda":
raise ValueError("VoxCPMModel can only be optimized on CUDA device")
try:
import triton
except:
raise ValueError("triton is not installed")
self.base_lm.forward_step = torch.compile(self.base_lm.forward_step, mode="reduce-overhead", fullgraph=True)
self.residual_lm.forward_step = torch.compile(self.residual_lm.forward_step, mode="reduce-overhead", fullgraph=True)
self.feat_encoder_step = torch.compile(self.feat_encoder, mode="reduce-overhead", fullgraph=True)
self.feat_decoder.estimator = torch.compile(self.feat_decoder.estimator, mode="reduce-overhead", fullgraph=True)
else:
except Exception as e:
print(f"Error: {e}")
print("Warning: VoxCPMModel can not be optimized by torch.compile, using original forward_step functions")
self.base_lm.forward_step = self.base_lm.forward_step
self.residual_lm.forward_step = self.residual_lm.forward_step
self.feat_encoder_step = self.feat_encoder
@@ -276,7 +284,10 @@ class VoxCPMModel(nn.Module):
break
else:
break
return self.audio_vae.decode(latent_pred.to(torch.float32)).squeeze(1).cpu()
decode_audio = self.audio_vae.decode(latent_pred.to(torch.float32)).squeeze(1).cpu()
decode_audio = decode_audio[..., 640:-640] # trick: trim the start and end of the audio
return decode_audio
@torch.inference_mode()
def build_prompt_cache(
@@ -314,7 +325,7 @@ class VoxCPMModel(nn.Module):
audio = torch.nn.functional.pad(audio, (0, patch_len - audio.size(1) % patch_len))
# extract audio features
audio_feat = self.audio_vae.encode(audio.cuda(), self.sample_rate).cpu()
audio_feat = self.audio_vae.encode(audio.to(self.device), self.sample_rate).cpu()
audio_feat = audio_feat.view(
self.audio_vae.latent_dim,
@@ -460,6 +471,7 @@ class VoxCPMModel(nn.Module):
else:
break
decode_audio = self.audio_vae.decode(latent_pred.to(torch.float32)).squeeze(1).cpu()
decode_audio = decode_audio[..., 640:-640] # trick: trim the start and end of the audio
return (
decode_audio,
@@ -572,7 +584,6 @@ class VoxCPMModel(nn.Module):
pred_feat_seq = torch.cat(pred_feat_seq, dim=1) # b, t, p, d
feat_pred = rearrange(pred_feat_seq, "b t p d -> b d (t p)", b=B, p=self.patch_size)
feat_pred = feat_pred[..., 1:-1] # trick: remove the first and last token
return feat_pred, pred_feat_seq.squeeze(0).cpu()
@classmethod

View File

@@ -3,40 +3,7 @@ import re
import regex
import inflect
from functools import partial
from tn.chinese.normalizer import Normalizer as ZhNormalizer
from tn.english.normalizer import Normalizer as EnNormalizer
def normal_cut_sentence(text):
# 先处理括号内的逗号,将其替换为特殊标记
text = re.sub(r'([(][^)]*)([,])([^)]*[)])', r'\1&&&\3', text)
text = re.sub('([。!,?\?])([^’”])',r'\1\n\2',text)#普通断句符号且后面没有引号
text = re.sub('(\.{6})([^’”])',r'\1\n\2',text)#英文省略号且后面没有引号
text = re.sub('(\{2})([^’”])',r'\1\n\2',text)#中文省略号且后面没有引号
text = re.sub('([. ,。!;?\?\.{6}\{2}][’”])([^’”])',r'\1\n\2',text)#断句号+引号且后面没有引号
# 处理英文句子的分隔
text = re.sub(r'([.,!?])([^’”\'"])', r'\1\n\2', text) # 句号、感叹号、问号后面没有引号
text = re.sub(r'([.!?][’”\'"])([^’”\'"])', r'\1\n\2', text) # 句号、感叹号、问号加引号后面的部分
text = re.sub(r'([(][^)]*)(&&&)([^)]*[)])', r'\1\3', text)
text = [t for t in text.split("\n") if t]
return text
def cut_sentence_with_fix_length(text : str, length : int):
sentences = normal_cut_sentence(text)
cur_length = 0
res = ""
for sentence in sentences:
if not sentence:
continue
if cur_length > length or cur_length + len(sentence) > length:
yield res
res = ""
cur_length = 0
res += sentence
cur_length += len(sentence)
if res:
yield res
from wetext import Normalizer
chinese_char_pattern = re.compile(r'[\u4e00-\u9fff]+')
@@ -195,8 +162,8 @@ def clean_text(text):
class TextNormalizer:
def __init__(self, tokenizer=None):
self.tokenizer = tokenizer
self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False, remove_interjections=False, overwrite_cache=True)
self.en_tn_model = EnNormalizer()
self.zh_tn_model = Normalizer(lang="zh", operator="tn", remove_erhua=True)
self.en_tn_model = Normalizer(lang="en", operator="tn")
self.inflect_parser = inflect.engine()
def normalize(self, text, split=False):
@@ -207,38 +174,12 @@ class TextNormalizer:
text = text.replace("=", "等于") # 修复 ”550 + 320 等于 870 千卡。“ 被错误正则为 ”五百五十加三百二十等于八七十千卡.“
if re.search(r'([\d$%^*_+≥≤≠×÷?=])', text): # 避免 英文连字符被错误正则为减
text = re.sub(r'(?<=[a-zA-Z0-9])-(?=\d)', ' - ', text) # 修复 x-2 被正则为 x负2
text = self.zh_tn_model.normalize(text)
text = re.sub(r'(?<=[a-zA-Z0-9])-(?=\d)', ' - ', text) # 修复 x-2 被正则为 x负2
text = self.zh_tn_model.normalize(text)
text = replace_blank(text)
text = replace_corner_mark(text)
text = remove_bracket(text)
text = re.sub(r'[,]+$', '', text)
else:
text = self.en_tn_model.normalize(text)
text = spell_out_number(text, self.inflect_parser)
if split is False:
return text
if __name__ == "__main__":
text_normalizer = TextNormalizer()
text = r"""今天我们学习一元二次方程。一元二次方程的标准形式是:
ax2+bx+c=0ax^2 + bx + c = 0ax2+bx+c=0
其中aaa、bbb 和 ccc 是常数xxx 是变量。这个方程的解可以通过求根公式来找到。
一元二次方程的解法有几种:
- 因式分解法通过将方程因式分解来求解。我们首先尝试将方程表达成两个括号的形式解决方程的解。比如方程x25x+6=0x^2 - 5x + 6 = 0x25x+6=0可以因式分解为(x2)(x3)=0(x - 2)(x - 3) = 0(x2)(x3)=0因此根为2和3。
- 配方法:通过配方将方程转化为完全平方的形式,从而解出。我们通过加上或减去适当的常数来完成这一过程,使得方程可以直接写成一个完全平方的形式。
- 求根公式:我们可以使用求根公式直接求出方程的解。这个公式适用于所有的一元二次方程,即使我们无法通过因式分解或配方法来解决时,也能使用该公式。
公式x=b±b24ac2ax = \frac{-b \pm \sqrt{b^2 - 4ac}}{2a}x=2ab±b24ac这个公式可以帮助我们求解任何一元二次方程的根。
对于一元二次方程,我们需要了解判别式。判别式的作用是帮助我们判断方程的解的个数和性质。判别式 Δ\DeltaΔ 由下式给出:Δ=b24ac\Delta = b^2 - 4acΔ=b24ac 根据判别式的值,我们可以知道:
- 如果 Δ>0\Delta > 0Δ>0方程有两个不相等的实数解。这是因为判别式大于0时根号内的值是正数所以我们可以得到两个不同的解。
- 如果 Δ=0\Delta = 0Δ=0方程有一个实数解。这是因为根号内的值为零导致两个解相等也就是说方程有一个解。
- 如果 Δ<0\Delta < 0Δ<0方程没有实数解。这意味着根号内的值是负数无法进行实数运算因此方程没有实数解可能有复数解。"""
texts = ["这是一个公式 (a+b)³=a³+3a²b+3ab²+b³ S=(a×b)÷2", "这样的发展为AI仅仅作为“工具”这一观点提出了新的挑战", "550 + 320 = 870千卡。", "解一元二次方程3x^2+x-2=0", "你好啊"]
texts = [text]
for text in texts:
text = text_normalizer.normalize(text)
print(text)
for t in cut_sentence_with_fix_length(text, 15):
print(t)

76
src/voxcpm/zipenhancer.py Normal file
View File

@@ -0,0 +1,76 @@
"""
ZipEnhancer Module - Audio Denoising Enhancer
Provides on-demand import ZipEnhancer functionality for audio denoising processing.
Related dependencies are imported only when denoising functionality is needed.
"""
import os
import tempfile
from typing import Optional, Union
import torchaudio
import torch
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
class ZipEnhancer:
"""ZipEnhancer Audio Denoising Enhancer"""
def __init__(self, model_path: str = "iic/speech_zipenhancer_ans_multiloss_16k_base"):
"""
Initialize ZipEnhancer
Args:
model_path: ModelScope model path or local path
"""
self.model_path = model_path
self._pipeline = pipeline(
Tasks.acoustic_noise_suppression,
model=self.model_path
)
def _normalize_loudness(self, wav_path: str):
"""
Audio loudness normalization
Args:
wav_path: Audio file path
"""
audio, sr = torchaudio.load(wav_path)
loudness = torchaudio.functional.loudness(audio, sr)
normalized_audio = torchaudio.functional.gain(audio, -20-loudness)
torchaudio.save(wav_path, normalized_audio, sr)
def enhance(self, input_path: str, output_path: Optional[str] = None,
normalize_loudness: bool = True) -> str:
"""
Audio denoising enhancement
Args:
input_path: Input audio file path
output_path: Output audio file path (optional, creates temp file by default)
normalize_loudness: Whether to perform loudness normalization
Returns:
str: Output audio file path
Raises:
RuntimeError: If pipeline is not initialized or processing fails
"""
if not os.path.exists(input_path):
raise FileNotFoundError(f"Input audio file does not exist: {input_path}")
# Create temporary file if no output path is specified
if output_path is None:
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
output_path = tmp_file.name
try:
# Perform denoising processing
self._pipeline(input_path, output_path=output_path)
# Loudness normalization
if normalize_loudness:
self._normalize_loudness(output_path)
return output_path
except Exception as e:
# Clean up possibly created temporary files
if output_path and os.path.exists(output_path):
try:
os.unlink(output_path)
except OSError:
pass
raise RuntimeError(f"Audio denoising processing failed: {e}")