Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,12 @@ Thumbs.db
# Milvus
**/volumes/

**/rag_storage/
**/rag_storage/

# Local operator runtime dependencies are kept outside tracked operator packages.
../DataMate_additional_dependency/

# Keep mapper operator package structure aligned with the repository.
runtime/ops/mapper/**/local_libs/
runtime/ops/mapper/**/bin/
runtime/ops/mapper/**/lib/
24 changes: 24 additions & 0 deletions runtime/datamate-python/app/module/operator/parsers/tar_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"""
import tarfile
import os
import shutil
from pathlib import Path
from typing import Optional

from app.module.operator.parsers.abstract_parser import AbstractParser
Expand All @@ -13,6 +15,25 @@
class TarParser(AbstractParser):
"""TAR 压缩包解析器"""

@staticmethod
def _flatten_single_package_dir(target_dir: str) -> None:
    """Hoist the contents of a lone wrapper directory up into ``target_dir``.

    Nothing happens when ``target_dir`` already holds ``__init__.py``,
    ``metadata.yml`` and ``process.py``, when it does not contain exactly one
    directory entry (ignoring ``__MACOSX``), or when that single directory
    lacks any of the three required files.

    Args:
        target_dir: Directory an archive has just been extracted into.

    Raises:
        OSError: If the directory cannot be listed or the wrapper renamed.
        shutil.Error: If an entry cannot be moved into ``target_dir``.
    """
    target = Path(target_dir)
    required = ("__init__.py", "metadata.yml", "process.py")

    def has_required_files(directory: Path) -> bool:
        return all((directory / name).exists() for name in required)

    if has_required_files(target):
        return

    children = [item for item in target.iterdir() if item.name != "__MACOSX"]
    if len(children) != 1 or not children[0].is_dir():
        return

    package_dir = children[0]
    if not has_required_files(package_dir):
        return

    # The wrapper may contain an entry that shares its own name (pkg/pkg/...).
    # Moving that entry to target/<name> would point at the wrapper itself and
    # make shutil.move raise, so first rename the wrapper to a name that
    # neither the target nor the payload uses.
    taken = {item.name for item in target.iterdir()}
    taken.update(item.name for item in package_dir.iterdir())
    staging_name = ".flatten_staging"
    suffix = 0
    while staging_name in taken:
        suffix += 1
        staging_name = f".flatten_staging_{suffix}"
    staging_dir = target / staging_name
    package_dir.rename(staging_dir)

    # Snapshot the listing so moving entries cannot disturb the iteration.
    for item in list(staging_dir.iterdir()):
        shutil.move(str(item), str(target / item.name))
    shutil.rmtree(staging_dir, ignore_errors=True)

def parse_yaml_from_archive(
self,
archive_path: str,
Expand All @@ -36,12 +57,15 @@ def parse_yaml_from_archive(
def extract_to(self, archive_path: str, target_dir: str) -> None:
    """Extract a TAR archive into a freshly recreated ``target_dir``.

    Every member is validated before the existing ``target_dir`` is removed,
    so a corrupt or unsafe archive leaves the previous contents in place.
    After extraction a lone wrapper directory is flattened through
    ``_flatten_single_package_dir``.

    Args:
        archive_path: Path of the TAR archive (any compression ``r:*`` accepts).
        target_dir: Destination directory; removed and recreated on success.

    Raises:
        ValueError: If a member path or link target is unsafe, or the archive
            cannot be read or extracted.
    """
    import posixpath  # TAR member names use "/" regardless of the host OS.

    try:
        with tarfile.open(archive_path, 'r:*') as tar:
            # Safety check: prevent path traversal
            for member in tar.getmembers():
                name = member.name.replace("\\", "/")
                if (
                    os.path.isabs(member.name)
                    or posixpath.isabs(name)
                    or ".." in name.split("/")
                ):
                    raise ValueError(f"Unsafe path in archive: {member.name}")
                if member.issym() or member.islnk():
                    link = member.linkname.replace("\\", "/")
                    # Symlink targets are relative to the link's own directory,
                    # hard-link targets to the archive root.
                    base = posixpath.dirname(name) if member.issym() else ""
                    resolved = posixpath.normpath(posixpath.join(base, link))
                    if (
                        posixpath.isabs(link)
                        or resolved == ".."
                        or resolved.startswith("../")
                    ):
                        raise ValueError(
                            f"Unsafe link in archive: {member.name} -> {member.linkname}"
                        )

            # Only discard the previous contents once the archive is known to
            # be readable and free of unsafe entries.
            if os.path.exists(target_dir):
                shutil.rmtree(target_dir)
            os.makedirs(target_dir, exist_ok=True)

            # The checks above are lexical only; where the interpreter offers
            # extraction filters, let the "data" filter enforce the rest.
            if hasattr(tarfile, "data_filter"):
                tar.extractall(target_dir, filter="data")
            else:
                tar.extractall(target_dir)
        self._flatten_single_package_dir(target_dir)
    except (tarfile.TarError, EOFError) as e:
        raise ValueError(f"Failed to extract TAR file: {e}") from e
24 changes: 24 additions & 0 deletions runtime/datamate-python/app/module/operator/parsers/zip_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
"""
import zipfile
import os
import shutil
from pathlib import Path
from typing import Optional

