mirror of
https://github.com/OpenBMB/VoxCPM
synced 2025-12-12 11:58:11 +00:00
Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dc6b6d1d1c | ||
|
|
cef6aefb3d | ||
|
|
1a46c5d1ad | ||
|
|
5257ec3dc5 | ||
|
|
bdd516b579 | ||
|
|
11568f0776 | ||
|
|
e5bcb735f0 | ||
|
|
1fa9e2ca02 | ||
|
|
10f48ba330 | ||
|
|
639b2272ab | ||
|
|
7e8f754ba1 | ||
|
|
032c7fe403 | ||
|
|
5390a47862 | ||
|
|
e7012f1a94 | ||
|
|
82332cfc99 | ||
|
|
605ac2d8e4 | ||
|
|
0fa8d894d1 | ||
|
|
776c0d19fb | ||
|
|
ed6e6b4dac | ||
|
|
e3108d4a12 | ||
|
|
59fe3f30a1 | ||
|
|
6f2fb45756 | ||
|
|
91128d823d | ||
|
|
436e8cd6e5 | ||
|
|
11574ae93d | ||
|
|
706403187e | ||
|
|
38a76704ee |
29
README.md
29
README.md
@@ -1,13 +1,20 @@
|
|||||||
## 🎙️ VoxCPM: Tokenizer-Free TTS for Context-Aware Speech Generation and True-to-Life Voice Cloning
|
## 🎙️ VoxCPM: Tokenizer-Free TTS for Context-Aware Speech Generation and True-to-Life Voice Cloning
|
||||||
|
|
||||||
|
|
||||||
[](https://github.com/OpenBMB/VoxCPM/) [](https://huggingface.co/openbmb/VoxCPM-0.5B) [](https://huggingface.co/spaces/OpenBMB/VoxCPM-Demo) [](https://thuhcsi.github.io/VoxCPM/)
|
[](https://github.com/OpenBMB/VoxCPM/) [](https://huggingface.co/openbmb/VoxCPM-0.5B) [](https://modelscope.cn/models/OpenBMB/VoxCPM-0.5B) [](https://huggingface.co/spaces/OpenBMB/VoxCPM-Demo) [](https://openbmb.github.io/VoxCPM-demopage)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<div align="center">
|
<div align="center">
|
||||||
<img src="assets/voxcpm_logo.png" alt="VoxCPM Logo" width="40%">
|
<img src="assets/voxcpm_logo.png" alt="VoxCPM Logo" width="40%">
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
<div align="center">
|
||||||
|
|
||||||
|
👋 Contact us on [WeChat](assets/wechat.png)
|
||||||
|
|
||||||
|
</div>
|
||||||
|
|
||||||
## News
|
## News
|
||||||
* [2025.09.16] 🔥 🔥 🔥 We Open Source the VoxCPM-0.5B [weights](https://huggingface.co/openbmb/VoxCPM-0.5B)!
|
* [2025.09.16] 🔥 🔥 🔥 We Open Source the VoxCPM-0.5B [weights](https://huggingface.co/openbmb/VoxCPM-0.5B)!
|
||||||
* [2025.09.16] 🎉 🎉 🎉 We Provide the [Gradio PlayGround](https://huggingface.co/spaces/OpenBMB/VoxCPM-Demo) for VoxCPM-0.5B, try it now!
|
* [2025.09.16] 🎉 🎉 🎉 We Provide the [Gradio PlayGround](https://huggingface.co/spaces/OpenBMB/VoxCPM-Demo) for VoxCPM-0.5B, try it now!
|
||||||
@@ -32,11 +39,6 @@ Unlike mainstream approaches that convert speech to discrete tokens, VoxCPM uses
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## Quick Start
|
## Quick Start
|
||||||
|
|
||||||
### 🔧 Install from PyPI
|
### 🔧 Install from PyPI
|
||||||
@@ -87,10 +89,10 @@ After installation, the entry point is `voxcpm` (or use `python -m voxcpm.cli`).
|
|||||||
|
|
||||||
```bash
|
```bash
|
||||||
# 1) Direct synthesis (single text)
|
# 1) Direct synthesis (single text)
|
||||||
voxcpm --text "Hello VoxCPM" --output out.wav
|
voxcpm --text "VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly expressive speech." --output out.wav
|
||||||
|
|
||||||
# 2) Voice cloning (reference audio + transcript)
|
# 2) Voice cloning (reference audio + transcript)
|
||||||
voxcpm --text "Hello" \
|
voxcpm --text "VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly expressive speech." \
|
||||||
--prompt-audio path/to/voice.wav \
|
--prompt-audio path/to/voice.wav \
|
||||||
--prompt-text "reference transcript" \
|
--prompt-text "reference transcript" \
|
||||||
--output out.wav \
|
--output out.wav \
|
||||||
@@ -238,6 +240,13 @@ VoxCPM achieves competitive results on public zero-shot TTS benchmarks:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## 📝TO-DO List
|
||||||
|
Please stay tuned for updates!
|
||||||
|
- [ ] Release the VoxCPM technical report.
|
||||||
|
- [ ] Support higher sampling rate (next version).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## 📄 License
|
## 📄 License
|
||||||
The VoxCPM model weights and code are open-sourced under the [Apache-2.0](LICENSE) license.
|
The VoxCPM model weights and code are open-sourced under the [Apache-2.0](LICENSE) license.
|
||||||
|
|
||||||
@@ -258,10 +267,14 @@ This project is developed by the following institutions:
|
|||||||
- <img src="assets/thuhcsi_logo.png" width="28px"> [THUHCSI](https://github.com/thuhcsi)
|
- <img src="assets/thuhcsi_logo.png" width="28px"> [THUHCSI](https://github.com/thuhcsi)
|
||||||
|
|
||||||
|
|
||||||
|
## ⭐ Star History
|
||||||
|
[](https://star-history.com/#OpenBMB/VoxCPM&Date)
|
||||||
|
|
||||||
|
|
||||||
## 📚 Citation
|
## 📚 Citation
|
||||||
|
|
||||||
|
The technical report is coming soon, please wait for the release 😊
|
||||||
|
|
||||||
If you find our model helpful, please consider citing our projects 📝 and starring us ⭐️!
|
If you find our model helpful, please consider citing our projects 📝 and starring us ⭐️!
|
||||||
|
|
||||||
```bib
|
```bib
|
||||||
|
|||||||
11
app.py
11
app.py
@@ -170,7 +170,7 @@ def create_demo_interface(demo: VoxCPMDemo):
|
|||||||
|
|
||||||
# Pro Tips
|
# Pro Tips
|
||||||
with gr.Accordion("💡 Pro Tips |使用建议", open=False, elem_id="acc_tips"):
|
with gr.Accordion("💡 Pro Tips |使用建议", open=False, elem_id="acc_tips"):
|
||||||
gr.Markdown(f"""
|
gr.Markdown("""
|
||||||
### Prompt Speech Enhancement|参考语音降噪
|
### Prompt Speech Enhancement|参考语音降噪
|
||||||
- **Enable** to remove background noise for a clean, studio-like voice, with an external ZipEnhancer component.
|
- **Enable** to remove background noise for a clean, studio-like voice, with an external ZipEnhancer component.
|
||||||
**启用**:通过 ZipEnhancer 组件消除背景噪音,获得更好的音质。
|
**启用**:通过 ZipEnhancer 组件消除背景噪音,获得更好的音质。
|
||||||
@@ -194,10 +194,6 @@ def create_demo_interface(demo: VoxCPMDemo):
|
|||||||
**调低**:合成速度更快。
|
**调低**:合成速度更快。
|
||||||
- **Higher** for better synthesis quality.
|
- **Higher** for better synthesis quality.
|
||||||
**调高**:合成质量更佳。
|
**调高**:合成质量更佳。
|
||||||
|
|
||||||
### Long Text (e.g., >5 min speech)|长文本 (如 >5分钟的合成语音)
|
|
||||||
While VoxCPM can handle long texts directly, we recommend using empty lines to break very long content into paragraphs; the model will then synthesize each paragraph individually.
|
|
||||||
虽然 VoxCPM 支持直接生成长文本,但如果目标文本过长,我们建议使用换行符将内容分段;模型将对每个段落分别合成。
|
|
||||||
""")
|
""")
|
||||||
|
|
||||||
# Main controls
|
# Main controls
|
||||||
@@ -206,7 +202,7 @@ def create_demo_interface(demo: VoxCPMDemo):
|
|||||||
prompt_wav = gr.Audio(
|
prompt_wav = gr.Audio(
|
||||||
sources=["upload", 'microphone'],
|
sources=["upload", 'microphone'],
|
||||||
type="filepath",
|
type="filepath",
|
||||||
label="Prompt Speech",
|
label="Prompt Speech (Optional, or let VoxCPM improvise)",
|
||||||
value="./examples/example.wav",
|
value="./examples/example.wav",
|
||||||
)
|
)
|
||||||
DoDenoisePromptAudio = gr.Checkbox(
|
DoDenoisePromptAudio = gr.Checkbox(
|
||||||
@@ -244,14 +240,13 @@ def create_demo_interface(demo: VoxCPMDemo):
|
|||||||
text = gr.Textbox(
|
text = gr.Textbox(
|
||||||
value="VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly realistic speech.",
|
value="VoxCPM is an innovative end-to-end TTS model from ModelBest, designed to generate highly realistic speech.",
|
||||||
label="Target Text",
|
label="Target Text",
|
||||||
info="Default processing splits text on \\n into paragraphs; each is synthesized as a chunk and then concatenated into the final audio."
|
|
||||||
)
|
)
|
||||||
with gr.Row():
|
with gr.Row():
|
||||||
DoNormalizeText = gr.Checkbox(
|
DoNormalizeText = gr.Checkbox(
|
||||||
value=False,
|
value=False,
|
||||||
label="Text Normalization",
|
label="Text Normalization",
|
||||||
elem_id="chk_normalize",
|
elem_id="chk_normalize",
|
||||||
info="We use WeTextPorcessing library to normalize the input text."
|
info="We use wetext library to normalize the input text."
|
||||||
)
|
)
|
||||||
audio_output = gr.Audio(label="Output Audio")
|
audio_output = gr.Audio(label="Output Audio")
|
||||||
|
|
||||||
|
|||||||
BIN
assets/wechat.png
Normal file
BIN
assets/wechat.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 9.5 KiB |
@@ -34,11 +34,14 @@ dependencies = [
|
|||||||
"gradio",
|
"gradio",
|
||||||
"inflect",
|
"inflect",
|
||||||
"addict",
|
"addict",
|
||||||
"WeTextProcessing",
|
"wetext",
|
||||||
"modelscope",
|
"modelscope>=1.22.0",
|
||||||
|
"datasets>=3,<4",
|
||||||
"huggingface-hub",
|
"huggingface-hub",
|
||||||
"pydantic",
|
"pydantic",
|
||||||
"tqdm",
|
"tqdm",
|
||||||
|
"simplejson",
|
||||||
|
"sortedcontainers",
|
||||||
"soundfile",
|
"soundfile",
|
||||||
"funasr",
|
"funasr",
|
||||||
"spaces"
|
"spaces"
|
||||||
|
|||||||
@@ -1,13 +1,10 @@
|
|||||||
import torch
|
import torch
|
||||||
import torchaudio
|
import torchaudio
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
import tempfile
|
import tempfile
|
||||||
from modelscope.pipelines import pipeline
|
|
||||||
from modelscope.utils.constant import Tasks
|
|
||||||
from huggingface_hub import snapshot_download
|
from huggingface_hub import snapshot_download
|
||||||
from .model.voxcpm import VoxCPMModel
|
from .model.voxcpm import VoxCPMModel
|
||||||
from .utils.text_normalize import TextNormalizer
|
|
||||||
|
|
||||||
|
|
||||||
class VoxCPM:
|
class VoxCPM:
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
@@ -27,16 +24,16 @@ class VoxCPM:
|
|||||||
"""
|
"""
|
||||||
print(f"voxcpm_model_path: {voxcpm_model_path}, zipenhancer_model_path: {zipenhancer_model_path}, enable_denoiser: {enable_denoiser}")
|
print(f"voxcpm_model_path: {voxcpm_model_path}, zipenhancer_model_path: {zipenhancer_model_path}, enable_denoiser: {enable_denoiser}")
|
||||||
self.tts_model = VoxCPMModel.from_local(voxcpm_model_path)
|
self.tts_model = VoxCPMModel.from_local(voxcpm_model_path)
|
||||||
self.text_normalizer = TextNormalizer()
|
self.text_normalizer = None
|
||||||
if enable_denoiser and zipenhancer_model_path is not None:
|
if enable_denoiser and zipenhancer_model_path is not None:
|
||||||
self.denoiser = pipeline(
|
from .zipenhancer import ZipEnhancer
|
||||||
Tasks.acoustic_noise_suppression,
|
self.denoiser = ZipEnhancer(zipenhancer_model_path)
|
||||||
model=zipenhancer_model_path)
|
|
||||||
else:
|
else:
|
||||||
self.denoiser = None
|
self.denoiser = None
|
||||||
print("Warm up VoxCPMModel...")
|
print("Warm up VoxCPMModel...")
|
||||||
self.tts_model.generate(
|
self.tts_model.generate(
|
||||||
target_text="Hello, this is the first test sentence."
|
target_text="Hello, this is the first test sentence.",
|
||||||
|
max_len=10,
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -50,7 +47,7 @@ class VoxCPM:
|
|||||||
"""Instantiate ``VoxCPM`` from a Hugging Face Hub snapshot.
|
"""Instantiate ``VoxCPM`` from a Hugging Face Hub snapshot.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
hf_model_id: Explicit Hugging Face repository id (e.g. "org/repo").
|
hf_model_id: Explicit Hugging Face repository id (e.g. "org/repo") or local path.
|
||||||
load_denoiser: Whether to initialize the denoiser pipeline.
|
load_denoiser: Whether to initialize the denoiser pipeline.
|
||||||
zipenhancer_model_id: Denoiser model id or path for ModelScope
|
zipenhancer_model_id: Denoiser model id or path for ModelScope
|
||||||
acoustic noise suppression.
|
acoustic noise suppression.
|
||||||
@@ -67,14 +64,19 @@ class VoxCPM:
|
|||||||
``hf_model_id`` is provided.
|
``hf_model_id`` is provided.
|
||||||
"""
|
"""
|
||||||
repo_id = hf_model_id
|
repo_id = hf_model_id
|
||||||
if not repo_id or repo_id.strip() == "":
|
if not repo_id:
|
||||||
raise ValueError("You must provide a valid hf_model_id")
|
raise ValueError("You must provide hf_model_id")
|
||||||
|
|
||||||
local_path = snapshot_download(
|
# Load from local path if provided
|
||||||
repo_id=repo_id,
|
if os.path.isdir(repo_id):
|
||||||
cache_dir=cache_dir,
|
local_path = repo_id
|
||||||
local_files_only=local_files_only,
|
else:
|
||||||
)
|
# Otherwise, try from_pretrained (Hub); exit on failure
|
||||||
|
local_path = snapshot_download(
|
||||||
|
repo_id=repo_id,
|
||||||
|
cache_dir=cache_dir,
|
||||||
|
local_files_only=local_files_only,
|
||||||
|
)
|
||||||
|
|
||||||
return cls(
|
return cls(
|
||||||
voxcpm_model_path=local_path,
|
voxcpm_model_path=local_path,
|
||||||
@@ -82,12 +84,6 @@ class VoxCPM:
|
|||||||
enable_denoiser=load_denoiser,
|
enable_denoiser=load_denoiser,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _normalize_loudness(self, wav_path: str):
|
|
||||||
audio, sr = torchaudio.load(wav_path)
|
|
||||||
loudness = torchaudio.functional.loudness(audio, sr)
|
|
||||||
normalized_audio = torchaudio.functional.gain(audio, -20-loudness)
|
|
||||||
torchaudio.save(wav_path, normalized_audio, sr)
|
|
||||||
|
|
||||||
def generate(self,
|
def generate(self,
|
||||||
text : str,
|
text : str,
|
||||||
prompt_wav_path : str = None,
|
prompt_wav_path : str = None,
|
||||||
@@ -125,9 +121,18 @@ class VoxCPM:
|
|||||||
Returns:
|
Returns:
|
||||||
numpy.ndarray: 1D waveform array (float32) on CPU.
|
numpy.ndarray: 1D waveform array (float32) on CPU.
|
||||||
"""
|
"""
|
||||||
texts = text.split("\n")
|
if not text.strip() or not isinstance(text, str):
|
||||||
texts = [t.strip() for t in texts if t.strip()]
|
raise ValueError("target text must be a non-empty string")
|
||||||
final_wav = []
|
|
||||||
|
if prompt_wav_path is not None:
|
||||||
|
if not os.path.exists(prompt_wav_path):
|
||||||
|
raise FileNotFoundError(f"prompt_wav_path does not exist: {prompt_wav_path}")
|
||||||
|
|
||||||
|
if (prompt_wav_path is None) != (prompt_text is None):
|
||||||
|
raise ValueError("prompt_wav_path and prompt_text must both be provided or both be None")
|
||||||
|
|
||||||
|
text = text.replace("\n", " ")
|
||||||
|
text = re.sub(r'\s+', ' ', text)
|
||||||
temp_prompt_wav_path = None
|
temp_prompt_wav_path = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -135,9 +140,7 @@ class VoxCPM:
|
|||||||
if denoise and self.denoiser is not None:
|
if denoise and self.denoiser is not None:
|
||||||
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
|
||||||
temp_prompt_wav_path = tmp_file.name
|
temp_prompt_wav_path = tmp_file.name
|
||||||
|
self.denoiser.enhance(prompt_wav_path, output_path=temp_prompt_wav_path)
|
||||||
self.denoiser(prompt_wav_path, output_path=temp_prompt_wav_path)
|
|
||||||
self._normalize_loudness(temp_prompt_wav_path)
|
|
||||||
prompt_wav_path = temp_prompt_wav_path
|
prompt_wav_path = temp_prompt_wav_path
|
||||||
fixed_prompt_cache = self.tts_model.build_prompt_cache(
|
fixed_prompt_cache = self.tts_model.build_prompt_cache(
|
||||||
prompt_wav_path=prompt_wav_path,
|
prompt_wav_path=prompt_wav_path,
|
||||||
@@ -146,32 +149,25 @@ class VoxCPM:
|
|||||||
else:
|
else:
|
||||||
fixed_prompt_cache = None # will be built from the first inference
|
fixed_prompt_cache = None # will be built from the first inference
|
||||||
|
|
||||||
for sub_text in texts:
|
if normalize:
|
||||||
if sub_text.strip() == "":
|
if self.text_normalizer is None:
|
||||||
continue
|
from .utils.text_normalize import TextNormalizer
|
||||||
print("sub_text:", sub_text)
|
self.text_normalizer = TextNormalizer()
|
||||||
if normalize:
|
text = self.text_normalizer.normalize(text)
|
||||||
sub_text = self.text_normalizer.normalize(sub_text)
|
|
||||||
wav, target_text_token, generated_audio_feat = self.tts_model.generate_with_prompt_cache(
|
|
||||||
target_text=sub_text,
|
|
||||||
prompt_cache=fixed_prompt_cache,
|
|
||||||
min_len=2,
|
|
||||||
max_len=max_length,
|
|
||||||
inference_timesteps=inference_timesteps,
|
|
||||||
cfg_value=cfg_value,
|
|
||||||
retry_badcase=retry_badcase,
|
|
||||||
retry_badcase_max_times=retry_badcase_max_times,
|
|
||||||
retry_badcase_ratio_threshold=retry_badcase_ratio_threshold,
|
|
||||||
)
|
|
||||||
if fixed_prompt_cache is None:
|
|
||||||
fixed_prompt_cache = self.tts_model.merge_prompt_cache(
|
|
||||||
original_cache=None,
|
|
||||||
new_text_token=target_text_token,
|
|
||||||
new_audio_feat=generated_audio_feat
|
|
||||||
)
|
|
||||||
final_wav.append(wav)
|
|
||||||
|
|
||||||
return torch.cat(final_wav, dim=1).squeeze(0).cpu().numpy()
|
wav, target_text_token, generated_audio_feat = self.tts_model.generate_with_prompt_cache(
|
||||||
|
target_text=text,
|
||||||
|
prompt_cache=fixed_prompt_cache,
|
||||||
|
min_len=2,
|
||||||
|
max_len=max_length,
|
||||||
|
inference_timesteps=inference_timesteps,
|
||||||
|
cfg_value=cfg_value,
|
||||||
|
retry_badcase=retry_badcase,
|
||||||
|
retry_badcase_max_times=retry_badcase_max_times,
|
||||||
|
retry_badcase_ratio_threshold=retry_badcase_ratio_threshold,
|
||||||
|
)
|
||||||
|
|
||||||
|
return wav.squeeze(0).cpu().numpy()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if temp_prompt_wav_path and os.path.exists(temp_prompt_wav_path):
|
if temp_prompt_wav_path and os.path.exists(temp_prompt_wav_path):
|
||||||
|
|||||||
@@ -148,12 +148,20 @@ class VoxCPMModel(nn.Module):
|
|||||||
|
|
||||||
|
|
||||||
def optimize(self):
|
def optimize(self):
|
||||||
if self.device == "cuda":
|
try:
|
||||||
|
if self.device != "cuda":
|
||||||
|
raise ValueError("VoxCPMModel can only be optimized on CUDA device")
|
||||||
|
try:
|
||||||
|
import triton
|
||||||
|
except:
|
||||||
|
raise ValueError("triton is not installed")
|
||||||
self.base_lm.forward_step = torch.compile(self.base_lm.forward_step, mode="reduce-overhead", fullgraph=True)
|
self.base_lm.forward_step = torch.compile(self.base_lm.forward_step, mode="reduce-overhead", fullgraph=True)
|
||||||
self.residual_lm.forward_step = torch.compile(self.residual_lm.forward_step, mode="reduce-overhead", fullgraph=True)
|
self.residual_lm.forward_step = torch.compile(self.residual_lm.forward_step, mode="reduce-overhead", fullgraph=True)
|
||||||
self.feat_encoder_step = torch.compile(self.feat_encoder, mode="reduce-overhead", fullgraph=True)
|
self.feat_encoder_step = torch.compile(self.feat_encoder, mode="reduce-overhead", fullgraph=True)
|
||||||
self.feat_decoder.estimator = torch.compile(self.feat_decoder.estimator, mode="reduce-overhead", fullgraph=True)
|
self.feat_decoder.estimator = torch.compile(self.feat_decoder.estimator, mode="reduce-overhead", fullgraph=True)
|
||||||
else:
|
except Exception as e:
|
||||||
|
print(f"Error: {e}")
|
||||||
|
print("Warning: VoxCPMModel can not be optimized by torch.compile, using original forward_step functions")
|
||||||
self.base_lm.forward_step = self.base_lm.forward_step
|
self.base_lm.forward_step = self.base_lm.forward_step
|
||||||
self.residual_lm.forward_step = self.residual_lm.forward_step
|
self.residual_lm.forward_step = self.residual_lm.forward_step
|
||||||
self.feat_encoder_step = self.feat_encoder
|
self.feat_encoder_step = self.feat_encoder
|
||||||
@@ -276,7 +284,10 @@ class VoxCPMModel(nn.Module):
|
|||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
return self.audio_vae.decode(latent_pred.to(torch.float32)).squeeze(1).cpu()
|
|
||||||
|
decode_audio = self.audio_vae.decode(latent_pred.to(torch.float32)).squeeze(1).cpu()
|
||||||
|
decode_audio = decode_audio[..., 640:-640] # trick: trim the start and end of the audio
|
||||||
|
return decode_audio
|
||||||
|
|
||||||
@torch.inference_mode()
|
@torch.inference_mode()
|
||||||
def build_prompt_cache(
|
def build_prompt_cache(
|
||||||
@@ -314,7 +325,7 @@ class VoxCPMModel(nn.Module):
|
|||||||
audio = torch.nn.functional.pad(audio, (0, patch_len - audio.size(1) % patch_len))
|
audio = torch.nn.functional.pad(audio, (0, patch_len - audio.size(1) % patch_len))
|
||||||
|
|
||||||
# extract audio features
|
# extract audio features
|
||||||
audio_feat = self.audio_vae.encode(audio.cuda(), self.sample_rate).cpu()
|
audio_feat = self.audio_vae.encode(audio.to(self.device), self.sample_rate).cpu()
|
||||||
|
|
||||||
audio_feat = audio_feat.view(
|
audio_feat = audio_feat.view(
|
||||||
self.audio_vae.latent_dim,
|
self.audio_vae.latent_dim,
|
||||||
@@ -460,6 +471,7 @@ class VoxCPMModel(nn.Module):
|
|||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
decode_audio = self.audio_vae.decode(latent_pred.to(torch.float32)).squeeze(1).cpu()
|
decode_audio = self.audio_vae.decode(latent_pred.to(torch.float32)).squeeze(1).cpu()
|
||||||
|
decode_audio = decode_audio[..., 640:-640] # trick: trim the start and end of the audio
|
||||||
|
|
||||||
return (
|
return (
|
||||||
decode_audio,
|
decode_audio,
|
||||||
@@ -572,7 +584,6 @@ class VoxCPMModel(nn.Module):
|
|||||||
pred_feat_seq = torch.cat(pred_feat_seq, dim=1) # b, t, p, d
|
pred_feat_seq = torch.cat(pred_feat_seq, dim=1) # b, t, p, d
|
||||||
|
|
||||||
feat_pred = rearrange(pred_feat_seq, "b t p d -> b d (t p)", b=B, p=self.patch_size)
|
feat_pred = rearrange(pred_feat_seq, "b t p d -> b d (t p)", b=B, p=self.patch_size)
|
||||||
feat_pred = feat_pred[..., 1:-1] # trick: remove the first and last token
|
|
||||||
return feat_pred, pred_feat_seq.squeeze(0).cpu()
|
return feat_pred, pred_feat_seq.squeeze(0).cpu()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|||||||
@@ -3,40 +3,7 @@ import re
|
|||||||
import regex
|
import regex
|
||||||
import inflect
|
import inflect
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from tn.chinese.normalizer import Normalizer as ZhNormalizer
|
from wetext import Normalizer
|
||||||
from tn.english.normalizer import Normalizer as EnNormalizer
|
|
||||||
|
|
||||||
def normal_cut_sentence(text):
|
|
||||||
# 先处理括号内的逗号,将其替换为特殊标记
|
|
||||||
text = re.sub(r'([((][^))]*)([,,])([^))]*[))])', r'\1&&&\3', text)
|
|
||||||
text = re.sub('([。!,?\?])([^’”])',r'\1\n\2',text)#普通断句符号且后面没有引号
|
|
||||||
text = re.sub('(\.{6})([^’”])',r'\1\n\2',text)#英文省略号且后面没有引号
|
|
||||||
text = re.sub('(\…{2})([^’”])',r'\1\n\2',text)#中文省略号且后面没有引号
|
|
||||||
text = re.sub('([. ,。!;?\?\.{6}\…{2}][’”])([^’”])',r'\1\n\2',text)#断句号+引号且后面没有引号
|
|
||||||
# 处理英文句子的分隔
|
|
||||||
text = re.sub(r'([.,!?])([^’”\'"])', r'\1\n\2', text) # 句号、感叹号、问号后面没有引号
|
|
||||||
text = re.sub(r'([.!?][’”\'"])([^’”\'"])', r'\1\n\2', text) # 句号、感叹号、问号加引号后面的部分
|
|
||||||
text = re.sub(r'([((][^))]*)(&&&)([^))]*[))])', r'\1,\3', text)
|
|
||||||
text = [t for t in text.split("\n") if t]
|
|
||||||
return text
|
|
||||||
|
|
||||||
|
|
||||||
def cut_sentence_with_fix_length(text : str, length : int):
|
|
||||||
sentences = normal_cut_sentence(text)
|
|
||||||
cur_length = 0
|
|
||||||
res = ""
|
|
||||||
for sentence in sentences:
|
|
||||||
if not sentence:
|
|
||||||
continue
|
|
||||||
if cur_length > length or cur_length + len(sentence) > length:
|
|
||||||
yield res
|
|
||||||
res = ""
|
|
||||||
cur_length = 0
|
|
||||||
res += sentence
|
|
||||||
cur_length += len(sentence)
|
|
||||||
if res:
|
|
||||||
yield res
|
|
||||||
|
|
||||||
|
|
||||||
chinese_char_pattern = re.compile(r'[\u4e00-\u9fff]+')
|
chinese_char_pattern = re.compile(r'[\u4e00-\u9fff]+')
|
||||||
|
|
||||||
@@ -195,8 +162,8 @@ def clean_text(text):
|
|||||||
class TextNormalizer:
|
class TextNormalizer:
|
||||||
def __init__(self, tokenizer=None):
|
def __init__(self, tokenizer=None):
|
||||||
self.tokenizer = tokenizer
|
self.tokenizer = tokenizer
|
||||||
self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False, remove_interjections=False, overwrite_cache=True)
|
self.zh_tn_model = Normalizer(lang="zh", operator="tn", remove_erhua=True)
|
||||||
self.en_tn_model = EnNormalizer()
|
self.en_tn_model = Normalizer(lang="en", operator="tn")
|
||||||
self.inflect_parser = inflect.engine()
|
self.inflect_parser = inflect.engine()
|
||||||
|
|
||||||
def normalize(self, text, split=False):
|
def normalize(self, text, split=False):
|
||||||
@@ -207,38 +174,12 @@ class TextNormalizer:
|
|||||||
text = text.replace("=", "等于") # 修复 ”550 + 320 等于 870 千卡。“ 被错误正则为 ”五百五十加三百二十等于八七十千卡.“
|
text = text.replace("=", "等于") # 修复 ”550 + 320 等于 870 千卡。“ 被错误正则为 ”五百五十加三百二十等于八七十千卡.“
|
||||||
if re.search(r'([\d$%^*_+≥≤≠×÷?=])', text): # 避免 英文连字符被错误正则为减
|
if re.search(r'([\d$%^*_+≥≤≠×÷?=])', text): # 避免 英文连字符被错误正则为减
|
||||||
text = re.sub(r'(?<=[a-zA-Z0-9])-(?=\d)', ' - ', text) # 修复 x-2 被正则为 x负2
|
text = re.sub(r'(?<=[a-zA-Z0-9])-(?=\d)', ' - ', text) # 修复 x-2 被正则为 x负2
|
||||||
text = self.zh_tn_model.normalize(text)
|
|
||||||
text = re.sub(r'(?<=[a-zA-Z0-9])-(?=\d)', ' - ', text) # 修复 x-2 被正则为 x负2
|
|
||||||
text = self.zh_tn_model.normalize(text)
|
text = self.zh_tn_model.normalize(text)
|
||||||
text = replace_blank(text)
|
text = replace_blank(text)
|
||||||
text = replace_corner_mark(text)
|
text = replace_corner_mark(text)
|
||||||
text = remove_bracket(text)
|
text = remove_bracket(text)
|
||||||
text = re.sub(r'[,,]+$', '。', text)
|
|
||||||
else:
|
else:
|
||||||
text = self.en_tn_model.normalize(text)
|
text = self.en_tn_model.normalize(text)
|
||||||
text = spell_out_number(text, self.inflect_parser)
|
text = spell_out_number(text, self.inflect_parser)
|
||||||
if split is False:
|
if split is False:
|
||||||
return text
|
return text
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
text_normalizer = TextNormalizer()
|
|
||||||
text = r"""今天我们学习一元二次方程。一元二次方程的标准形式是:
|
|
||||||
ax2+bx+c=0ax^2 + bx + c = 0ax2+bx+c=0
|
|
||||||
其中,aaa、bbb 和 ccc 是常数,xxx 是变量。这个方程的解可以通过求根公式来找到。
|
|
||||||
一元二次方程的解法有几种:
|
|
||||||
- 因式分解法:通过将方程因式分解来求解。我们首先尝试将方程表达成两个括号的形式,解决方程的解。比如,方程x2−5x+6=0x^2 - 5x + 6 = 0x2−5x+6=0可以因式分解为(x−2)(x−3)=0(x - 2)(x - 3) = 0(x−2)(x−3)=0,因此根为2和3。
|
|
||||||
- 配方法:通过配方将方程转化为完全平方的形式,从而解出。我们通过加上或减去适当的常数来完成这一过程,使得方程可以直接写成一个完全平方的形式。
|
|
||||||
- 求根公式:我们可以使用求根公式直接求出方程的解。这个公式适用于所有的一元二次方程,即使我们无法通过因式分解或配方法来解决时,也能使用该公式。
|
|
||||||
公式:x=−b±b2−4ac2ax = \frac{-b \pm \sqrt{b^2 - 4ac}}{2a}x=2a−b±b2−4ac这个公式可以帮助我们求解任何一元二次方程的根。
|
|
||||||
对于一元二次方程,我们需要了解判别式。判别式的作用是帮助我们判断方程的解的个数和性质。判别式 Δ\DeltaΔ 由下式给出:Δ=b2−4ac\Delta = b^2 - 4acΔ=b2−4ac 根据判别式的值,我们可以知道:
|
|
||||||
- 如果 Δ>0\Delta > 0Δ>0,方程有两个不相等的实数解。这是因为判别式大于0时,根号内的值是正数,所以我们可以得到两个不同的解。
|
|
||||||
- 如果 Δ=0\Delta = 0Δ=0,方程有一个实数解。这是因为根号内的值为零,导致两个解相等,也就是说方程有一个解。
|
|
||||||
- 如果 Δ<0\Delta < 0Δ<0,方程没有实数解。这意味着根号内的值是负数,无法进行实数运算,因此方程没有实数解,可能有复数解。"""
|
|
||||||
texts = ["这是一个公式 (a+b)³=a³+3a²b+3ab²+b³ S=(a×b)÷2", "这样的发展为AI仅仅作为“工具”这一观点提出了新的挑战,", "550 + 320 = 870千卡。", "解一元二次方程:3x^2+x-2=0", "你好啊"]
|
|
||||||
texts = [text]
|
|
||||||
for text in texts:
|
|
||||||
text = text_normalizer.normalize(text)
|
|
||||||
print(text)
|
|
||||||
for t in cut_sentence_with_fix_length(text, 15):
|
|
||||||
print(t)
|
|
||||||
76
src/voxcpm/zipenhancer.py
Normal file
76
src/voxcpm/zipenhancer.py
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
"""
|
||||||
|
ZipEnhancer Module - Audio Denoising Enhancer
|
||||||
|
|
||||||
|
Provides on-demand import ZipEnhancer functionality for audio denoising processing.
|
||||||
|
Related dependencies are imported only when denoising functionality is needed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from typing import Optional, Union
|
||||||
|
import torchaudio
|
||||||
|
import torch
|
||||||
|
from modelscope.pipelines import pipeline
|
||||||
|
from modelscope.utils.constant import Tasks
|
||||||
|
|
||||||
|
|
||||||
|
class ZipEnhancer:
|
||||||
|
"""ZipEnhancer Audio Denoising Enhancer"""
|
||||||
|
def __init__(self, model_path: str = "iic/speech_zipenhancer_ans_multiloss_16k_base"):
|
||||||
|
"""
|
||||||
|
Initialize ZipEnhancer
|
||||||
|
Args:
|
||||||
|
model_path: ModelScope model path or local path
|
||||||
|
"""
|
||||||
|
self.model_path = model_path
|
||||||
|
self._pipeline = pipeline(
|
||||||
|
Tasks.acoustic_noise_suppression,
|
||||||
|
model=self.model_path
|
||||||
|
)
|
||||||
|
|
||||||
|
def _normalize_loudness(self, wav_path: str):
|
||||||
|
"""
|
||||||
|
Audio loudness normalization
|
||||||
|
|
||||||
|
Args:
|
||||||
|
wav_path: Audio file path
|
||||||
|
"""
|
||||||
|
audio, sr = torchaudio.load(wav_path)
|
||||||
|
loudness = torchaudio.functional.loudness(audio, sr)
|
||||||
|
normalized_audio = torchaudio.functional.gain(audio, -20-loudness)
|
||||||
|
torchaudio.save(wav_path, normalized_audio, sr)
|
||||||
|
|
||||||
|
def enhance(self, input_path: str, output_path: Optional[str] = None,
|
||||||
|
normalize_loudness: bool = True) -> str:
|
||||||
|
"""
|
||||||
|
Audio denoising enhancement
|
||||||
|
Args:
|
||||||
|
input_path: Input audio file path
|
||||||
|
output_path: Output audio file path (optional, creates temp file by default)
|
||||||
|
normalize_loudness: Whether to perform loudness normalization
|
||||||
|
Returns:
|
||||||
|
str: Output audio file path
|
||||||
|
Raises:
|
||||||
|
RuntimeError: If pipeline is not initialized or processing fails
|
||||||
|
"""
|
||||||
|
if not os.path.exists(input_path):
|
||||||
|
raise FileNotFoundError(f"Input audio file does not exist: {input_path}")
|
||||||
|
# Create temporary file if no output path is specified
|
||||||
|
if output_path is None:
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
|
||||||
|
output_path = tmp_file.name
|
||||||
|
try:
|
||||||
|
# Perform denoising processing
|
||||||
|
self._pipeline(input_path, output_path=output_path)
|
||||||
|
# Loudness normalization
|
||||||
|
if normalize_loudness:
|
||||||
|
self._normalize_loudness(output_path)
|
||||||
|
return output_path
|
||||||
|
except Exception as e:
|
||||||
|
# Clean up possibly created temporary files
|
||||||
|
if output_path and os.path.exists(output_path):
|
||||||
|
try:
|
||||||
|
os.unlink(output_path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
raise RuntimeError(f"Audio denoising processing failed: {e}")
|
||||||
Reference in New Issue
Block a user