def emd(signal, num_imfs): imfs = [] residue = signal.copy() for i in range(num_imfs): imf = residue.copy() while True: maxima = np.maximum(imf, 0) minima = np.minimum(imf, 0) max_env = hilbert(maxima) min_env = hilbert(-minima) mean_env = (max_env + min_env) / 2 imf_prev = imf imf = imf - mean_env if np.sum(np.abs(imf - imf_prev)) < 0.01: break imfs.append(imf) residue = residue - imf return imfs, residue def reconstruct(imfs, residue): signal = residue.copy() for imf in imfs: signal += imf return signal def denoise(signal, num_imfs): imfs, residue = emd(signal, num_imfs) denoised_signal = reconstruct(imfs, residue) return denoised_signal
时间: 2024-02-14 08:27:56 浏览: 29
这是一个使用经验模态分解 (Empirical Mode Decomposition, EMD) 方法进行信号去噪的代码。EMD 是一种将信号分解为一系列本征模态函数 (Intrinsic Mode Functions, IMFs) 的方法,其中每个 IMF 表示信号中的一个固有振动模式。通过去除一定数量的 IMFs 和剩余信号的重构,可以实现信号的去噪。
`emd` 函数接受一个信号和要提取的 IMF 数量作为输入,并返回提取的 IMFs 和剩余信号。在 `emd` 函数中,它首先对输入信号进行复制,并通过迭代计算每个 IMF。在每次迭代中,它计算出信号的局部极大值和极小值,并通过 Hilbert 变换获得它们的包络。然后根据包络计算出当前 IMF,并将其从原始信号中减去。迭代直到 IMF 的变化小于一个阈值(0.01)为止。最后,将提取的 IMF 添加到 IMFs 列表中,并更新剩余信号。
`reconstruct` 函数接受提取的 IMFs 和剩余信号,并重构原始信号。它首先将剩余信号复制给 `signal`,然后逐个将 IMFs 添加到 `signal` 中。
`denoise` 函数是一个封装函数,它将调用 `emd` 函数来提取 IMFs 和剩余信号,并通过调用 `reconstruct` 函数将它们重构为去噪信号。
请问有什么问题我可以帮助您解答的吗?
相关问题
data00=data m,n = np.shape(data00) a = np.array(data00) Data00 = a[1:m,2:n] Data00 = Data00.astype(np.float64) Power = Data00[:,13] Power_train = Power[0:96] P_min = np.min(Power_train) P_gap = np.max(Power_train)-np.min(Power_train) Power_uni = (Power-P_min)/P_gap # 提取imfs和剩余信号res emd = EMD() emd.emd(Power_uni) imfs, res = emd.get_imfs_and_residue() N = len(imfs) P_H = np.sum(imfs[0:6,:],axis=0) P_M = np.sum(imfs[6:12,:],axis=0) P_L = res P_H =np.expand_dims(P_H,axis=1) P_M =np.expand_dims(P_M,axis=1) P_L =np.expand_dims(P_L,axis=1) Nwp = Data00[:,0:7] Nwp_train = Nwp[0:96] N_min = np.min(Nwp_train,axis=0) N_gap = np.max(Nwp_train,axis=0)-np.min(Nwp_train,axis=0) Nwp_uni = (Nwp-N_min)/N_gap#(N,7) Weather = Data00[:,7:13] Weather_train = Weather[0:96] W_min = np.min(Weather_train,axis=0) W_gap = np.max(Weather_train,axis=0)-np.min(Weather_train,axis=0) Weather_uni = (Weather-W_min)/W_gap#(N,6) 优化代码
以下是部分代码的优化建议:
1. 对于以下代码段:
```
a = np.array(data00)
Data00 = a[1:m,2:n]
```
可以合并为一行:
```
Data00 = np.array(data00)[1:m,2:n]
```
2. 对于以下代码段:
```
P_H = np.sum(imfs[0:6,:],axis=0)
P_M = np.sum(imfs[6:12,:],axis=0)
P_L = res
P_H =np.expand_dims(P_H,axis=1)
P_M =np.expand_dims(P_M,axis=1)
P_L =np.expand_dims(P_L,axis=1)
```
可以使用 `np.newaxis` 替代 `np.expand_dims` 来实现:
```
P_H = np.sum(imfs[0:6,:],axis=0)[:, np.newaxis]
P_M = np.sum(imfs[6:12,:],axis=0)[:, np.newaxis]
P_L = res[:, np.newaxis]
```
3. 对于以下代码段:
```
N_min = np.min(Nwp_train,axis=0)
N_gap = np.max(Nwp_train,axis=0)-np.min(Nwp_train,axis=0)
Nwp_uni = (Nwp-N_min)/N_gap#(N,7)
```
可以使用 `np.ptp` 函数(peak-to-peak)来计算最大值和最小值的差:
```
N_min = np.min(Nwp_train,axis=0)
N_gap = np.ptp(Nwp_train,axis=0)
Nwp_uni = (Nwp-N_min)/N_gap#(N,7)
```
4. 对于以下代码段:
```
Weather = Data00[:,7:13]
Weather_train = Weather[0:96]
W_min = np.min(Weather_train,axis=0)
W_gap = np.max(Weather_train,axis=0)-np.min(Weather_train,axis=0)
Weather_uni = (Weather-W_min)/W_gap#(N,6)
```
可以使用与第三个优化建议类似的方法:
```
Weather = Data00[:,7:13]
Weather_train = Weather[0:96]
W_min = np.min(Weather_train,axis=0)
W_gap = np.ptp(Weather_train,axis=0)
Weather_uni = (Weather-W_min)/W_gap#(N,6)
```
当然,以上只是一些简单的优化建议,具体的优化效果还需要根据实际情况进行评估。
CEEMDAN算法实现
CEEMDAN(Complete Ensemble Empirical Mode Decomposition with Adaptive Noise)是一种用于非线性和非稳定信号分析的算法,以下是CEEMDAN算法的实现步骤:
1. 对原始信号进行扩展,添加一定比例的高斯白噪声。
2. 将扩展后的信号分解为若干个固有模态函数(IMF)和一个残差项。这里使用EMD(Empirical Mode Decomposition)算法进行分解。
3. 对每个IMF进行多次重构,每次重构时都会添加不同的高斯白噪声。
4. 将每个IMF的多次重构结果取平均得到最终的IMF。
5. 将所有的IMF相加得到CEEMDAN分解结果。
下面是Python代码实现CEEMDAN算法:
```python
import numpy as np
import scipy.signal as signal
def ceemdan(signal, num_of_sifts):
max_sifts = 10
num_of_sifts = min(num_of_sifts, max_sifts)
num_of_imfs = num_of_sifts + 1
# 添加高斯白噪声
noise_std = 0.2 * np.std(signal)
noise = np.random.normal(0, noise_std, size=signal.shape)
signal = signal + noise
imfs = np.zeros((num_of_imfs, signal.shape[0]))
for sift_idx in range(num_of_sifts):
res = signal.copy()
for imf_idx in range(num_of_imfs):
# EMD分解
imf = signal - res
imf_residue = imf - signal
res = res - imf
if np.sum(imf**2) < 1e-10:
break
imfs[imf_idx] += imf
signal = res
# 添加高斯白噪声
noise_std = 0.2 * np.std(signal)
noise = np.random.normal(0, noise_std, size=signal.shape)
signal = signal + noise
# 处理最后一个IMF
imfs[num_of_imfs-1] = signal
# 对每个IMF进行多次重构
imfs_r = np.zeros((num_of_imfs, signal.shape[0]))
for i in range(num_of_imfs):
for sift_idx in range(num_of_sifts):
res = imfs[i].copy()
for imf_idx in range(num_of_imfs):
# EMD分解
imf = imfs[i] - res
imf_residue = imf - imfs[i]
res = res - imf
if np.sum(imf**2) < 1e-10:
break
imfs_r[i] += imf
imfs[i] = res
# 添加高斯白噪声
noise_std = 0.2 * np.std(imfs[i])
noise = np.random.normal(0, noise_std, size=imfs[i].shape)
imfs[i] = imfs[i] + noise
# 取平均得到最终的IMF
for i in range(num_of_imfs):
imfs[i] = imfs[i] + imfs_r[i] / num_of_sifts
# CEEMDAN分解结果
ceemdan_signal = np.sum(imfs, axis=0)
return ceemdan_signal
```
其中,`signal`是输入信号,`num_of_sifts`是SIFT(Shift Invariant Fourier Transform)的次数,即每个IMF进行多少次重构。最后返回CEEMDAN分解结果`ceemdan_signal`。