Note
Click here to download the full example code
Detection of peaks in a ABP by - naive peak detection (find_peaks) - applying symmetric and non-symmetric mean average filters
import numpy as np import matplotlib.pyplot as plt from scipy.signal import find_peaks import csv as csv import lmlib as lm from lmlib.utils.generator import gen_wgn from lmlib.utils.beta import find_max_mask from lmlib.utils.beta import load_source_csv from lmlib.utils.beta import diff0 def moving_average_filter(y, a, b=0): """ Moving average filter with sliding window borders a and b. Note: this is a naive (slow) implementation and for demonstration purpose only. :param y: signal :param a: left window boundary :param b: right window boundary (optional) :return: filtered signal """ y_avg = np.zeros_like(y) # create empty array K = len(y) for k in range(K): y_sum = 0 # summed samples l = 0 # number of summed samples (= window length) for i in range(a, b+1): if (k+i >=0) and (k+i < K): y_sum += y[k+i] l += 1 if (l>0): # prevent runtime errors near the signal borders where l=0 y_avg[k] = y_sum / l else: y_avg[k] = None return y_avg # Load Signal k0 = 2200 # signal start index K = 1300 # number of samples to load fs = 500 # sampling frequency y_raw = load_source_csv('./csv-data/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv', time_format = "H:M:S", K = K, k = k0) k = range(K) # select abp signal and add Gaussian noise (to test robustness of detection) y = y_raw[:, 3] + gen_wgn(K, sigma=1.0, seed=43141) # --- standard peak detection on abp (peaks_abp_ind, _) = find_peaks(y, height=0, distance=fs*0.8) diff_abp = diff0(peaks_abp_ind)/fs # get delay between two peaks # --- moving average filter and peak detection L=25 # averaging window size y_ma_asym = moving_average_filter(y, a=-2*L, b=0) # non-symmetric filter (peaks_abp_mean_asym_ind, _) = find_peaks(y_ma_asym, distance=fs*0.8) y_ma_sym = moving_average_filter(y, a=-L, b=L) # symmetric filter (peaks_abp_mean_sym_ind, _) = find_peaks(y_ma_sym, distance=fs*0.8) # Plots _, axs = plt.subplots(2, 1, sharex='all', figsize=(8, 3.5)) plt.subplots_adjust(hspace=0.3) ax = axs[0] ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$') ax.plot(k, y_ma_asym, lw=1.0, c='b', label='${y}_{avg}$') ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7, c='tab:gray', label='peaks', zorder=20) ax.scatter(peaks_abp_mean_asym_ind, y_ma_asym[peaks_abp_mean_asym_ind],s=40, marker=7, c='b', label='peaks', zorder=20) ax.set(ylabel=r'$y_k$') ax.set_title('non-symmetric, causal moving average filter with a='+str(-2*L)+' and b=0') ax.legend(loc=4) ax = axs[1] ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$') ax.plot(k, y_ma_sym, lw=1.0, ls='-', c='b', label='${y}_{avg}$') ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7, c='tab:gray', label='peaks', zorder=20) ax.scatter(peaks_abp_mean_sym_ind, y_ma_sym[peaks_abp_mean_sym_ind],s=40, marker=7, c='b', label='peaks', zorder=20) ax.set(xlabel='k', ylabel=r'$y_k$') ax.set_title('symmetric, acausal moving average filter with a='+str(-L)+' and b='+str(L)) ax.legend(loc=4) plt.show()
Total running time of the script: ( 0 minutes 0.317 seconds)
Gallery generated by Sphinx-Gallery