# Peak detection with moving average in ABP [ex911.1]ΒΆ

Detection of peaks in a ABP by - naive peak detection (find_peaks) - applying symmetric and non-symmetric mean average filters

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_wgn
from lmlib.utils.beta import diff0

def moving_average_filter(y, a, b=0):
"""
Moving average filter with sliding window borders a and b.

Note: this is a naive (slow) implementation and for demonstration purpose only.

:param y: signal
:param a: left window boundary
:param b: right window boundary (optional)
:return: filtered signal
"""

y_avg = np.zeros_like(y) # create empty array
K = len(y)
for k in range(K):
y_sum = 0 # summed samples
l = 0 # number of summed samples (= window length)
for i in range(a, b+1):
if (k+i >=0) and (k+i < K):
y_sum += y[k+i]
l += 1
if (l>0):  # prevent runtime errors near the signal borders where l=0
y_avg[k] = y_sum / l
else:
y_avg[k] = None
return y_avg

k0 = 2200 # signal start index
K = 1300 # number of samples to load
fs = 500 # sampling frequency

k = range(K)

# select abp signal and add Gaussian noise (to test robustness of detection)
y = y_raw[:, 3]  + gen_wgn(K, sigma=1.0, seed=43141)

# --- standard peak detection on abp
(peaks_abp_ind, _) = find_peaks(y, height=0, distance=fs*0.8)
diff_abp = diff0(peaks_abp_ind)/fs  # get delay between two peaks

# --- moving average filter and peak detection
L=25 # averaging window size

y_ma_asym = moving_average_filter(y, a=-2*L, b=0) # non-symmetric filter
(peaks_abp_mean_asym_ind, _) = find_peaks(y_ma_asym, distance=fs*0.8)

y_ma_sym = moving_average_filter(y, a=-L, b=L) # symmetric filter
(peaks_abp_mean_sym_ind, _) = find_peaks(y_ma_sym, distance=fs*0.8)

# Plots
_, axs = plt.subplots(2, 1, sharex='all', figsize=(8, 3.5))

ax = axs[0]
ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$')
ax.plot(k, y_ma_asym, lw=1.0, c='b', label='${y}_{avg}$')
ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7,  c='tab:gray', label='peaks', zorder=20)
ax.scatter(peaks_abp_mean_asym_ind, y_ma_asym[peaks_abp_mean_asym_ind],s=40, marker=7,  c='b', label='peaks', zorder=20)
ax.set(ylabel=r'$y_k$')
ax.set_title('non-symmetric, causal moving average filter with a='+str(-2*L)+' and b=0')
ax.legend(loc=4)

ax = axs[1]
ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$')
ax.plot(k, y_ma_sym, lw=1.0, ls='-', c='b', label='${y}_{avg}$')
ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7,  c='tab:gray', label='peaks', zorder=20)
ax.scatter(peaks_abp_mean_sym_ind, y_ma_sym[peaks_abp_mean_sym_ind],s=40, marker=7,  c='b', label='peaks', zorder=20)
ax.set(xlabel='k', ylabel=r'$y_k$')
ax.set_title('symmetric, acausal moving average filter with a='+str(-L)+' and b='+str(L))
ax.legend(loc=4)

plt.show()

Total running time of the script: ( 0 minutes 0.317 seconds)

Gallery generated by Sphinx-Gallery