Notch detection in ABP Signal [ex000.0]ΒΆ

Detection of notches in a ABP (Arterial Blood Pressure) signal using 3rd order Polynomials.

../../_images/sphx_glr_fig-solution-exercise-detect-abp-notch-poly_001.png
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_wgn
from lmlib.utils.beta import find_max_mask
from lmlib.utils.beta import load_source_csv


# -- LOAD TEST SIGNAL --
y_raw = load_source_csv('./csv-data/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv',
                        time_format = "H:M:S")
fs = 500
k0 = 2200
K = 2100
k = range(K)
y_n = gen_wgn(K, sigma=2, seed=431412) # generate Gaussian noise (to test robustness)
y_ecg = y_raw[k0:k0+K, 2] # select ECG signal (Channel 2)
y = y_raw[k0:k0+K, 3]  + y_n # select ABP signal (Channel 3)



# Find QRS locations using simple peak detection
(peaks_ecg_ind, _) = find_peaks(y_ecg, distance=fs*0.6)

# standard peak detection on abp
(peaks_abp_ind, _) = find_peaks(y, height=1, distance=fs*0.6)


# --- Model 1 - Notch Detector---
alssm1 = lm.AlssmPoly(poly_degree=3)
segment1 = lm.Segment(a=-40, b=40, direction=lm.FORWARD, g=100)
cost1 = lm.CostSegment(alssm1, segment1)

# ALSSM Filter
separam1 = lm.SEParam(cost1)
separam1.filter(y)
x_hat1 = separam1.minimize_x() # states: x_hat1[:,0] -> offset; x_hat1[:,1] -> slope; x_hat1[:,2] -> curvature

# Detection rule: curvature > 0 AND slope maximal.
curvature_mask = x_hat1[:,2]>0 # mask samples where curvature is positive
notch_ind = find_max_mask(x_hat1[:,1], (curvature_mask,), locs=peaks_abp_ind, range_start=int(fs*0.1), range_end=int(fs*0.3), skip_invalid=False)


# --- Model 2 - Systolic Start Detector ---
alssm2 = lm.AlssmPoly(poly_degree=3)
segment2 = lm.Segment(a=-70, b=2, direction=lm.FORWARD, g=100)
cost2 = lm.CostSegment(alssm2, segment2)

# ALSSM Filter
separam2 = lm.SEParam(cost2)
separam2.filter(y)
x_hat2 = separam2.minimize_x()  # states: x_hat2[:,0] -> offset; x_hat2[:,1] -> slope; x_hat2[:,2] -> curvature

# Detection rule: time index where slope gets >= 0 --> start of systole
slope_0_msk = (x_hat2[:,1]<0)
systol_ind = find_max_mask(y, (slope_0_msk), locs=peaks_abp_ind, range_start=int(-fs*0.4), range_end=int(-fs*0.15), skip_invalid=False, direction='backward')


# --- Compute ALSSM Trajectories and Windows for later Plotting ---
# Trajectories
trajs_1 = lm.map_trajectories(cost1.trajectories(x_hat1[notch_ind[notch_ind>=0]]), notch_ind, K, merge_ks=True)
trajs_2 = lm.map_trajectories(cost2.trajectories(x_hat2[systol_ind[systol_ind>=0]]), systol_ind, K, merge_ks=True)

# peak estimate
y_hat1 = cost1.eval_alssm(x_hat1)
y_hat2 = cost2.eval_alssm(x_hat2)



# --- PLOTTING ---
_, axs = plt.subplots(3, 1, sharex='all', figsize=(9, 4))

axs[0].plot(k, y, lw=0.5, c='tab:gray', label='')
axs[0].plot(k, trajs_1[:, 0], lw=1, c='r', label='')
axs[0].plot(k, trajs_2[:, 0], lw=1, c='b', label='')
axs[0].scatter(notch_ind, y_hat1[notch_ind], s=30, marker=7,  c='k', label='systole start', zorder=20)
axs[0].scatter(systol_ind, y_hat2[systol_ind], s=30, marker=6,  c='k', label='systole end', zorder=20)
for xc in notch_ind:
    axs[0].axvline(x=xc, c='tab:red', ls='--')
for xc in systol_ind:
    axs[0].axvline(x=xc, c='b', ls='--')

axs[0].set(ylabel=r'$y_k$ [mmHg]')
axs[0].set_title('arterial blood pressure (ABP)')
axs[0].legend(loc=4)

axs[1].plot(k, x_hat1[:,0], c='tab:green', label='$a_0$')
axs[1].plot(k, x_hat1[:,1]*100, c='tab:red', label='$a_1 \cdot 100$')
axs[1].plot(k, x_hat1[:,2]*10000, c='tab:gray', label='$a_2 \cdot 10e3$')
for xc in notch_ind:
    axs[1].axvline(x=xc, c='tab:red', ls='--')

axs[1].set(ylabel=r'syst. end')
axs[1].legend(loc=4)
axs[1].grid(True)


axs[2].plot(k, x_hat2[:,0], c='tab:green', label='$a_0$')
axs[2].plot(k, x_hat2[:,1]*100, c='tab:red', label='$a_1 \cdot 100$')
axs[2].plot(k, x_hat2[:,2]*10000, c='tab:gray', label='$a_2 \cdot 10e3$')
for xc in systol_ind:
    axs[2].axvline(x=xc, c='b', ls='--')

axs[2].set(ylabel=r'syst. start')
axs[2].set_ylim([-300, 300])
axs[2].legend(loc=4)
axs[2].grid(True)

axs[2].set(xlabel=r'time index $k$')




plt.show()

Total running time of the script: ( 0 minutes 1.579 seconds)

Gallery generated by Sphinx-Gallery