Detection of changes in signal slopeΒΆ

../../_images/sphx_glr_fig-solution-exercise-model-switch-sine-2lines_001.png

Out:

[ 308  905 1505]

import matplotlib.pyplot as plt
import numpy as np
import numpy.matlib
from scipy.signal import find_peaks
import lmlib as lm
from lmlib.utils.generator import gen_wgn, load_multi_channel, gen_sine

# --- Generating Test signal ---


K = 2000
k = np.arange(K)



if True: # test signal with jumps

    fs = 600  # [Hz] sampling frequency
    # create test signal
    y1 = gen_sine(K, k_periods=[600,], amplitudes=[1.0,] , k0s=[300,])
    y1[y1<0] = 0
    yn = gen_wgn(K, sigma=.02, seed=12345)

y = y1 + yn



# ----- Models and Parameter Estimation  --------------

# Polynomial ALSSM
alssm_line = lm.AlssmPoly(poly_degree=1)
# Segment

L_LEFT=150
L_RIGHT=40
segment_left = lm.Segment(a=-L_LEFT, b=-1, direction=lm.BACKWARD, g=5000)
segment_right = lm.Segment(a=0, b=L_RIGHT, direction=lm.BACKWARD, g=5000)
segment_left_right = lm.Segment(a=-L_LEFT, b=L_RIGHT, direction=lm.BACKWARD, g=5000)


# CostSeg
cost_left = lm.CostSegment(alssm_line, segment_left)
cost_right = lm.CostSegment(alssm_line, segment_right)
cost_left_right = lm.CostSegment(alssm_line, segment_left_right)


# filter signal and take the approximation
se_param_left = lm.SEParam(cost_left)
se_param_left.filter(y)

se_param_right = lm.SEParam(cost_right)
se_param_right.filter(y)

se_param_left_right = lm.SEParam(cost_left_right)
se_param_left_right.filter(y)

# unconstrained minimizations
xs_right = se_param_right.minimize_x()
xs_left = se_param_left.minimize_x()
xs_left_right = se_param_left_right.minimize_x()

J_1 = se_param_left.eval_errors(xs_left)+se_param_right.eval_errors(xs_right) # low if jump is detected
J_0 = se_param_left_right.eval_errors(xs_left_right)  # low if no jumpm is detected

lcr = -.5*np.log(J_1 / J_0)

ind_peaks, _  = find_peaks(lcr, height=0.1, distance = 400, )

# ----------------  Plot  -----------------
ks = ind_peaks # indeces to display fit
print(ind_peaks)
trajs_left = lm.map_trajectories(cost_left.trajectories(xs_left[ks], thd=0.1), ks, K)
trajs_right = lm.map_trajectories(cost_right.trajectories(xs_right[ks], thd=0.1), ks, K)



fig = plt.figure(figsize=(7,3))

axs = fig.add_subplot(3, 1, (1))
axs.plot(k, y, lw=1.0, c='tab:gray')
axs.plot(k, y1, ':', lw=1.0, c='k')
axs.set( ylabel='y')


for (n, _) in enumerate(trajs_left[:,:,0]):
    axs.plot(k, trajs_left[n,:,0], lw=1.5, c='b', ls='-', label='traj. left' if n == 0 else '')
    axs.plot(k, trajs_right[n,:,0], lw=1.5, c='tab:red', ls='-', label='traj. right' if n == 0 else '')
for xc in ind_peaks:
    axs.axvline(x=xc, c='b', lw=0.5, ls='--')
axs.grid(True)
axs.legend(loc='upper right')

axs = fig.add_subplot(3, 1, (2), sharex=axs)
axs.plot(k, J_1, label='$H_1: J_{a}^{-1}(k,x_L) + J_{0}^{b}(k,x_R) $', c='k')
axs.plot(k, J_0, label='$H_0: J_{a}^{b}(k,x)$', c='tab:green')

for xc in ind_peaks:
    axs.axvline(x=xc, c='b', lw=0.5, ls='--')
axs.legend(loc='upper right')
axs.set( ylabel='Costs')


axs = fig.add_subplot(3, 1, (3), sharex=axs)
axs.plot(k, lcr, label='LCR',  c='tab:purple')
axs.plot(ind_peaks, lcr[ind_peaks], "x", c='k')
for xc in ind_peaks:
    axs.axvline(x=xc, c='b', lw=0.5, ls='--')
axs.set( ylabel='LCR')

axs.legend(loc='upper right')
plt.subplots_adjust(hspace=0.4)

plt.show()

Total running time of the script: ( 0 minutes 0.716 seconds)

Gallery generated by Sphinx-Gallery