# Peak detection with recursive moving average in ABP [ex9xx.x]ΒΆ

Detection of peaks in a ABP by - naive peak detection (find_peaks) - applying symmetric and non-symmetric mean average filters

Recursive implementation

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_slope, gen_wgn

def moving_average_filter_recursive(y, a, b=0):
"""
Moving average filter with sliding window borders a and b.

:param y: signal
:param a: left window boundary
:param b: right window boundary (optional)
:return: filtered signal
"""

y_avg = np.zeros_like(y) # create empty array
K = len(y)

if (b>=0): # correct initialization of y_sum for non-causal windows; only needed when b>=0
y_avg[0] = sum(y[0:b+1])/(b-a+1)

for k in range(K):
y_sum = 0

if (k+a >= 0) and (k+a < K):
y_sum = y_sum - y[k+a]

if (k+1+b >= 0) and (k+1+b < K):
y_sum = y_sum + y[k+1+b]

if (k+1 >= 0) and (k+1 < K):
y_avg[k+1] = y_avg[k] + y_sum / (b-a+1)

return y_avg

k0 = 2200 # signal start index
K = 1300 # number of samples to load
fs = 500 # sampling frequency

k = range(K)

# select abp signal and add Gaussian noise (to test robustness of detection)
y = y_raw[:, 3]  + gen_wgn(K, sigma=1.0, seed=43141)

# --- standard peak detection on abp
(peaks_abp_ind, _) = find_peaks(y, height=0, distance=fs*0.8)

# --- moving average filter and peak detection
L=25 # averaging window size

y_ma_asym = moving_average_filter_recursive(y, a=-2*L, b=0) # non-symmetric filter
(peaks_abp_mean_asym_ind, _) = find_peaks(y_ma_asym, distance=fs*0.8)

y_ma_sym = moving_average_filter_recursive(y, a=-L, b=L) # symmetric filter
(peaks_abp_mean_sym_ind, _) = find_peaks(y_ma_sym, distance=fs*0.8)

# Plots
_, axs = plt.subplots(2, 1, sharex='all', figsize=(8, 3.5))

ax = axs[0]
ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$')
ax.plot(k, y_ma_asym, lw=1.0, c='b', label='${y}_{avg}$')
ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7,  c='tab:gray', label='peaks', zorder=20)
ax.scatter(peaks_abp_mean_asym_ind, y_ma_asym[peaks_abp_mean_asym_ind],s=40, marker=7,  c='b', label='peaks', zorder=20)
ax.set(ylabel=r'$y_k$')
ax.set_title('non-symmetric, causal moving average filter with a='+str(-2*L)+' and b=0')
ax.legend(loc=4)

ax = axs[1]
ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$')
ax.plot(k, y_ma_sym, lw=1.0, ls='-', c='b', label='${y}_{avg}$')
ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7,  c='tab:gray', label='peaks', zorder=20)
ax.scatter(peaks_abp_mean_sym_ind, y_ma_sym[peaks_abp_mean_sym_ind],s=40, marker=7,  c='b', label='peaks', zorder=20)
ax.set(xlabel='k', ylabel=r'$y_k$')
ax.set_title('symmetric, acausal moving average filter with a='+str(-L)+' and b='+str(L))
ax.legend(loc=4)

plt.show()


Total running time of the script: ( 0 minutes 0.288 seconds)

Gallery generated by Sphinx-Gallery