Shape/Pulse Detection with Baseline [ex113.0]

This example demonstrates the detection of a known shape in a signal with baseline. The estimated parameters are the shapes amplitude and an estimate of the baseline.

This example is published in [Wildhaber2017] .

../../_images/sphx_glr_example-ex113.0-shape-detection_001.png

Out:

CostSegment : label: None
  └- ['Alssm : polynomial-jordan, A: (4, 4), C: (1, 4), label: alssm-pulse', 'Alssm : polynomial, A: (3, 3), C: (1, 3), label: alssm-baseline'],
  └- [Segment : a: -inf, b: -51, direction: fw, g: 100, delta: -51, label: left-segment , Segment : a: -50, b: 50, direction: fw, g: 2000, delta: 0, label: center-segment , Segment : a: 51, b: inf, direction: bw, g: 100, delta: 51, label: right-segment ]
H_A :  [[ 9.77477462e-03  0.00000000e+00  0.00000000e+00  0.00000000e+00]
 [-1.34563291e-03  0.00000000e+00  0.00000000e+00  0.00000000e+00]
 [-7.90529634e-07  0.00000000e+00  0.00000000e+00  0.00000000e+00]
 [ 3.09595504e-06  0.00000000e+00  0.00000000e+00  0.00000000e+00]
 [ 0.00000000e+00  1.00000000e+00  0.00000000e+00  0.00000000e+00]
 [ 0.00000000e+00  0.00000000e+00  1.00000000e+00  0.00000000e+00]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
H_0 :  [[0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0.]
 [0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 1.]]

import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import find_peaks
from scipy.linalg import block_diag

import lmlib as lm
from lmlib.utils.generator import *


# --------------- loading test signal -----------------------
file_name = 'EECG_BASELINE_9CH_10S_FS2400HZ.csv'
K = 10000  # number of samples to process
y = load_multi_channel(file_name, K, ch_select=[3,])
y=y[0:]

# --------------- parameters of example -----------------------
LCR_THD = 0.1  # minimum log-cost ratio to detect a pulse in noise

K_REF = 1410 # time index of reference shape

SHAPE_LEN_2 = 50 # 1/2 length of shape to be found
MIN_DIST = 1500 # minimal number of samples between two pulses

g_sp = 2000  # pulse window weight, effective sample number under the window # (larger value lead to a more rectangular-like windows while too large values might lead to nummerical instabilities in the recursive computations.)
g_bl = 100  # baseline window weight, effective sample number under the window (larger value leads to a wider window)

N1 = 3 # number of polynomial coefficient for baseline  Note: N>4 might leads to nummerical instabilities in the recursions
N2 = 4 # number of polynomial coefficient for spike. Note: N>4 might leads to  nummerical instabilities in the recursions.


# --------------- main -----------------------

# Defining ALSSM models
alssm_baseline = lm.AlssmPoly(poly_degree=N1-1, label="alssm-baseline")
alssm_pulse = lm.AlssmPolyJordan(poly_degree=N2-1, label="alssm-pulse")


# Defining segments with a left- resp. right-sided decaying window and a center segment with nearly rectangular window
segmentL = lm.Segment(a=-np.infty, b=-1-SHAPE_LEN_2, direction=lm.FORWARD, g=g_bl, delta=-1-SHAPE_LEN_2, label='left-segment')
segmentC = lm.Segment(a=-SHAPE_LEN_2, b=SHAPE_LEN_2, direction=lm.FORWARD, g=g_sp, label='center-segment')
segmentR = lm.Segment(a=SHAPE_LEN_2+1, b=np.infty, direction=lm.BACKWARD, g=g_bl, delta=SHAPE_LEN_2+1, label='right-segment')

# Defining the final cost function (a so called composite cost = CCost)
# mapping matrix between models and segments (rows = models, columns = segments)
F = [[0, 1, 0],
     [1, 1, 1]]
ccost = lm.CompositeCost(( alssm_pulse, alssm_baseline), (segmentL, segmentC, segmentR), F)
print(ccost)

# filter signal
se_param = lm.SEParam(ccost, use_steady_state=False)
se_param.filter(y) # run recursions

xs = se_param.minimize_x() # unconstrained minimization
xs_ref = xs[K_REF] # store state variables as reference pulse shape

H_A = np.transpose( block_diag( [xs_ref[0:N2]], np.eye(N1) ) ) # constrain matrix to find pulses of same shape as the reference pulse
H_0 = np.transpose( block_diag( np.zeros((N1,N2)), np.eye(N1) ) ) # constrain matrix to test for no pulse (baseline only)

print("H_A : ", H_A)
print("H_0 : ", H_0)

xs_A = se_param.minimize_x(H_A)
xs_0 = se_param.minimize_x(H_0)

J_A = se_param.eval_errors(xs_A, range(K))  # get SE (squared error) for hypothesis 1 (baseline + pulse)
J_0 = se_param.eval_errors(xs_0, range(K))  # get SE (squared error)  for hypothesis 0 (baseline only) --> J0 should be a vector not a matrice

LCR = -0.5 * np.log(J_A / J_0) # log-cost ratio computation


vs = se_param.minimize_v(H_A) # TODO: need to be joined with minimize_x
amp = vs[:,0]

# find peaks
peaks, _ = find_peaks(LCR, height=LCR_THD, distance=MIN_DIST)


# --------------- plotting of results -----------------------
k = np.arange(K)

# Trajectories
trajs_baseline = lm.map_trajectories(ccost.trajectories(xs_A[peaks], F=[[0, 0, 0],[1, 1, 1]], thds=0.01), peaks, K, merge_ks=True, merge_seg=True)
trajs_pulse = lm.map_trajectories(ccost.trajectories(xs_A[peaks], F=[[0, 1, 0],[0, 1, 0]], thds=0.01), peaks, K, merge_ks=True, merge_seg=True)




fig, axs = plt.subplots(6, 1, sharex='all', figsize=(8,6))

# Remove horizontal space between axes, maximize use of plotting pane
fig.tight_layout()
fig.subplots_adjust(hspace=0.0, left=0.08, bottom=0.05)

if peaks.size != 0:
    wins = lm.map_window(ccost.window(segment_selection=[True, True, True], thds=0.001), peaks, K, merge_ks=True, fill_value=0)
#    wins = lm.map_window(ccost.window(segment_selection=[True, True, True], thds=0.001), peaks, K, merge_ks=True)
    axs[0].plot(k, wins[0], color='r', lw=0.5, ls='-', label=segmentL.label)
    axs[0].plot(k, wins[1], color='g', lw=0.5, ls='-', label=segmentC.label)
    axs[0].plot(k, wins[2], color='b', lw=0.5, ls='-', label=segmentR.label)
axs[0].set(ylabel='windows')
axs[0].legend(loc='upper right')

axs[1].plot(k, y, color="gray", lw=1.0, label='y')
axs[1].axvline(x=K_REF, color='black', ls='--')
axs[1].text(K_REF, y[K_REF], ' K_REF',horizontalalignment='right', rotation=90)
axs[1].legend(loc='upper right')

axs[2].plot(k, y, color="gray", lw=1.0, label='y')
axs[2].plot(range(K), trajs_baseline, color='g', lw=1.0, linestyle="-", label='trajectories pulse')
axs[2].plot(range(K), trajs_pulse, color='r', lw=1.0, linestyle="-", label='trajectories baseline')
axs[2].legend(loc='upper right')

axs[3].plot(k, J_A, lw=1.0, color='blue', label=r"$J(x_A)$")
axs[3].plot(k, J_0, lw=1.0, color='darkred', label=r"$J(x_0)$")
axs[3].legend(loc='upper right')

axs[4].plot(k, LCR, lw=1.0, color='green', label=r"$LCR = -.5 ln(J(\hat{x}_A) / J(\hat{x}_0))$")
axs[4].plot(peaks, LCR[peaks], "x", color='r', markersize=8, markeredgewidth=2.0)
axs[4].axhline(LCR_THD, color="black", linestyle="--", lw=1.0)
axs[4].legend(loc='upper right')

axs[5].plot(k, amp, lw=1.0, color='gray', label=r'$\hat{\lambda}_{k}$')
_ , stemlines, _ = axs[4].stem(peaks, amp[peaks], markerfmt="bx", basefmt=" ", use_line_collection=True)
plt.setp(stemlines,'linewidth', 2, 'color','blue')
axs[5].axhline(1.0, color="black", linestyle="--", lw=1.0)
axs[5].axhline(0, color="black",  lw=0.5)
axs[5].legend(loc='upper right')
axs[5].set(xlabel='time index $k$')

plt.show()

Total running time of the script: ( 0 minutes 4.548 seconds)

Gallery generated by Sphinx-Gallery