# Peak detection with causal recursive moving average in ABP [ex9xx.x]¶

Recursive implementation. import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_slope, gen_wgn

def causal_moving_average_recursive(y, L):
"""
Recursive, causal moving average filter with sliding of window size L+1.

:param y: signal
:param L: Window length
:return: filtered signal
"""

y_avg = np.zeros_like(y) # create empty array
K = len(y)

y_avg[L] = y  # for correct initialization

for k in range(L, K-1):
y_avg[k+1] = y_avg[k] + (y[k+1]-y[k-L] ) / (L+1)

return y_avg

k0 = 2200 # signal start index
K = 1300 # number of samples to load
fs = 500 # sampling frequency

k = range(K)

# select abp signal and add Gaussian noise (to test robustness of detection)
y = y_raw[:, 3]  + gen_wgn(K, sigma=1.0, seed=43141)

# --- standard peak detection on abp
(peaks_abp_ind, _) = find_peaks(y, height=0, distance=fs*0.8)

# --- moving average filter and peak detection
L=3 # averaging window size is L+1

y_ma_asym = causal_moving_average_recursive(y, L=L) # non-symmetric filter
(peaks_abp_mean_asym_ind, _) = find_peaks(y_ma_asym, distance=fs*0.8)

# Plots
_, axs = plt.subplots(1, 1, sharex='all', figsize=(8, 3.5))

ax = axs
ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$')
ax.plot(k, y_ma_asym, lw=1.0, c='b', label='${y}_{avg}$')
ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7,  c='tab:gray', label='peaks', zorder=20)
ax.scatter(peaks_abp_mean_asym_ind, y_ma_asym[peaks_abp_mean_asym_ind],s=40, marker=7,  c='b', label='peaks', zorder=20)
ax.set(ylabel=r'$y_k$')
ax.set_title('non-symmetric, causal moving average filter with L='+str(L))
ax.legend(loc=4)

plt.show()


Total running time of the script: ( 0 minutes 0.217 seconds)

Gallery generated by Sphinx-Gallery