Peak detection in ABP signal [ex905.0]ΒΆ

../../_images/sphx_glr_fig-exercise-detect-abp-peaks_001.png
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_slope, gen_wgn
from lmlib.utils.beta import find_max_mask
from lmlib.utils.beta import load_source_csv




# -- LOAD TEST SIGNAL --
y_raw = load_source_csv('./csv-data/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv',
                        time_format = "H:M:S")

fs = 500
k0 = 2200
K = 1100
k = range(K)
y_n = gen_wgn(K, sigma=2, seed=431412) # generate Gaussian noise
y_ecg = y_raw[k0:k0+K, 2] # select ECG signal (Channel 2)
y = y_raw[k0:k0+K, 3]  + y_n # select ABP signal (Channel 3)


# Find QRS locations using simple peak detection
(peaks_ecg_ind, _) = find_peaks(y_ecg, distance=fs*0.6)

# standard peak detection on abp
(peaks_abp_ind, _) = find_peaks(y, height=1, distance=fs*0.8)


# Model
alssm = lm.AlssmPoly(poly_degree=2)
segment = lm.Segment(a=-50, b=50, direction=lm.FORWARD, g=1000)
cost = lm.CostSegment(alssm, segment)


# ALSSM Filter
separam = lm.SEParam(cost)
separam.filter(y)
x_hat_edge = separam.minimize_x()

# extract slope and offset using estimates
slope = x_hat_edge[:,1]
offset =  x_hat_edge[:,0]

# find zero crossing of slope
peaks_abp_slope_ind = find_max_mask(-np.abs(slope), () , locs=peaks_abp_ind, range_start=-int(fs*0.1), range_end=int(fs*0.1))

# Trajectories
trajs_edge = lm.map_trajectories(cost.trajectories(x_hat_edge[peaks_abp_slope_ind]), peaks_abp_slope_ind, K, merge_ks=True)

# Windows
wins = lm.map_window(cost.window(), peaks_abp_slope_ind, K, merge_ks=True)

# peak estimate
y_hat = cost.eval_alssm(x_hat_edge)



# --- PLOTTING ---
_, axs = plt.subplots(2, 1, sharex='all', figsize=(6, 3))

axs[0].plot(k, y, lw=0.5, c='tab:gray', label='')
axs[0].plot(k, trajs_edge[:, 0], lw=1, c='r', label='')
axs[0].scatter(peaks_abp_ind, y[peaks_abp_ind], s=30, marker='+',  c='k', label='peaks', zorder=20)
axs[0].scatter(peaks_abp_slope_ind, y_hat[peaks_abp_slope_ind], s=30, marker=7,  c='k', label='peaks', zorder=20)

for xc in peaks_abp_slope_ind:
    axs[0].axvline(x=xc, c='tab:red', ls='--')
axs[0].set(xlabel='k', ylabel=r'$y_k$')
axs[0].set_title('arterial blood pressure (ABP)')
axs[0].legend(loc=4)

axs[1].plot(k, x_hat_edge[:,0], c='tab:blue', label='$a_0$')
axs[1].plot(k, x_hat_edge[:,1]*100, c='tab:red', label='$a_1 \cdot 100$')
axs[1].plot(k, x_hat_edge[:,2]*10000, c='tab:orange', label='$a_2 \cdot 10e3$')
for xc in peaks_abp_slope_ind:
    axs[1].axvline(x=xc, c='tab:red', ls='--')
axs[1].legend(loc=4)
axs[1].grid(True)

plt.show()

Total running time of the script: ( 0 minutes 1.055 seconds)

Gallery generated by Sphinx-Gallery