Peak detection with moving average in ABP [ex911.1]ΒΆ

Detection of peaks in a ABP by - naive peak detection (find_peaks) - applying symmetric and non-symmetric mean average filters

../../_images/sphx_glr_fig-examp-detect-abp-peaks-moving-average_001.png
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_wgn
from lmlib.utils.beta import find_max_mask
from lmlib.utils.beta import load_source_csv
from lmlib.utils.beta import diff0


def moving_average_filter(y, a, b=0):
    """
    Moving average filter with sliding window borders a and b.

    Note: this is a naive (slow) implementation and for demonstration purpose only.

    :param y: signal
    :param a: left window boundary
    :param b: right window boundary (optional)
    :return: filtered signal
    """

    y_avg = np.zeros_like(y) # create empty array
    K = len(y)
    for k in range(K):
        y_sum = 0 # summed samples
        l = 0 # number of summed samples (= window length)
        for i in range(a, b+1):
            if (k+i >=0) and (k+i < K):
                y_sum += y[k+i]
                l += 1
        if (l>0):  # prevent runtime errors near the signal borders where l=0
            y_avg[k] = y_sum / l
        else:
            y_avg[k] = None
    return y_avg




# Load Signal
k0 = 2200 # signal start index
K = 1300 # number of samples to load
fs = 500 # sampling frequency

y_raw = load_source_csv('./csv-data/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv', time_format = "H:M:S", K = K, k = k0)

k = range(K)

# select abp signal and add Gaussian noise (to test robustness of detection)
y = y_raw[:, 3]  + gen_wgn(K, sigma=1.0, seed=43141)

# --- standard peak detection on abp
(peaks_abp_ind, _) = find_peaks(y, height=0, distance=fs*0.8)
diff_abp = diff0(peaks_abp_ind)/fs  # get delay between two peaks

# --- moving average filter and peak detection
L=25 # averaging window size

y_ma_asym = moving_average_filter(y, a=-2*L, b=0) # non-symmetric filter
(peaks_abp_mean_asym_ind, _) = find_peaks(y_ma_asym, distance=fs*0.8)


y_ma_sym = moving_average_filter(y, a=-L, b=L) # symmetric filter
(peaks_abp_mean_sym_ind, _) = find_peaks(y_ma_sym, distance=fs*0.8)



# Plots
_, axs = plt.subplots(2, 1, sharex='all', figsize=(8, 3.5))
plt.subplots_adjust(hspace=0.3)

ax = axs[0]
ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$')
ax.plot(k, y_ma_asym, lw=1.0, c='b', label='${y}_{avg}$')
ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7,  c='tab:gray', label='peaks', zorder=20)
ax.scatter(peaks_abp_mean_asym_ind, y_ma_asym[peaks_abp_mean_asym_ind],s=40, marker=7,  c='b', label='peaks', zorder=20)
ax.set(ylabel=r'$y_k$')
ax.set_title('non-symmetric, causal moving average filter with a='+str(-2*L)+' and b=0')
ax.legend(loc=4)


ax = axs[1]
ax.plot(k, y, lw=0.5, c='tab:gray', label='$y$')
ax.plot(k, y_ma_sym, lw=1.0, ls='-', c='b', label='${y}_{avg}$')
ax.scatter(peaks_abp_ind, y[peaks_abp_ind],s=40, marker=7,  c='tab:gray', label='peaks', zorder=20)
ax.scatter(peaks_abp_mean_sym_ind, y_ma_sym[peaks_abp_mean_sym_ind],s=40, marker=7,  c='b', label='peaks', zorder=20)
ax.set(xlabel='k', ylabel=r'$y_k$')
ax.set_title('symmetric, acausal moving average filter with a='+str(-L)+' and b='+str(L))
ax.legend(loc=4)

plt.show()

Total running time of the script: ( 0 minutes 0.317 seconds)

Gallery generated by Sphinx-Gallery