Detection of jumps in a signalΒΆ

../../_images/sphx_glr_fig-solution-exercise-model-switch-jump_001.png
import matplotlib.pyplot as plt
import numpy as np
import numpy.matlib
from scipy.signal import find_peaks
import lmlib as lm
from lmlib.utils.generator import *

# --- Generating Test signal ---


K = 2000
k = np.arange(K)



if True: # test signal with jumps

    fs = 600  # [Hz] sampling frequency
    # create test signal
    y1 = gen_steps(K, [500, 700, 1100, 1400], [1, .5, -1, .3])
    yn = gen_wgn(K, sigma=.2, seed=12345)


y = y1 + yn



# ----- Models and Parameter Estimation  --------------

# Polynomial ALSSM
alssm_poly = lm.AlssmPoly(poly_degree=0)
# Segment

L=60
segment_left = lm.Segment(a=-L, b=-1, direction=lm.BACKWARD, g=5000)
segment_right = lm.Segment(a=0, b=L, direction=lm.BACKWARD, g=5000)
segment_left_right = lm.Segment(a=-L, b=L, direction=lm.BACKWARD, g=5000)


# CostSeg
cost_left = lm.CostSegment(alssm_poly, segment_left)
cost_right = lm.CostSegment(alssm_poly, segment_right)
cost_left_right = lm.CostSegment(alssm_poly, segment_left_right)


# filter signal and take the approximation
se_param_left = lm.SEParam(cost_left)
se_param_left.filter(y)

se_param_right = lm.SEParam(cost_right)
se_param_right.filter(y)

se_param_left_right = lm.SEParam(cost_left_right)
se_param_left_right.filter(y)

# unconstrained minimizations
xs_left = se_param_left.minimize_x()
xs_right = se_param_right.minimize_x()
xs_left_right = se_param_left_right.minimize_x()

J_1 = se_param_left.eval_errors(xs_left)+se_param_right.eval_errors(xs_right) # low if jump is detected
J_0 = se_param_left_right.eval_errors(xs_left_right)  # low if no jumpm is detected

lcr = -.5*np.log(J_1 / J_0)

ind_peaks, _  = find_peaks(lcr, height=0.3, distance = 30, )

# ----------------  Plot  -----------------
ks = ind_peaks # indeces to display fit

trajs_left = lm.map_trajectories(cost_left.trajectories(xs_left[ks], thd=0.1), ks, K)
trajs_right = lm.map_trajectories(cost_right.trajectories(xs_right[ks], thd=0.1), ks, K)



fig = plt.figure(figsize=(7,3))

axs = fig.add_subplot(3, 1, (1))
axs.plot(k, y, lw=1.0, c='tab:gray')
axs.plot(k, y1, ':', lw=1.0, c='k')
axs.set( ylabel='y')


for (n, _) in enumerate(trajs_left[:,:,0]):
    axs.plot(k, trajs_left[n,:,0], lw=1.5, c='b', ls='-', label='traj. of $J_a^{-1}(\cdot)$' if n == 0 else '')
    axs.plot(k, trajs_right[n,:,0], lw=1.5, c='tab:red', ls='-', label='traj. of $J_0^{b}(\cdot)$' if n == 0 else '')
for xc in ind_peaks:
    axs.axvline(x=xc, c='b', lw=0.5, ls='--')
axs.grid(True)
axs.legend(loc='upper right')

axs = fig.add_subplot(3, 1, (2), sharex=axs)
axs.plot(k, J_1, label='$J_1$', c='k')
axs.plot(k, J_0, label='$J_0$', c='tab:green')

for xc in ind_peaks:
    axs.axvline(x=xc, c='b', lw=0.5, ls='--')
axs.legend(loc='upper right')
axs.set( ylabel='Costs')


axs = fig.add_subplot(3, 1, (3), sharex=axs)
axs.plot(k, lcr, label='LCR',  c='tab:purple')
axs.plot(ind_peaks, lcr[ind_peaks], "x", c='k')
for xc in ind_peaks:
    axs.axvline(x=xc, c='b', lw=0.5, ls='--')
axs.set( ylabel='LCR')

axs.legend(loc='upper right')
plt.subplots_adjust(hspace=0.4)

plt.show()

Total running time of the script: ( 0 minutes 0.677 seconds)

Gallery generated by Sphinx-Gallery