from app.module.operator.parsers.abstract_parser import AbstractParser
Expand All @@ -13,6 +15,25 @@
class ZipParser(AbstractParser):
"""ZIP 压缩包解析器"""

@staticmethod
def _flatten_single_package_dir(target_dir: str) -> None:
    """Hoist the contents of a lone wrapper directory up into ``target_dir``.

    Nothing happens when ``target_dir`` already holds ``__init__.py``,
    ``metadata.yml`` and ``process.py``, when it does not contain exactly one
    directory entry (ignoring ``__MACOSX``), or when that single directory
    lacks any of the three required files.

    Args:
        target_dir: Directory an archive has just been extracted into.

    Raises:
        OSError: If the directory cannot be listed or the wrapper renamed.
        shutil.Error: If an entry cannot be moved into ``target_dir``.
    """
    target = Path(target_dir)
    required = ("__init__.py", "metadata.yml", "process.py")

    def has_required_files(directory: Path) -> bool:
        return all((directory / name).exists() for name in required)

    if has_required_files(target):
        return

    children = [item for item in target.iterdir() if item.name != "__MACOSX"]
    if len(children) != 1 or not children[0].is_dir():
        return

    package_dir = children[0]
    if not has_required_files(package_dir):
        return

    # The wrapper may contain an entry that shares its own name (pkg/pkg/...).
    # Moving that entry to target/<name> would point at the wrapper itself and
    # make shutil.move raise, so first rename the wrapper to a name that
    # neither the target nor the payload uses.
    taken = {item.name for item in target.iterdir()}
    taken.update(item.name for item in package_dir.iterdir())
    staging_name = ".flatten_staging"
    suffix = 0
    while staging_name in taken:
        suffix += 1
        staging_name = f".flatten_staging_{suffix}"
    staging_dir = target / staging_name
    package_dir.rename(staging_dir)

    # Snapshot the listing so moving entries cannot disturb the iteration.
    for item in list(staging_dir.iterdir()):
        shutil.move(str(item), str(target / item.name))
    shutil.rmtree(staging_dir, ignore_errors=True)

