diff --git a/Simulation/mic_array_gd_localization.py b/Simulation/mic_array_gd_localization.py new file mode 100644 index 0000000..41b2616 --- /dev/null +++ b/Simulation/mic_array_gd_localization.py @@ -0,0 +1,259 @@ +import numpy as np +import matplotlib.pyplot as plt +from scipy.fft import fft, ifft + + +class GCCPHATTOAGDLocalizer3D: + """5x5 平面阵列:先用 GCC-PHAT 估计每个麦克风 TOA,再用梯度下降估计 3D 声源位置。""" + + def __init__( + self, + fs=24000, + c_sound=343.0, + grid_shape=(5, 5), + spacing=0.04, + seed=7, + ): + self.fs = fs + self.c = c_sound + self.grid_shape = grid_shape + self.spacing = spacing + self.rng = np.random.default_rng(seed) + + self.mic_pos = self._build_xy_plane_array(grid_shape, spacing) + self.num_mics = self.mic_pos.shape[0] + + def _build_xy_plane_array(self, grid_shape, spacing): + nx, ny = grid_shape + xs = (np.arange(nx) - (nx - 1) / 2.0) * spacing + ys = (np.arange(ny) - (ny - 1) / 2.0) * spacing + xx, yy = np.meshgrid(xs, ys, indexing="xy") + zz = np.zeros_like(xx) + return np.column_stack((xx.ravel(), yy.ravel(), zz.ravel())) + + def generate_source_signal(self, duration=0.08, f_low=300.0, f_high=4000.0): + n = int(self.fs * duration) + if n <= 0: + raise ValueError("duration 必须大于 0") + + freqs = np.fft.rfftfreq(n, d=1.0 / self.fs) + spectrum = np.zeros(freqs.size, dtype=np.complex128) + band_mask = (freqs >= f_low) & (freqs <= min(f_high, self.fs / 2 - 1.0)) + band_count = np.count_nonzero(band_mask) + if band_count == 0: + raise ValueError("duration 过短,无法形成有效宽带信号") + + amp = self.rng.uniform(0.2, 1.0, size=band_count) + phase = self.rng.uniform(0.0, 2.0 * np.pi, size=band_count) + spectrum[band_mask] = amp * np.exp(1j * phase) + + signal = np.fft.irfft(spectrum, n=n) + noise_std = max(0.03 * np.std(signal), 1e-4) + signal = signal + self.rng.normal(0.0, noise_std, size=n) + + peak = np.max(np.abs(signal)) + if peak > 1e-12: + signal = signal / peak + return signal + + def apply_delay_freq_domain(self, signal, delay_sec): + n = len(signal) + padded = np.pad(signal, (0, n)) + n_fft = len(padded) + sig_fft = fft(padded) + freqs = np.fft.fftfreq(n_fft, d=1.0 / self.fs) + phase_shift = np.exp(-1j * 2.0 * np.pi * freqs * delay_sec) + delayed = ifft(sig_fft * phase_shift).real + return delayed[:n] + + def simulate_mic_signals(self, src_sig, source_pos, snr_db=20.0): + source_pos = np.asarray(source_pos, dtype=float) + if source_pos.shape != (3,): + raise ValueError("source_pos 必须是长度为 3 的向量 [x, y, z]") + if source_pos[2] <= 0: + raise ValueError("仅考虑前方声源,请保证 z > 0") + + vec = source_pos[None, :] - self.mic_pos + dists = np.linalg.norm(vec, axis=1) + arrival_times = dists / self.c + + mic_signals = [] + for dist, delay in zip(dists, arrival_times): + delayed = self.apply_delay_freq_domain(src_sig, delay) + atten = 1.0 / max(dist, 1e-3) + mic_signals.append(delayed * atten) + + mic_signals = np.asarray(mic_signals) + for i in range(self.num_mics): + sig_power = np.mean(mic_signals[i] ** 2) + noise_power = sig_power / (10.0 ** (snr_db / 10.0)) + noise_std = np.sqrt(max(noise_power, 1e-12)) + mic_signals[i] += self.rng.normal(0.0, noise_std, size=mic_signals.shape[1]) + + return mic_signals, arrival_times + + def gcc_phat_delay(self, sig, refsig, max_tau=0.01, interp=16): + n = sig.shape[0] + refsig.shape[0] + sig_fft = fft(sig, n=n) + ref_fft = fft(refsig, n=n) + + cross_power = sig_fft * np.conj(ref_fft) + cross_power /= np.abs(cross_power) + 1e-15 + + corr = ifft(cross_power, n=interp * n).real + + max_shift = int(interp * n / 2) + if max_tau is not None: + max_shift = min(int(interp * self.fs * max_tau), max_shift) + + corr_window = np.concatenate((corr[-max_shift:], corr[: max_shift + 1])) + shift = int(np.argmax(corr_window)) - max_shift + tau = shift / float(interp * self.fs) + return tau + + def estimate_toa_with_gcc_phat(self, mic_signals, src_sig): + toa = np.zeros(self.num_mics) + for i in range(self.num_mics): + toa[i] = self.gcc_phat_delay(mic_signals[i], src_sig) + return toa + + def predict_toa(self, source_pos): + vec = source_pos[None, :] - self.mic_pos + dists = np.linalg.norm(vec, axis=1) + return dists / self.c + + def localize_with_gradient_descent( + self, + measured_toa, + init_pos=np.array([0.0, 0.0, 0.40]), + lr=0.10, + max_iters=2000, + decay=0.001, + z_min=0.02, + tol=1e-9, + ): + x = np.asarray(init_pos, dtype=float).copy() + if x.shape != (3,): + raise ValueError("init_pos 必须是长度为 3 的向量 [x, y, z]") + + x[2] = max(x[2], z_min) + target_ranges = measured_toa * self.c + + loss_history = [] + for i in range(max_iters): + vec = x[None, :] - self.mic_pos + dists = np.linalg.norm(vec, axis=1) + dists = np.maximum(dists, 1e-8) + + err = dists - target_ranges + loss = 0.5 * np.mean(err ** 2) + loss_history.append(loss) + + grad = np.mean(err[:, None] * (vec / dists[:, None]), axis=0) + step = lr / (1.0 + decay * i) + x = x - step * grad + x[2] = max(x[2], z_min) + + if np.linalg.norm(grad) < tol: + break + + pred_toa = self.predict_toa(x) + return x, np.asarray(loss_history), pred_toa + + def visualize(self, true_pos, est_pos, loss_history, measured_toa, fitted_toa): + fig = plt.figure(figsize=(16, 5)) + + ax1 = fig.add_subplot(1, 3, 1, projection="3d") + ax1.scatter(self.mic_pos[:, 0], self.mic_pos[:, 1], self.mic_pos[:, 2], c="royalblue", s=28, label="Microphones") + ax1.scatter(true_pos[0], true_pos[1], true_pos[2], c="limegreen", marker="*", s=220, label="True source") + ax1.scatter(est_pos[0], est_pos[1], est_pos[2], c="crimson", marker="^", s=120, label="Estimated source") + ax1.plot( + [true_pos[0], est_pos[0]], + [true_pos[1], est_pos[1]], + [true_pos[2], est_pos[2]], + "k--", + linewidth=1.2, + ) + ax1.set_title("3D Localization (z > 0)") + ax1.set_xlabel("x (m)") + ax1.set_ylabel("y (m)") + ax1.set_zlabel("z (m)") + ax1.legend(loc="upper left") + ax1.view_init(elev=24, azim=-55) + + ax2 = fig.add_subplot(1, 3, 2) + ax2.plot(loss_history, color="tab:orange", linewidth=2) + if np.all(loss_history > 0): + ax2.set_yscale("log") + ax2.set_title("Gradient Descent Loss") + ax2.set_xlabel("Iteration") + ax2.set_ylabel("MSE of range error (m^2)") + ax2.grid(alpha=0.3) + + ax3 = fig.add_subplot(1, 3, 3) + mic_index = np.arange(self.num_mics) + ax3.plot(mic_index, measured_toa * 1e6, "o-", label="Measured TOA (GCC-PHAT)") + ax3.plot(mic_index, fitted_toa * 1e6, "x--", label="Predicted TOA") + ax3.set_title("TOA Fit Across 25 Mics") + ax3.set_xlabel("Mic index") + ax3.set_ylabel("Arrival time (us)") + ax3.grid(alpha=0.3) + ax3.legend() + + fig.tight_layout() + plt.show() + + +def main(): + localizer = GCCPHATTOAGDLocalizer3D( + fs=24000, + c_sound=343.0, + grid_shape=(5, 5), + spacing=0.04, + seed=7, + ) + + true_source = np.array([0.12, -0.08, 0.55]) + + src_signal = localizer.generate_source_signal(duration=0.08) + mic_signals, true_arrival_times = localizer.simulate_mic_signals( + src_signal, + true_source, + snr_db=20.0, + ) + + measured_toa = localizer.estimate_toa_with_gcc_phat(mic_signals, src_signal) + + est_source, loss_history, fitted_toa = localizer.localize_with_gradient_descent( + measured_toa, + init_pos=np.array([0.0, 0.0, 0.40]), + lr=0.10, + max_iters=2000, + decay=0.001, + z_min=0.02, + tol=1e-9, + ) + + pos_err = np.linalg.norm(est_source - true_source) + toa_rmse_us = np.sqrt(np.mean((measured_toa - true_arrival_times) ** 2)) * 1e6 + fit_rmse_us = np.sqrt(np.mean((fitted_toa - measured_toa) ** 2)) * 1e6 + + print("=== 5x5 平面阵列 + GCC-PHAT(TOA) + 梯度下降定位 ===") + print(f"真实声源位置 [x, y, z] (m): {true_source}") + print(f"估计声源位置 [x, y, z] (m): {est_source}") + print(f"位置误差: {pos_err:.4f} m") + print(f"GCC-PHAT 到达时间估计 RMSE (相对真值): {toa_rmse_us:.2f} us") + print(f"估计位置反推 TOA 拟合 RMSE: {fit_rmse_us:.2f} us") + print(f"迭代次数: {len(loss_history)}") + + localizer.visualize( + true_pos=true_source, + est_pos=est_source, + loss_history=loss_history, + measured_toa=measured_toa, + fitted_toa=fitted_toa, + ) + + +if __name__ == "__main__": + main() diff --git a/Simulation/mic_array_sim.py b/Simulation/mic_array_sim.py new file mode 100644 index 0000000..e66cb0b --- /dev/null +++ b/Simulation/mic_array_sim.py @@ -0,0 +1,353 @@ +import numpy as np +import matplotlib.pyplot as plt +from scipy.fft import fft, ifft +from scipy.signal import chirp +import random +import warnings +import time + +# 忽略除零警告,我们在 PHAT 中会处理 epsilon +warnings.filterwarnings('ignore') + +class MicArraySimulator: + def __init__(self, fs=16000, mic_dist=0.05, c_sound=343.0): + """ + 初始化麦克风阵列仿真器 + :param fs: 采样率 (Hz) + :param mic_dist: 麦克风间距 (m), 等边三角形边长 + :param c_sound: 声速 (m/s) + """ + self.fs = fs + self.c = c_sound + self.d = mic_dist + + # 1. 定义麦克风坐标 (等边三角形,A 在前,B 左后,C 右后) + # 原点为阵列中心(重心) + h = np.sqrt(3) / 2 * self.d # 三角形高 + # A (Front): (0, 2/3 * h) + # B (Left): (-d/2, -1/3 * h) + # C (Right): (d/2, -1/3 * h) + self.mic_pos = np.array([ + [0, 2/3 * h], # Mic A + [-self.d/2, -1/3 * h], # Mic B + [self.d/2, -1/3 * h] # Mic C + ]) + + # 预计算基线向量 (用于线性方程法) + # 使用 AB 和 AC 作为独立基 + self.vec_AB = self.mic_pos[1] - self.mic_pos[0] + self.vec_AC = self.mic_pos[2] - self.mic_pos[0] + + # 构建矩阵 M 及其逆矩阵 (预先计算,嵌入式可查表) + # M = [ [AB_x, AB_y], [AC_x, AC_y] ] + self.M = np.array([ + [self.vec_AB[0], self.vec_AB[1]], + [self.vec_AC[0], self.vec_AC[1]] + ]) + self.M_inv = np.linalg.inv(self.M) + + # 预计算麦克风对向量 (用于快速 TDOA 理论计算) + # 对:AB, AC, BC,index 对应 (0,1), (0,2), (1,2) + self.pairs = [(0, 1), (0, 2), (1, 2)] + self.pair_vecs = [] + for i, j in self.pairs: + vec = self.mic_pos[j] - self.mic_pos[i] + self.pair_vecs.append(vec) + + self.mic_signals = [] # 存储每次仿真生成的麦克风信号 + + def generate_mic_signals(self, src_sig, true_angle): + """生成带延迟的信号,添加微小随机误差模拟实际情况""" + mics = [] + theta = np.deg2rad(true_angle) + wave_dir = np.array([-np.sin(theta), -np.cos(theta)]) + for pos in self.mic_pos: + dist = np.dot(pos, wave_dir) + # 添加采样噪声,模拟实际情况中的量化误差 + noised_sig = src_sig + np.random.normal(0.0, 0.01, size=len(src_sig)) # 添加微小噪声,表示采样的量化误差 + # delay = dist / self.c + random.uniform(-0.05, 0.05) * (1/self.fs) # 添加微小随机误差,范围为 ±0.05 个采样周期 + delay = dist / self.c + mics.append(self.apply_delay_freq_domain(noised_sig, delay)) + self.mic_signals.append(mics) + + def generate_source_signal(self, duration=0.1): + """生成类人声宽带信号,先在频域生成所选带宽的信号,转为时域后添加噪声""" + N = int(self.fs * duration) + if N <= 0: + raise ValueError("duration 必须大于 0") + + # 1) 在频域构造 200~4000Hz 的随机宽带信号(仅正频率,便于 irfft 重建实信号) + freqs = np.fft.rfftfreq(N, d=1 / self.fs) + spectrum = np.zeros(len(freqs), dtype=np.complex128) + + band_mask = (freqs >= 200.0) & (freqs <= 4000.0) + band_count = np.count_nonzero(band_mask) + if band_count == 0: + raise ValueError("duration 过短,无法覆盖 200~4000Hz 频带") + + rand_amp = np.random.uniform(0.2, 1.0, size=band_count) + rand_phase = np.random.uniform(0.0, 2 * np.pi, size=band_count) + spectrum[band_mask] = rand_amp * np.exp(1j * rand_phase) + + # 2) IFFT 到时域 + signal = np.fft.irfft(spectrum, n=N) + + # 3) 添加噪声(高斯白噪声) + noise_std = 0.05 * np.std(signal) + if noise_std < 1e-8: + noise_std = 1e-3 + noise = np.random.normal(0.0, noise_std, size=N) + signal = signal + noise + + # 4) 归一化,避免后续处理溢出 + peak = np.max(np.abs(signal)) + if peak > 1e-12: + signal = signal / peak + + return signal + + + def apply_delay_freq_domain(self, signal, delay_sec): + """ + 在频域施加分数延迟 (比时域移位更精确,模拟真实物理延迟) + """ + N = len(signal) + sig_fft = fft(signal) + # 相位偏移:e^(-j * 2 * pi * f * tau) + freqs = np.fft.fftfreq(N, 1/self.fs) # 频率轴 + phase_shift = np.exp(-1j * 2 * np.pi * freqs * delay_sec) + delayed_sig = ifft(sig_fft * phase_shift).real + return delayed_sig + + def get_theoretical_tdoa(self, azimuth_deg): + """ + 根据声源方位角计算理论 TDOA + :param azimuth_deg: 0 度为正前方 (Mic A 方向),顺时针增加 + """ + theta = np.deg2rad(azimuth_deg) + # 远场平面波单位向量 (声源传播方向,与方位角相反) + # 如果声源在 0 度 (前方),波向量指向 -Y 轴 (0, -1) + # 这里定义:0 度 = +Y 轴方向来的波 + source_vec = np.array([np.sin(theta), np.cos(theta)]) + + tdoas = [] + for vec in self.pair_vecs: + # 投影距离 / 声速 + dist_diff = np.dot(vec, source_vec) + tdoas.append(dist_diff / self.c) + return tdoas + + def gcc_phat(self, sig1, sig2): + """ + 核心算法:GCC-PHAT + 返回:互相关函数,峰值位置 (样本数) + """ + N = len(sig1) + # 1. FFT + S1 = fft(sig1, n=N) + S2 = fft(sig2, n=N) + + # 2. 互功率谱 + R = S1 * np.conj(S2) # 互功率谱,乘以第二个信号的共轭,得到相位差信息 + + # 3. PHAT 加权 (幅度归一化) + # 添加 epsilon 防止除零 + epsilon = 1e-10 + R_phat = R / (np.abs(R) + epsilon) + + # 4. IFFT 得到互相关 + corr = ifft(R_phat).real # 互相关函数,长度为 N,中心点对应零延迟,左右分别对应正负延迟 + # 互相关的延迟意味着峰值位置,峰值越高说明两个信号越相似,峰值位置对应的延迟就是 TDOA 的估计值 + + # 5. 峰值检测 (整数部分) + peak_idx = np.argmax(corr) + + # 6. 抛物线插值 (分数延迟估计,关键步骤) + # 取峰值及其左右两点拟合抛物线 + if peak_idx == 0: + idx_prev, idx_curr, idx_next = N-1, 0, 1 + elif peak_idx == N-1: + idx_prev, idx_curr, idx_next = N-2, N-1, 0 + else: + idx_prev, idx_curr, idx_next = peak_idx-1, peak_idx, peak_idx+1 + + y_prev, y_curr, y_next = corr[idx_prev], corr[idx_curr], corr[idx_next] + + # 抛物线顶点偏移公式 + denom = (y_prev - 2*y_curr + y_next) + if abs(denom) < 1e-10: + delta = 0 + else: + delta = 0.5 * (y_prev - y_next) / denom + + fine_peak_idx = peak_idx + delta + + # 处理循环移位 (IFFT 结果后半部分代表负延迟) + if fine_peak_idx > N / 2: + fine_peak_idx -= N + + return corr, fine_peak_idx + + def estimate_doa_linear(self, signals): + """ + 基于三角函数/线性方程组的直接解算方法 + 速度极快,适合嵌入式 + """ + # 1. 获取 TDOA (秒) + _, tau_AB = self.gcc_phat(signals[0], signals[1]) + _, tau_AC = self.gcc_phat(signals[0], signals[2]) + + # 2. 构建方程右侧向量 b = [c * tau_AB, c * tau_AC] + b = np.array([self.c * tau_AB, self.c * tau_AC]) + + # 3. 求解 k = M_inv * b + # k 代表声波传播向量 [kx, ky] + k = np.dot(self.M_inv, b) + + # 4. 归一化 (消除噪声导致的模长误差) + norm = np.linalg.norm(k) + if norm < 1e-6: + return 0.0 # 避免除零 + + k_hat = k / norm + + # 5. 反解角度 + # k_hat = [-sin(theta), -cos(theta)] + theta_rad = np.arctan2(k_hat[0], k_hat[1]) + theta_deg = np.rad2deg(theta_rad) + + # 转换为 0-360 + if theta_deg < 0: + theta_deg += 360 + + return theta_deg + + def estimate_doa_grid(self, signals): + """ + 基于网格搜索的最小二乘法 (之前版本) + 精度高,但计算量大 + """ + measured_tdoas = [] + for i, j in self.pairs: + _, tau = self.gcc_phat(signals[i], signals[j]) + measured_tdoas.append(tau) + + angles = np.arange(0, 360, 0.1) + errors = [] + for ang in angles: + theta = np.deg2rad(ang) + source_vec = np.array([np.sin(theta), np.cos(theta)]) + err = 0 + for idx, (i, j) in enumerate(self.pairs): + vec = self.mic_pos[j] - self.mic_pos[i] + theo_tdoa = np.dot(vec, source_vec) / self.c + err += (measured_tdoas[idx] - theo_tdoa)**2 + errors.append(err) + + return angles[np.argmin(errors)] + + def run_simulation(self, true_angle, sim_sig, method='linear'): + + # 解算 + if method == 'linear': + est_angle = self.estimate_doa_linear(sim_sig) + else: + est_angle = self.estimate_doa_grid(sim_sig) + + return true_angle, est_angle + +# ========================================== +# 主程序:对比两种算法 +# ========================================== +if __name__ == "__main__": + sim = MicArraySimulator(fs=24000, mic_dist=0.05) + + test_angles = np.arange(0, 360, 30) + results_linear = [] + results_grid = [] + + # 生成一个固定的源信号,所有测试使用同一信号,确保对比公平 + src_sig = sim.generate_source_signal(duration=0.03) + # 生成每次仿真中ABC麦克风的信号,添加微小随机误差模拟实际情况 + for ang in test_angles: + sim.generate_mic_signals(src_sig, ang) + # 计时对比 + t_start = time.perf_counter() + for i in range(len(test_angles)): + true_a, est_a = sim.run_simulation(test_angles[i], sim.mic_signals[i], method='linear') + results_linear.append((true_a, est_a)) + t_linear = time.perf_counter() - t_start + + t_start = time.perf_counter() + for i in range(len(test_angles)): + true_a, est_a = sim.run_simulation(test_angles[i], sim.mic_signals[i], method='grid') + results_grid.append((true_a, est_a)) + t_grid = time.perf_counter() - t_start + + # 打印结果 + print(f"{'Angle':<6} | {'Linear Err':<10} | {'Grid Err':<10}") + print("-" * 35) + for i, ang in enumerate(test_angles): + err_l = abs(results_linear[i][1] - results_linear[i][0]) + err_g = abs(results_grid[i][1] - results_grid[i][0]) + if err_l > 180: err_l = 360 - err_l + if err_g > 180: err_g = 360 - err_g + print(f"{ang:<6.0f} | {err_l:<10.2f} | {err_g:<10.2f}") + + print(f"\n耗时对比 ( {len(test_angles)} 次解算 ):") + print(f"线性解算法:{t_linear*1000:.2f} ms") + print(f"网格搜索法:{t_grid*1000:.2f} ms") + print(f"速度提升:{t_grid/t_linear:.1f} 倍") + + # 可视化 + # 1. 误差分布图 + plt.figure(figsize=(10, 5)) + + # 误差分布 + plt.subplot(1, 2, 1) + errs_l = [abs(r[1]-r[0]) if abs(r[1]-r[0])<=180 else 360-abs(r[1]-r[0]) for r in results_linear] + errs_g = [abs(r[1]-r[0]) if abs(r[1]-r[0])<=180 else 360-abs(r[1]-r[0]) for r in results_grid] + plt.plot(test_angles, errs_l, 'b-o', label='Linear Trig') + plt.plot(test_angles, errs_g, 'r--s', label='Grid Search') + plt.title("DOA Estimation Error vs Angle") + plt.xlabel("True Angle (deg)") + plt.ylabel("Absolute Error (deg)") + plt.legend() + plt.grid(True, alpha=0.3) + + # 散点图 + plt.subplot(1, 2, 2) + est_l = [r[1] for r in results_linear] + est_g = [r[1] for r in results_grid] + plt.plot(test_angles, test_angles, 'k--', label='Ideal') + plt.scatter(test_angles, est_l, c='blue', label='Linear', alpha=0.6) + plt.scatter(test_angles, est_g, c='red', marker='x', label='Grid', alpha=0.6) + plt.title("Estimated vs True Angle") + plt.xlabel("True Angle") + plt.ylabel("Estimated Angle") + plt.legend() + plt.grid(True, alpha=0.3) + + plt.tight_layout() + + # 2. 采样波形图 + plt.figure(figsize=(12, 4)) + # 这里我们可以展示一个信号的原始波形和带延迟的波形,看看噪声的影响 + plt.subplot(1, 2, 1) + plt.plot(src_sig, label='Original Signal') + plt.plot(sim.mic_signals[0][0], label='Noisy&Delayed Signal') + plt.title("Effect of Sampling Noise") + plt.xlabel("Sample Index") + plt.ylabel("Amplitude") + plt.legend() + plt.grid(True, alpha=0.3) + # 这里展示互相关函数,看看噪声对峰值的影响 + plt.subplot(1, 2, 2) + corr, fine_peak_idx = sim.gcc_phat(sim.mic_signals[0][0], sim.mic_signals[0][1]) + plt.plot(corr, label='GCC-PHAT Correlation') + plt.title("GCC-PHAT Correlation") + plt.xlabel("Lag") + plt.ylabel("Correlation") + plt.legend() + plt.grid(True, alpha=0.3) + plt.show() \ No newline at end of file