Rectangular Pulse Detection with Baseline [ex111.0]ΒΆ

This example demonstrates the detection of a rectangular pulses of known duration in a noisy signal with baseline interferences.

import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import find_peaks

import lmlib as lm
from lmlib.utils.generator import (gen_rand_pulse, gen_baseline_sin, gen_wgn, gen_rand_walk)

# --------------- parameters of example -----------------------
K = 4000  # number of samples (length of test signal)
k = np.arange(K)
len_pulse = 20  # [samples] number of samples of the pulse width
y_rpulse = 0.03*gen_rand_pulse(K, n_pulses=6, length=len_pulse, seed=1000)
y = y_rpulse - 0.25+ gen_baseline_sin(K, k_period=2000) + gen_wgn(K, sigma=0.01, seed=1000)

LCR_THD = 0.1  # minimum log-cost ratio to detect a pulse in noise

g_sp = 15000  # pulse window weight, effective sample number under the window # (larger value lead to a more rectangular-like windows while too large values might lead to nummerical instabilities in the recursive computations.)
g_bl = 50  # baseline window weight, effective sample number under the window (larger value leads to a wider window)

# --------------- main -----------------------

# Defining ALSSM models
alssm_pulse = lm.AlssmPoly(poly_degree=0, label="line-model-pulse")
alssm_baseline = lm.AlssmPoly(poly_degree=2, label="offset-model-baseline")

# Defining segments with a left- resp. right-sided decaying window and a center segment with nearly rectangular window
segmentL = lm.Segment(a=-np.inf, b=-1, direction=lm.FORWARD, g=g_bl)
segmentC = lm.Segment(a=0, b=len_pulse, direction=lm.FORWARD, g=g_sp)
segmentR = lm.Segment(a=len_pulse+1, b=np.inf, direction=lm.BACKWARD, g=g_bl, delta=len_pulse)

# Defining the final cost function (a so called composite cost = CCost)
# mapping matrix between models and segments (rows = models, columns = segments)
F = [[0, 1, 0],
     [1, 1, 1]]
costs = lm.CompositeCost((alssm_pulse, alssm_baseline), (segmentL, segmentC, segmentR), F)

# filter signal
se_param = lm.SEParam(costs)
xs = se_param.filter_minimize_x(y)
y_hat = costs.eval_alssm(xs, alssm_selection=[1, 0])

xs_0 = np.copy(xs)
xs_0[:, costs.ind_x('line-model-pulse.x')] = 0

J = se_param.eval_errors(xs, range(K))  # get SE (squared error) for hypothesis 1 (baseline + pulse)
J0 = se_param.eval_errors(xs_0, range(K))  # get SE (squared error)  for hypothesis 0 (baseline only) --> J0 should be a vector not a matrice

LCR = -0.5 * np.log(J / J0)

# find peaks
peaks, _ = find_peaks(LCR, height=LCR_THD, distance=30)

# --------------- plotting of results -----------------------

fig, axs = plt.subplots(4, 1, sharex='all', figsize=(8,6))

# Remove horizontal space between axes

if peaks.size != 0:
    wins = lm.map_window(costs.window(segment_selection=[True, True, True], thds=0.001), peaks, K, merge_ks=True)
    axs[0].plot(k, wins[0], color='r', lw=0.25, ls='-', label=r"$\alpha_k(.)$")
    axs[0].plot(k, wins[1], color='g', lw=0.25, ls='-', label=r"$\alpha_k(.)$")
    axs[0].plot(k, wins[2], color='b', lw=0.25, ls='-', label=r"$\alpha_k(.)$")

axs[1].plot(k, y_rpulse, color="red", lw=.5, linestyle="-", label='pulses (reference)')
axs[1].plot(k, y, color="grey", lw=0.25, label='y (observations)')
axs[1].plot(peaks, y[peaks], "s", color='r', fillstyle='none', markersize=15, markeredgewidth=1.0, lw=1.0)
axs[1].legend(loc='upper right')

axs[2].plot(k, LCR, lw=1.0, color='green', label=r"$LCR = J(\hat{\lambda}_k) / J(0)$")
axs[2].plot(peaks, LCR[peaks], "x", color='r', markersize=8, markeredgewidth=2.0)
axs[2].axhline(LCR_THD, color="black", linestyle="--", lw=1.0)
axs[2].legend(loc='upper right')

axs[3].plot(k, y_hat, lw=1.0, color='gray', label=r'$\hat{\lambda}_{k}$ (pulse amplitudes estim.)')
#_ , stemlines, _ = axs[3].stem(peaks, x[peaks, 0], markerfmt="bo", basefmt=" ")
#plt.setp(stemlines,'linewidth', 3)
axs[3].axhline(0, color="black",  lw=0.5)
axs[3].legend(loc='upper right')
axs[3].set(xlabel='time index $k$')

Total running time of the script: ( 0 minutes 3.302 seconds)

Gallery generated by Sphinx-Gallery