ABP notch localization [ex905.0]

Identifies the notch in the arterial blood pressure (ABP) signal

../../_images/sphx_glr_fig-exercise-detect-abp-notch_001.png

Out:

------
Identified QRS peaks at:  [ 184  587 1012 1431]
------
Composed signal model:  CostSegment : label: Edge-Model
  └- ['Alssm : polynomial, A: (2, 2), C: (1, 2), label: None', 'Alssm : polynomial, A: (2, 2), C: (1, 2), label: None'],
  └- [Segment : a: -30, b: -1, direction: fw, g: 50, delta: 0, label: None , Segment : a: 0, b: 30, direction: bw, g: 50, delta: 0, label: None ]
/Users/wbr8/homedata/projects/lmlib-branches/reto-doc/lmlib/utils/beta.py:93: RuntimeWarning: invalid value encountered in greater_equal
  loc_msk = (y_loc>=threshold) & msk_merged[ind_start:ind_end]
------
Identified notches at:  [ 417  817 1243]
------

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_slope, gen_wgn
from lmlib.utils.beta import find_max_mask
from lmlib.utils.beta import load_source_csv






# y_raw = load_source_csv('../../python/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv') # for direct start
y_raw = load_source_csv('./csv-data/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv', time_format = "H:M:S") # for script start


# Signal
fs = 500
k0 = 2000
K = 1600
y_n = gen_wgn(K, sigma=.5, seed=431412)

y_ecg = y_raw[k0:k0+K, 2]
y = y_raw[k0:k0+K, 3]  + y_n


k = range(K)


# Reference from QRS
(peaks_ecg_ind, _) = find_peaks(y_ecg, distance=fs*0.6)
print("------")
print("Identified QRS peaks at: ", peaks_ecg_ind)


# ALSSM models
alssm_left = lm.AlssmPoly(poly_degree=1)   # A_L, c_L
alssm_right = lm.AlssmPoly(poly_degree=1)   # A_R, c_R
segment_left = lm.Segment(a=-30, b=-1, direction=lm.FORWARD, g=50)
segment_right = lm.Segment(a=0, b=30, direction=lm.BACKWARD, g=50)
F = [[1, 0], [0, 1]] # mixing matrix, turning on and off models per segment (1=on, 0=off)
cost = lm.CompositeCost((alssm_left, alssm_right), (segment_left, segment_right), F, label="Edge-Model")

print("------")
print("Composed signal model: ", cost)


# Linear Constraints

# Forcing a continuous function of the two lines --> x_hat_edge
H_edge = np.array([[1, 0, 0],   # x_1,left : offset of left line
                   [0, 1, 0],   # x_2,left : slope of left line
                   [1, 0, 0],   # x_1,right : offset of right line
                   [0, 0, 1]])  # x_2,right : slope of right line

# Forcing additionally a common slope of the two lines --> x_hat_line
H_line = np.array([[1, 0],   # x_1,left : offset of left line
                   [0, 1],   # x_2,left : slope of left line
                   [1, 0],   # x_1,right : offset of right line
                   [0, 1]])  # x_2,right : slope of right line


# -- MODEL FITTING --
# Filter
separam = lm.SEParam(cost)
separam.filter(y)

# constraint minimization
x_hat_edge = separam.minimize_x(H_edge)
x_hat_line = separam.minimize_x(H_line)

# Square Error and LCR
error_edge = separam.eval_errors(x_hat_edge)
error_line = separam.eval_errors(x_hat_line)
lcr = -1/2 * np.log(np.divide(error_edge, error_line))

# find edges with conditions: only consider samples where slope increases;
peaks_notch = find_max_mask(lcr, (x_hat_edge[:,1]<x_hat_edge[:,3]) , locs=peaks_ecg_ind, range_start=int(fs*0.2), range_end=int(fs*0.6), skip_invalid=True)

print("------")
print("Identified notches at: ", peaks_notch)

print("------")

# Trajectories
trajs_edge = lm.map_trajectories(cost.trajectories(x_hat_edge[peaks_notch], F=F), peaks_notch, K, merge_ks=True)

# peak estimate
y_hat = cost.eval_alssm(x_hat_edge, alssm_selection=[1, 0])

# Plot
_, axs = plt.subplots(5, 1, sharex='all', figsize=(6, 5))

axs[0].set_title('Arterial blood pressure (ABP)')

axs[0].plot(k, y, lw=0.5, c='k', label='y')
#axs[0].scatter(peaks_notch, y_hat[peaks_notch], s=30, marker=6,  c='k', label='peaks', zorder=20)

axs[0].set(xlabel='k', ylabel=r'ABP')


axs[1].plot(k, y, lw=0.5, c='tab:gray', label='y')
axs[1].plot(k, trajs_edge[0, :, 0], lw=1, c='tab:red', label='')
axs[1].plot(k, trajs_edge[1, :, 0], lw=1, c='tab:blue', label='')
axs[1].scatter(peaks_notch, y_hat[peaks_notch], s=30, marker=6,  c='k', label='peaks', zorder=20)

axs[1].legend(loc=4)

axs[2].plot(k, x_hat_edge[:,1], c='tab:blue', label='$x_1$')
axs[2].plot(k, x_hat_edge[:,3], c='tab:red', label='$x_3$')
axs[2].set(ylabel=r'slopes')
axs[2].legend(loc=4)
axs[2].grid(True)

axs[3].plot(k, lcr, c='tab:purple', label='$LCR$')
axs[3].set(ylabel=r'LCR')
axs[3].legend(loc=4)
axs[3].grid(True)

axs[4].plot(k, y_ecg, c='tab:gray', label='ECG')
axs[4].scatter(peaks_ecg_ind, y_ecg[peaks_ecg_ind], s=20, marker=7,  c='k', label='peaks', zorder=20)
axs[4].set(ylabel=r'ECG', xlabel='k')
axs[4].legend(loc=4)

for ax in axs: # add vertical dashed lines
    for xc in peaks_notch:
        ax.axvline(x=xc, c='black', ls='--', lw=1.0)

plt.show()

Total running time of the script: ( 0 minutes 1.345 seconds)

Gallery generated by Sphinx-Gallery