def parse_yaml_from_archive(
self,
archive_path: str,
Expand All @@ -35,12 +56,15 @@ def parse_yaml_from_archive(
def extract_to(self, archive_path: str, target_dir: str) -> None:
    """Extract a ZIP archive into a freshly recreated ``target_dir``.

    Every entry name is validated before the existing ``target_dir`` is
    removed, so a corrupt or unsafe archive leaves the previous contents in
    place. After extraction a lone wrapper directory is flattened through
    ``_flatten_single_package_dir``.

    Args:
        archive_path: Path of the ZIP archive.
        target_dir: Destination directory; removed and recreated on success.

    Raises:
        ValueError: If an entry path is unsafe, or the archive cannot be read
            or extracted.
    """
    try:
        with zipfile.ZipFile(archive_path, 'r') as zf:
            # Safety check: prevent path traversal
            for name in zf.namelist():
                # Treat "\" like "/" so archives written with Windows
                # separators are checked on every platform.
                normalized = name.replace("\\", "/")
                if (
                    os.path.isabs(name)
                    or normalized.startswith("/")
                    or ".." in normalized.split("/")
                ):
                    raise ValueError(f"Unsafe path in archive: {name}")

            # Only discard the previous contents once the archive is known to
            # be readable and free of unsafe entries.
            if os.path.exists(target_dir):
                shutil.rmtree(target_dir)
            os.makedirs(target_dir, exist_ok=True)
            zf.extractall(target_dir)
        self._flatten_single_package_dir(target_dir)
    except (zipfile.BadZipFile, zipfile.LargeZipFile) as e:
        raise ValueError(f"Failed to extract ZIP file: {e}") from e
30 changes: 28 additions & 2 deletions runtime/ops/mapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,30 @@ def _import_operators():
from . import remove_duplicate_sentences
from . import knowledge_relation_slice
from . import pii_ner_detection
# ===== Video operators (PR1-PR5) =====

# ===== Audio operators =====
from . import audio_anomaly_filter
from . import audio_asr_pipeline
from . import audio_asr_transcribe
from . import audio_dc_offset_removal
from . import audio_emotion_recognize
from . import audio_fast_lang_id
from . import audio_fast_lang_id_text
from . import audio_format_convert
from . import audio_gtcrn_denoise
from . import audio_hum_notch
from . import audio_noise_gate
from . import audio_pre_emphasis
from . import audio_quantize_encode
from . import audio_rms_loudness_normalize
from . import audio_simple_agc
from . import audio_soft_peak_limiter
from . import audio_sound_classify
from . import audio_telephony_bandpass
from . import audio_text_summarize
from . import audio_trim_silence_edges

# ===== Video operators (PR1-PR5) =====
from . import _video_common
from . import video_format_convert
from . import video_sensitive_detect
Expand All @@ -60,8 +83,11 @@ def _import_operators():
from . import video_keyframe_extract
from . import video_deborder_crop
from . import video_audio_extract
from . import video_speech_asr
from . import video_subtitle_ocr
from . import video_text_ocr
try:
from . import video_speech_asr
except ImportError:
pass

_import_operators()
63 changes: 63 additions & 0 deletions runtime/ops/mapper/audio_anomaly_filter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# AudioAnomalyFilter 异常语音检测与过滤算子

## 概述

AudioAnomalyFilter 用于对音频做快速质量检测,计算时长、静音帧比例与音频可读性,并给出 `quality_flag`。异常音频会写入输出数据集的 `references/anomaly_report.jsonl`,默认不再输出异常音频;如开启保留开关,则异常音频按原文件名继续输出。

## 功能特性

- **时长检测**:支持最小时长/最大时长阈值
- **静音比例检测**:基于短时 RMS 统计静音帧占比
- **可读性检测**:无法作为音频读取的文件(例如把文本文件的后缀强行改成 `.wav`)会被标记为 `invalid`
- **异常清单**:异常音频以 JSONL 形式写入 `references/anomaly_report.jsonl`
- **过滤开关**:默认过滤异常音频;开启后保留异常音频,文件名不变
- **结果结构化输出**:报告写入 `ext_params.audio_quality`

## 参数说明

| 参数 | 类型 | 默认值 | 说明 |
|---|---|---:|---|
| minDur | inputNumber | 1.0 | 最小时长(秒),小于该值视为异常 |
| maxDur | inputNumber | 20000.0 | 最大时长(秒),大于该值视为异常 |
| silenceRatioTh | slider | 0.8 | 静音帧比例阈值(0~1),>= 阈值视为异常 |
| silenceRmsRatioTh | slider | 0.05 | 静音判定阈值 = global_rms * 该比例 |
| keepInvalidAudio | switch | false | 是否保留异常音频输出;关闭时只写清单,不输出异常音频 |
| anomalyReportPath | input | /dataset/{dataset_id}/references/anomaly_report.jsonl | 异常清单 JSONL 路径;默认优先写入当前输出数据集的 references |

## 输入输出

- **输入**:`sample["filePath"]`(音频文件路径)
- **输出**:
- `sample["ext_params"]["audio_quality"]`:
- `quality_flag`: `ok/invalid`
- `duration/silence_ratio/global_rms/reason/read_error/keep_invalid_audio/anomaly_report_file`
- 正常音频:按原文件名输出
- 异常音频:写入异常清单;默认不输出音频,开启 `keepInvalidAudio` 后按原文件名输出

## 异常清单格式

默认路径:

```text
/dataset/{dataset_id}/references/anomaly_report.jsonl
```

在 DataMate 任务有 `export_path` 时,会优先写入本次输出数据集:

```text
<export_path>/references/anomaly_report.jsonl
```

每行一个 JSON 对象,例如:

```json
{"file":"bad.wav","fileName":"bad.wav","key":"bad","reason":"unreadable_audio,duration_le_zero,too_much_silence","read_error":"RuntimeError: failed to read audio: ...","duration":0.0,"silence_ratio":1.0,"global_rms":0.0,"keep_invalid_audio":false}
```

## 依赖说明

- **Python 依赖**:优先 `torchaudio`,兜底 `soundfile`

## 版本历史

- **v1.0.0**:支持时长/静音比例/可读性检测;异常音频写入 references 清单,可选择过滤或按原名保留
6 changes: 6 additions & 0 deletions runtime/ops/mapper/audio_anomaly_filter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-

from datamate.core.base_op import OPERATORS

# Register this operator under the name "AudioAnomalyFilter", handing the
# registry the dotted path of the module that implements it.
OPERATORS.register_module(
    module_name="AudioAnomalyFilter",
    module_path="ops.mapper.audio_anomaly_filter.process",
)
Loading