Peak detection in ABP signal [ex905.0]ΒΆ

../../_images/sphx_glr_fig-exercise-detect-abp-peaks-lcr_001.png

Out:

34998

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import csv as csv

import lmlib as lm
from lmlib.utils.generator import gen_slope, gen_wgn
from lmlib.utils.beta import find_max_mask
from lmlib.utils.beta import load_source_csv



def zero_cross_ind(vals, zeros_at=0):
    """ returns a list of the indeces before a zero crossing occurs in vals

    Parameters
    ----------
        vals: vector to look for zero-crossings
        zeros_at: crossing value different from zero (default: 0).

    """
    return (np.where(np.diff(np.signbit(vals-zeros_at)))[0])

def diff0(y):
    """ returns difference to next element, starting with the difference from 0 for the first element"""
    return np.append(np.array(y[0]), np.diff(y))


# y_raw = load_source_csv('../../python/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv') # for direct start
y_raw = load_source_csv('./csv-data/cerebral-vasoreg-diabetes-heaad-up-tilt_day1-s0030DA-noheader.csv', time_format = "H:M:S") # for script start

print(len(y_raw))




# Signal
fs = 500
k0 = 2200
K = 1100
k = range(K)
y_n = gen_wgn(K, sigma=2, seed=431412)
y_ecg = y_raw[k0:k0+K, 2]
y = y_raw[k0:k0+K, 3]  + y_n


# Reference from QRS
(peaks_ecg_ind, _) = find_peaks(y_ecg, distance=fs*0.6)
diff_ecg = diff0(peaks_ecg_ind)/fs

# standard peak detection on abp
(peaks_abp_ind, _) = find_peaks(y, height=1, distance=fs*0.8)
diff_abp = diff0(peaks_abp_ind)/fs


# Model
alssm_left = lm.AlssmPoly(poly_degree=2)
segment_left = lm.Segment(a=-50, b=50, direction=lm.FORWARD, g=1000)
F = [[1]]
cost = lm.CompositeCost((alssm_left,), (segment_left,), F)


# Filter
separam = lm.SEParam(cost)
separam.filter(y)
x_hat_edge = separam.minimize_x()


# Signal Approximation
y_hat = cost.eval_alssm(x_hat_edge, alssm_selection=[1])

# Square Error and lcr
error_edge = separam.eval_errors(x_hat_edge)

# Find best matches
slope = x_hat_edge[:,1]
offset =  x_hat_edge[:,0]

# find zero crossing of slope
peaks_abp_slope_ind = find_max_mask(-np.abs(slope), () , locs=peaks_abp_ind, range_start=-int(fs*0.1), range_end=int(fs*0.1))
diff_abp_slope = diff0(peaks_abp_slope_ind)/fs


# Trajectories
trajs_edge = lm.map_trajectories(cost.trajectories(x_hat_edge[peaks_abp_slope_ind], F=F), peaks_abp_slope_ind, K, merge_ks=True)

# Windows
wins = lm.map_window(cost.window(segment_selection=[1]), peaks_abp_slope_ind, K, merge_ks=True)

# peak estimate
y_hat = cost.eval_alssm(x_hat_edge, alssm_selection=[1])

# Plot
_, axs = plt.subplots(2, 1, sharex='all', figsize=(6, 3))


axs[0].plot(k, y, lw=0.5, c='tab:gray', label='')
axs[0].plot(k, trajs_edge[0, :, 0], lw=1, c='r', label='')
axs[0].scatter(peaks_abp_ind, y[peaks_abp_ind], s=30, marker='+',  c='k', label='peaks', zorder=20)
axs[0].scatter(peaks_abp_slope_ind, y_hat[peaks_abp_slope_ind], s=30, marker=7,  c='k', label='peaks', zorder=20)

# axs[0].scatter(peaks_abp_slope_ind, offset[peaks_abp_slope_ind], lw='2', marker='+', s=120, c='r', facecolors='none', label='$x_1=0$', zorder=20)
for xc in peaks_abp_slope_ind:
    axs[0].axvline(x=xc, c='tab:red', ls='--')
axs[0].set(xlabel='k', ylabel=r'$y_k$')
axs[0].set_title('arterial blood pressure (ABP)')
axs[0].legend(loc=4)

axs[1].plot(k, x_hat_edge[:,0], c='tab:blue', label='$a_0$')
axs[1].plot(k, x_hat_edge[:,1]*100, c='tab:red', label='$a_1 \cdot 100$')
axs[1].plot(k, x_hat_edge[:,2]*10000, c='tab:orange', label='$a_2 \cdot 10e3$')
for xc in peaks_abp_slope_ind:
    axs[1].axvline(x=xc, c='tab:red', ls='--')
axs[1].legend(loc=4)
axs[1].grid(True)

plt.show()

Total running time of the script: ( 0 minutes 1.045 seconds)

Gallery generated by Sphinx-Gallery