Detection of QRS wave onset in ECG signal [ex902.0]ΒΆ

../../_images/sphx_glr_fig-examp-ecg-onset-detector_001.png
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks

import lmlib as lm
from lmlib.utils.generator import gen_slopes, gen_wgn
from lmlib.utils.generator import *

# -- TEST SIGNAL --

# Constants
K = 3000  # number of samples
k = range(K)


# load ECG signal as a test signal from library
y1_mc = load_multi_channel('EECG_FILT_9CH_10S_FS600HZ.csv', K=K)
y = y1_mc[0:,2] # select single channel



# -- MODEL SETUP --

# Set up Composite Cost Model using two ALSSMs and exponentially decaying windows
#
#         ---->     <----
#      --------------------
#  A_L |   c_L  |    0    |
#      --------------------
#  A_R |   0    |    c_R  |
#      --------------------
#      :        :         :
#     a=-80     0        b=20

alssm_left = lm.AlssmPoly(poly_degree=1)   # A_L, c_L
alssm_right = lm.AlssmPoly(poly_degree=1)   # A_R, c_R
segment_left = lm.Segment(a=-80, b=-1, direction=lm.FORWARD, g=80)
segment_right = lm.Segment(a=0, b=20, direction=lm.BACKWARD, g=20)
F = [[1, 0], [0, 1]] # mixing matrix, turning on and off models per segment (1=on, 0=off)
costs = lm.CompositeCost((alssm_left, alssm_right), (segment_left, segment_right), F)

# Linear Constraints

# Forcing a continuous function of the two lines --> x_hat_edge
H_edge = np.array([[1, 0, 0],   # x_1,left : offset of left line
                   [0, 1, 0],   # x_2,left : slope of left line
                   [1, 0, 0],   # x_1,right : offset of right line
                   [0, 0, 1]])  # x_2,right : slope of right line

# Forcing additionally a common slope of the two lines --> x_hat_line
H_line = np.array([[1, 0],   # x_1,left : offset of left line
                   [0, 1],   # x_2,left : slope of left line
                   [1, 0],   # x_1,right : offset of right line
                   [0, 1]])  # x_2,right : slope of right line


# -- MODEL FITTING --
# Filter
separam = lm.SEParam(costs)
separam.filter(y)

# constraint minimization
x_hat_edge = separam.minimize_x(H_edge)
x_hat_line = separam.minimize_x(H_line)

# Square Error and LCR
error_edge = separam.eval_errors(x_hat_edge)
error_line = separam.eval_errors(x_hat_line)
lcr = -1/2 * np.log(np.divide(error_edge, error_line))

# Find LCR peaks with minimal distance and height
peaks, _ = find_peaks(lcr, height=0.2, distance=400)

# Evaluate trajectories (for plotting only)
trajs_edge = lm.map_trajectories(costs.trajectories(x_hat_edge[peaks]), peaks, K, merge_ks=True)
trajs_line = lm.map_trajectories(costs.trajectories(x_hat_line[peaks]), peaks, K, merge_ks=True)

wins = lm.map_window(costs.window(segment_selection=[1, 1]), peaks, K, merge_ks=True)


# -- PLOTTING --
_, axs = plt.subplots(5, 1, sharex='all', figsize=(7, 7))

axs[0].set(title='Edge Detection in ECG Signals')

axs[0].plot(k, wins[0, :], c='tab:orange', lw=1.0, label=r'$w( \theta_L )$')
axs[0].plot(k, wins[1, :], c='tab:blue', lw=1.0, label=r'$w( \theta_R )$')
axs[0].set(ylabel=r'window')
axs[0].legend(loc=1)

axs[1].plot(k, y, lw=1.0, c='gray', label='y')
# axs[1].plot(k, trajs_edge[0, :, 0], c='r', lw=1.5, label='$H: c A^{k-k_0} x, k<k_0$')
# axs[1].plot(k, trajs_edge[1, :, 0], c='b', lw=1.5, label='$H: c A^{k-k_0} x, k > k_0$')
#axs[1].plot(k, trajs_line[0, :, 0], '--', c='k', lw=1.0, label='$H_0 : c A^{k-k_0} x$')
#axs[1].plot(k, trajs_line[1, :, 0], '--', c='k')
axs[1].set(ylabel=r'observations')
axs[1].legend(loc=1)


axs[2].plot(k, y, lw=1.0, c='gray')
axs[2].plot(k, trajs_edge[0, :, 0], c='tab:orange', lw=1.5, label='$H: c_L A_L^{(j-k)} x_L$')
axs[2].plot(k, trajs_edge[1, :, 0], c='tab:blue', lw=1.5, label='$H: c_R A_R^{(j-k)} x_R$')
axs[2].plot(k, trajs_line[0, :, 0], '-', c='k', lw=1.0, label='$H_0 : c A^{(j-k)} x$')
axs[2].plot(k, trajs_line[1, :, 0], '-', c='k')
axs[2].set(ylabel=r'trajectories')
axs[2].legend(loc=1)

axs[3].plot(k, error_edge, c='tab:green', label="$H_1: J(x_{L})+J(x_{R})$")
axs[3].plot(k, error_line, c='k', label="$H_0: J(x_{L}) = J(x_{R})$")
axs[3].set(ylabel=r'costs')
axs[3].legend(loc=1)

axs[4].plot(k, lcr,  c='tab:blue')
axs[4].scatter(peaks, lcr[peaks], marker=7, c='r')
axs[4].set(xlabel='k', ylabel='LCR')

plt.show()

Total running time of the script: ( 0 minutes 0.958 seconds)

Gallery generated by Sphinx-Gallery