import numpy as np import matplotlib.pyplot as plt from scipy.fft import fft, ifft from scipy.signal import chirp import random import warnings import time # 忽略除零警告,我们在 PHAT 中会处理 epsilon warnings.filterwarnings('ignore') class MicArraySimulator: def __init__(self, fs=16000, mic_dist=0.05, c_sound=343.0): """ 初始化麦克风阵列仿真器 :param fs: 采样率 (Hz) :param mic_dist: 麦克风间距 (m), 等边三角形边长 :param c_sound: 声速 (m/s) """ self.fs = fs self.c = c_sound self.d = mic_dist # 1. 定义麦克风坐标 (等边三角形,A 在前,B 左后,C 右后) # 原点为阵列中心(重心) h = np.sqrt(3) / 2 * self.d # 三角形高 # A (Front): (0, 2/3 * h) # B (Left): (-d/2, -1/3 * h) # C (Right): (d/2, -1/3 * h) self.mic_pos = np.array([ [0, 2/3 * h], # Mic A [-self.d/2, -1/3 * h], # Mic B [self.d/2, -1/3 * h] # Mic C ]) # 预计算基线向量 (用于线性方程法) # 使用 AB 和 AC 作为独立基 self.vec_AB = self.mic_pos[1] - self.mic_pos[0] self.vec_AC = self.mic_pos[2] - self.mic_pos[0] # 构建矩阵 M 及其逆矩阵 (预先计算,嵌入式可查表) # M = [ [AB_x, AB_y], [AC_x, AC_y] ] self.M = np.array([ [self.vec_AB[0], self.vec_AB[1]], [self.vec_AC[0], self.vec_AC[1]] ]) self.M_inv = np.linalg.inv(self.M) # 预计算麦克风对向量 (用于快速 TDOA 理论计算) # 对:AB, AC, BC,index 对应 (0,1), (0,2), (1,2) self.pairs = [(0, 1), (0, 2), (1, 2)] self.pair_vecs = [] for i, j in self.pairs: vec = self.mic_pos[j] - self.mic_pos[i] self.pair_vecs.append(vec) self.mic_signals = [] # 存储每次仿真生成的麦克风信号 def generate_mic_signals(self, src_sig, true_angle): """生成带延迟的信号,添加微小随机误差模拟实际情况""" mics = [] theta = np.deg2rad(true_angle) wave_dir = np.array([-np.sin(theta), -np.cos(theta)]) for pos in self.mic_pos: dist = np.dot(pos, wave_dir) # 添加采样噪声,模拟实际情况中的量化误差 noised_sig = src_sig + np.random.normal(0.0, 0.01, size=len(src_sig)) # 添加微小噪声,表示采样的量化误差 # delay = dist / self.c + random.uniform(-0.05, 0.05) * (1/self.fs) # 添加微小随机误差,范围为 ±0.05 个采样周期 delay = dist / self.c mics.append(self.apply_delay_freq_domain(noised_sig, delay)) self.mic_signals.append(mics) def generate_source_signal(self, duration=0.1): """生成类人声宽带信号,先在频域生成所选带宽的信号,转为时域后添加噪声""" N = int(self.fs * duration) if N <= 0: raise ValueError("duration 必须大于 0") # 1) 在频域构造 200~4000Hz 的随机宽带信号(仅正频率,便于 irfft 重建实信号) freqs = np.fft.rfftfreq(N, d=1 / self.fs) spectrum = np.zeros(len(freqs), dtype=np.complex128) band_mask = (freqs >= 200.0) & (freqs <= 4000.0) band_count = np.count_nonzero(band_mask) if band_count == 0: raise ValueError("duration 过短,无法覆盖 200~4000Hz 频带") rand_amp = np.random.uniform(0.2, 1.0, size=band_count) rand_phase = np.random.uniform(0.0, 2 * np.pi, size=band_count) spectrum[band_mask] = rand_amp * np.exp(1j * rand_phase) # 2) IFFT 到时域 signal = np.fft.irfft(spectrum, n=N) # 3) 添加噪声(高斯白噪声) noise_std = 0.05 * np.std(signal) if noise_std < 1e-8: noise_std = 1e-3 noise = np.random.normal(0.0, noise_std, size=N) signal = signal + noise # 4) 归一化,避免后续处理溢出 peak = np.max(np.abs(signal)) if peak > 1e-12: signal = signal / peak return signal def apply_delay_freq_domain(self, signal, delay_sec): """ 在频域施加分数延迟 (比时域移位更精确,模拟真实物理延迟) """ N = len(signal) sig_fft = fft(signal) # 相位偏移:e^(-j * 2 * pi * f * tau) freqs = np.fft.fftfreq(N, 1/self.fs) # 频率轴 phase_shift = np.exp(-1j * 2 * np.pi * freqs * delay_sec) delayed_sig = ifft(sig_fft * phase_shift).real return delayed_sig def get_theoretical_tdoa(self, azimuth_deg): """ 根据声源方位角计算理论 TDOA :param azimuth_deg: 0 度为正前方 (Mic A 方向),顺时针增加 """ theta = np.deg2rad(azimuth_deg) # 远场平面波单位向量 (声源传播方向,与方位角相反) # 如果声源在 0 度 (前方),波向量指向 -Y 轴 (0, -1) # 这里定义:0 度 = +Y 轴方向来的波 source_vec = np.array([np.sin(theta), np.cos(theta)]) tdoas = [] for vec in self.pair_vecs: # 投影距离 / 声速 dist_diff = np.dot(vec, source_vec) tdoas.append(dist_diff / self.c) return tdoas def gcc_phat(self, sig1, sig2): """ 核心算法:GCC-PHAT 返回:互相关函数,峰值位置 (样本数) """ N = len(sig1) # 1. FFT S1 = fft(sig1, n=N) S2 = fft(sig2, n=N) # 2. 互功率谱 R = S1 * np.conj(S2) # 互功率谱,乘以第二个信号的共轭,得到相位差信息 # 3. PHAT 加权 (幅度归一化) # 添加 epsilon 防止除零 epsilon = 1e-10 R_phat = R / (np.abs(R) + epsilon) # 4. IFFT 得到互相关 corr = ifft(R_phat).real # 互相关函数,长度为 N,中心点对应零延迟,左右分别对应正负延迟 # 互相关的延迟意味着峰值位置,峰值越高说明两个信号越相似,峰值位置对应的延迟就是 TDOA 的估计值 # 5. 峰值检测 (整数部分) peak_idx = np.argmax(corr) # 6. 抛物线插值 (分数延迟估计,关键步骤) # 取峰值及其左右两点拟合抛物线 if peak_idx == 0: idx_prev, idx_curr, idx_next = N-1, 0, 1 elif peak_idx == N-1: idx_prev, idx_curr, idx_next = N-2, N-1, 0 else: idx_prev, idx_curr, idx_next = peak_idx-1, peak_idx, peak_idx+1 y_prev, y_curr, y_next = corr[idx_prev], corr[idx_curr], corr[idx_next] # 抛物线顶点偏移公式 denom = (y_prev - 2*y_curr + y_next) if abs(denom) < 1e-10: delta = 0 else: delta = 0.5 * (y_prev - y_next) / denom fine_peak_idx = peak_idx + delta # 处理循环移位 (IFFT 结果后半部分代表负延迟) if fine_peak_idx > N / 2: fine_peak_idx -= N return corr, fine_peak_idx def estimate_doa_linear(self, signals): """ 基于三角函数/线性方程组的直接解算方法 速度极快,适合嵌入式 """ # 1. 获取 TDOA (秒) _, tau_AB = self.gcc_phat(signals[0], signals[1]) _, tau_AC = self.gcc_phat(signals[0], signals[2]) # 2. 构建方程右侧向量 b = [c * tau_AB, c * tau_AC] b = np.array([self.c * tau_AB, self.c * tau_AC]) # 3. 求解 k = M_inv * b # k 代表声波传播向量 [kx, ky] k = np.dot(self.M_inv, b) # 4. 归一化 (消除噪声导致的模长误差) norm = np.linalg.norm(k) if norm < 1e-6: return 0.0 # 避免除零 k_hat = k / norm # 5. 反解角度 # k_hat = [-sin(theta), -cos(theta)] theta_rad = np.arctan2(k_hat[0], k_hat[1]) theta_deg = np.rad2deg(theta_rad) # 转换为 0-360 if theta_deg < 0: theta_deg += 360 return theta_deg def estimate_doa_grid(self, signals): """ 基于网格搜索的最小二乘法 (之前版本) 精度高,但计算量大 """ measured_tdoas = [] for i, j in self.pairs: _, tau = self.gcc_phat(signals[i], signals[j]) measured_tdoas.append(tau) angles = np.arange(0, 360, 0.1) errors = [] for ang in angles: theta = np.deg2rad(ang) source_vec = np.array([np.sin(theta), np.cos(theta)]) err = 0 for idx, (i, j) in enumerate(self.pairs): vec = self.mic_pos[j] - self.mic_pos[i] theo_tdoa = np.dot(vec, source_vec) / self.c err += (measured_tdoas[idx] - theo_tdoa)**2 errors.append(err) return angles[np.argmin(errors)] def run_simulation(self, true_angle, sim_sig, method='linear'): # 解算 if method == 'linear': est_angle = self.estimate_doa_linear(sim_sig) else: est_angle = self.estimate_doa_grid(sim_sig) return true_angle, est_angle # ========================================== # 主程序:对比两种算法 # ========================================== if __name__ == "__main__": sim = MicArraySimulator(fs=24000, mic_dist=0.05) test_angles = np.arange(0, 360, 30) results_linear = [] results_grid = [] # 生成一个固定的源信号,所有测试使用同一信号,确保对比公平 src_sig = sim.generate_source_signal(duration=0.03) # 生成每次仿真中ABC麦克风的信号,添加微小随机误差模拟实际情况 for ang in test_angles: sim.generate_mic_signals(src_sig, ang) # 计时对比 t_start = time.perf_counter() for i in range(len(test_angles)): true_a, est_a = sim.run_simulation(test_angles[i], sim.mic_signals[i], method='linear') results_linear.append((true_a, est_a)) t_linear = time.perf_counter() - t_start t_start = time.perf_counter() for i in range(len(test_angles)): true_a, est_a = sim.run_simulation(test_angles[i], sim.mic_signals[i], method='grid') results_grid.append((true_a, est_a)) t_grid = time.perf_counter() - t_start # 打印结果 print(f"{'Angle':<6} | {'Linear Err':<10} | {'Grid Err':<10}") print("-" * 35) for i, ang in enumerate(test_angles): err_l = abs(results_linear[i][1] - results_linear[i][0]) err_g = abs(results_grid[i][1] - results_grid[i][0]) if err_l > 180: err_l = 360 - err_l if err_g > 180: err_g = 360 - err_g print(f"{ang:<6.0f} | {err_l:<10.2f} | {err_g:<10.2f}") print(f"\n耗时对比 ( {len(test_angles)} 次解算 ):") print(f"线性解算法:{t_linear*1000:.2f} ms") print(f"网格搜索法:{t_grid*1000:.2f} ms") print(f"速度提升:{t_grid/t_linear:.1f} 倍") # 可视化 # 1. 误差分布图 plt.figure(figsize=(10, 5)) # 误差分布 plt.subplot(1, 2, 1) errs_l = [abs(r[1]-r[0]) if abs(r[1]-r[0])<=180 else 360-abs(r[1]-r[0]) for r in results_linear] errs_g = [abs(r[1]-r[0]) if abs(r[1]-r[0])<=180 else 360-abs(r[1]-r[0]) for r in results_grid] plt.plot(test_angles, errs_l, 'b-o', label='Linear Trig') plt.plot(test_angles, errs_g, 'r--s', label='Grid Search') plt.title("DOA Estimation Error vs Angle") plt.xlabel("True Angle (deg)") plt.ylabel("Absolute Error (deg)") plt.legend() plt.grid(True, alpha=0.3) # 散点图 plt.subplot(1, 2, 2) est_l = [r[1] for r in results_linear] est_g = [r[1] for r in results_grid] plt.plot(test_angles, test_angles, 'k--', label='Ideal') plt.scatter(test_angles, est_l, c='blue', label='Linear', alpha=0.6) plt.scatter(test_angles, est_g, c='red', marker='x', label='Grid', alpha=0.6) plt.title("Estimated vs True Angle") plt.xlabel("True Angle") plt.ylabel("Estimated Angle") plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() # 2. 采样波形图 plt.figure(figsize=(12, 4)) # 这里我们可以展示一个信号的原始波形和带延迟的波形,看看噪声的影响 plt.subplot(1, 2, 1) plt.plot(src_sig, label='Original Signal') plt.plot(sim.mic_signals[0][0], label='Noisy&Delayed Signal') plt.title("Effect of Sampling Noise") plt.xlabel("Sample Index") plt.ylabel("Amplitude") plt.legend() plt.grid(True, alpha=0.3) # 这里展示互相关函数,看看噪声对峰值的影响 plt.subplot(1, 2, 2) corr, fine_peak_idx = sim.gcc_phat(sim.mic_signals[0][0], sim.mic_signals[0][1]) plt.plot(corr, label='GCC-PHAT Correlation') plt.title("GCC-PHAT Correlation") plt.xlabel("Lag") plt.ylabel("Correlation") plt.legend() plt.grid(True, alpha=0.3) plt.show()