import numpy as np
import matplotlib.pyplot as plt
import scipy as sc
import scipy.special
import statistics as st
from math import lgamma
import numba as nb
from numba import jit
Transition Detection Using BITS
We have previously discussed using shapes for step/jump/transition detection: Step detection
This notebook, however, serves as an addendum to that discussion, offering more insight into Bayesian Inference-based Template Search (BITS) versus general shape processing. We will try to find 2 transitions in datasets of variable length, using both BITS and a general shape comparison.
Let’s start by generating some fake data. We will use the same setup as the previous notebook, and provide a sample dataset below.
#generate data
@nb.njit
def data_generator(num_points, num_jumps, jump_size, noise_size, drift_size, blur, seed):
    np.random.seed(444 + seed)

    # parameters of fake data
    nt = num_points
    nj = num_jumps

    # generating noise & drift
    tjumps = []
    while len(tjumps) < nj:
        time = int(np.random.exponential(scale=nt/2))
        if time < nt-50 and time > 50:
            if len(tjumps) > 0:
                if np.min(np.abs(np.array(tjumps) - time)) > jump_size + blur:
                    tjumps.append(time)
            else:
                tjumps.append(time)
    tjumps.sort()

    zsteps = np.zeros(nj)
    for i in range(nj):
        zsteps[i] = jump_size

    noise = np.ones(nt)
    drift_arr = np.ones(nt)
    for i in range(nt):
        noise[i] = np.random.normal()*noise_size
        drift_arr[i] = np.random.normal()*drift_size
    drift = drift_arr.cumsum()

    # generating "true" path
    z = np.zeros(nt)
    for i in range(nj):
        z[tjumps[i]:] += zsteps[i]

    data = z + drift + noise

    # calculating snr
    snr = []
    for i in range(len(zsteps)):
        if i == 0:
            snr.append(zsteps[i]/np.std((noise+drift)[:tjumps[i+1]]))
        elif i == len(zsteps)-1:
            snr.append(zsteps[-1]/np.std((noise+drift)[tjumps[-2]:]))
        else:
            snr.append(zsteps[i]/np.std((noise+drift)[tjumps[i]:tjumps[i+1]]))
    '''
    #print('SNR = ' + str(st.mean(snr)))
    #plotting
    fig,ax = plt.subplots(1,1,figsize=(12,4))
    ax.plot(data,'k',lw=.5);
    #ax[1].plot(data,'o',color='k',markersize=1)
    #ax[1].plot(z+drift,lw=2,color='r')
    #for aa in ax:
    #    for i in range(nj):
    #        aa.axvline(x=tjumps[i],color='r',lw=1,zorder=-1,alpha=.5)
    for i in range(nj):
        ax.axvline(x=tjumps[i],color='r',lw=1,zorder=-1,alpha=.5)
    ax.set_title('Actual Transitions')
    #ax[1].set_title('Simulated Path')
    plt.show()
    '''
    return data, tjumps, np.mean(np.array(snr))
data, tjumps, snr = data_generator(num_points=1000, num_jumps=2, jump_size=80, noise_size=20,
                                   drift_size=1, blur=0, seed=45)
print('SNR = %.3f'%snr)
#plotting
fig,ax = plt.subplots(1,1,figsize=(5,4))
ax.plot(data,'k',lw=.5);
for i in range(len(tjumps)):
    ax.axvline(x=tjumps[i],color='r',lw=1,zorder=-1,alpha=.5)
#ax.set_title('Actual Transitions')
plt.xlim(0,1000)
plt.savefig('/home/anjali/Documents/BITS_figures/Representative_trace_2.pdf')
plt.show()
SNR = 3.559
We now create an evidence function corresponding to equation 2.2.1 in the SI of the Bayesian shape calculation paper. This function takes in data, a template, and a variable prior for m (the scaling nuisance parameter), and returns the log evidence that the input data has the same shape as the template.
@nb.jit(nb.float64(nb.float64[:],nb.float64,nb.float64[:]),nopython=True)
def evidence_template(data, prior_m, template): # corresponds to Eq. 2.2.1 in SI for shape calculations
    # constant evidence statistics
    N = float(data.size)
    m = (N-2.)/2.

    ey = np.mean(data)
    eyy = np.mean(data*data)
    vy = eyy - ey*ey + 1e-300 ## helps with over/underflow

    # priors for m (scale), b (offset), tau (noise)
    log_l = 0
    log_del_m = np.log(prior_m)
    log_del_b = np.log(1e5)
    log_del_tau = np.log(1e5)
    log_l -= (log_del_m + log_del_b + log_del_tau)

    # template-dependent evidence statistics
    ex = np.mean(template)
    exx = np.mean(template*template)
    exy = np.mean(template*data)
    vx = exx - ex*ex + 1e-300 ## helps with over/underflow
    vxy = exy - ex*ey + 1e-300 ## helps with over/underflow
    r = vxy/np.sqrt(vx*vy)
    r2 = r*r

    # evidence integral
    log_l += -m*np.log(np.pi)
    log_l += -N/2*np.log(N)
    log_l += lgamma(m)
    log_l += -0.5*np.log(vx)
    log_l += -m*np.log(vy)
    log_l += -m*np.log(1.-r2)
    log_l += -np.log(2)
    #log_l += np.log(1.+(r/np.abs(r))*sc.special.betainc(0.5,m,r2))
    return log_l
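As a quick sanity check (not part of the original analysis, and using arbitrarily chosen window and template sizes), we can probe a window of the simulated trace around the first transition with a matching step template and with a deliberately mismatched ramp; the step template should return the larger log evidence.

## illustrative check: matching vs. mismatched template on one 100-point window
window = data[tjumps[0]-50:tjumps[0]+50]
step_template = np.concatenate((np.zeros(50), np.ones(50))) ## same shape as a single upward jump
ramp_template = np.linspace(0., 1., 100)                    ## deliberately mismatched shape
print(evidence_template(window, 1e8, step_template))
print(evidence_template(window, 1e8, ramp_template))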
We also create an evidence function corresponding to equation 2.2.4 in the SI of the Bayesian shape calculation paper. This function takes in data and returns the log evidence that the input data can be described by a “flat” template. Conceptually, this “flat” template can be thought of as a null case, or alternative hypothesis: it represents the absence of shape, or latent structure, in the data.
In the Bayesian shape calculation paper, we describe the importance of this null template: it allows us to move beyond “Which template in our set is optimal?” and instead ask the question: “Does the data look more like any of our templates than like uncorrelated noise?” With this alternative hypothesis, we are not forced to overfit and choose the “best” answer from our set of templates. Instead, if no particular template has an evidence which dominates over the null template, we conclude that the data does not look like any of our templates.
@nb.jit(nb.float64(nb.float64[:]),nopython=True)
def evidence_flat(data): # corresponds to Eq. 2.2.4 in SI for shape calculations
    N = float(data.size)
    m = (N-2.)/2.

    ey = np.mean(data)
    eyy = np.mean(data*data)
    vy = eyy - ey*ey + 1e-300 ## helps with over/underflow

    # evidence integral
    log_l = -(m + .5)*np.log(np.pi)
    log_l += -(N/2)*np.log(N)
    log_l += lgamma(m + .5)
    log_l += -(m + 0.5)*np.log(vy)

    # priors for b (offset), tau (noise)
    log_del_b = np.log(1e5)
    log_del_tau = np.log(1e5)
    log_l -= (log_del_b + log_del_tau)

    return log_l
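To make the comparison concrete, here is a minimal sketch (the helper name prob_jump_vs_flat is ours, not from the analysis above) of how the two log evidences can be combined into a posterior probability that a segment contains the templated shape rather than being flat, assuming equal prior weights for the two models. This is the same logistic form used inside the BITS function later in this notebook.

## minimal two-model BMS sketch: templated shape vs. flat, equal priors assumed
def prob_jump_vs_flat(segment, template, prior_m=1e8):
    ln_ev_jump = evidence_template(segment, prior_m, template)
    ln_ev_flat = evidence_flat(segment)
    ## p(jump | data) = e^ln_ev_jump / (e^ln_ev_jump + e^ln_ev_flat)
    return 1. / (1. + np.exp(ln_ev_flat - ln_ev_jump))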
In the previous notebook, the goal was to find two arbitrarily located signal jumps in a dataset of 600 data points. To do so, an exhaustive set of 360,000 templates was created, wherein each template was the full length of the dataset, with all possible combinations of the 2 jump locations represented. This sort of general shape analysis is admittedly not very scalable. Even for this small example of 2 jumps, the set of templates required grows as \(\mathcal{O}(N^2)\) where \(N\) is the number of points in the dataset.
More generally, the set of templates required scales both with the number of data points, \(N\) and the number of features to find, \(R\), as \(\mathcal{O}(N^{R})\). The number of computations required, however, grows as \(\mathcal{O}(N^{R+1})\). We pick up an extra +1 in the exponent from generating and comparing all \(N^R\) templates to \(N\) data points.
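As a back-of-the-envelope check of these scaling claims, the counts quoted above for the previous notebook follow directly:

## template and computation counts for the previous notebook's example
N, R = 600, 2
print('templates: %d'%(N**R))        ## 360,000 full-length templates
print('comparisons: ~%d'%(N**(R+1))) ## each template is evaluated against all N data points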
Below is the code for generating our set of full-dataset templates and calculating their associated evidences. The template evidences are compared, via Bayes factors, against the null evidence in a process known as Bayesian Model Selection (BMS). Once the templates which maximize this comparison are identified, we return the jump locations they encode as the most probable transition locations.
@nb.jit(nb.float64[:,:](nb.float64[:],nb.float64),nopython=True)
def full_shape_ev(data, prior_m):
    nt = len(data)
    m = prior_m
    lnevs = np.zeros((nt,nt)) + np.nan
    temp = np.zeros_like(data)

    for i in range(nt): ## sweep jump 1
        for j in range(nt): ## sweep jump 2
            ## make the template
            temp *= 0
            temp[i:] = 1
            temp[j:] = 2

            ## calculate evidence
            lnevs[i,j] = evidence_template(data, m, temp)

    return lnevs
@nb.jit(nb.int64[:](nb.float64[:],nb.float64),nopython=True)
def full_shape(data, prior_m):
    ## calculate evidence
    ln_evs = full_shape_ev(data, prior_m)
    if not np.isfinite(ln_evs).all(): ## guard against non-finite evidences
        print('NaN Alert')
        return np.array([0])
    else:
        _l = ln_evs.flatten()
        _l -= np.max(_l)

        ln_prior = -np.log(_l.size) ## equal 1/N priors
        joint = np.exp(_l+ln_prior)

        ## use BMS to find transitions
        bms_probs = joint/joint.sum()
        bms_array = np.empty_like(ln_evs)
        for i in range(0, _l.size, ln_evs.shape[0]):
            ind = int((i+ln_evs.shape[0]) / ln_evs.shape[0] - 1)
            bms_array[ind,:] = bms_probs[i:i+ln_evs.shape[0]]

        transitions = []
        for i in np.nonzero(bms_array == bms_array.max()):
            transitions.append(i[0])

        return np.array(transitions)
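Before timing anything, a single illustrative call on the 1000-point trace generated above (mirroring how the timing loop below uses this function) should recover the two simulated jump locations:

## one exhaustive full-template search on the trace from earlier
tran_full = full_shape(data, 10**8)
print(tran_full, tjumps) ## the recovered indices should agree with the true jump times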
Here, instead of creating thousands of templates that span the entire dataset, we exploit the localized nature of the features we are looking for. We can generate a template that is just large enough to capture the behavior we are searching for, and then scan it across our dataset. We perform the usual BMS calculation on a slice of our data corresponding to the template location, before translating over one datapoint and repeating the process. In effect, this is equivalent to executing a linear search of our localized template through the full dataset (with the caveat that points at the edges will be missed).
This approach greatly alleviates the scaling headache faced by the more general shape calculation approach described above. Here, we only need one template regardless of \(N\), the size of our dataset. Additionally, in this example, the two transitions have the same shape. Since we can move our template, we don’t have to treat them as distinct features, and we can still use just one template regardless of the number of transitions. The number of calculations required for BITS grows as \(\mathcal{O}(CN)\), where \(C\) is the number of \(\textbf{distinct}\) features we are searching for.
@nb.jit(nb.int64[:](nb.float64[:],nb.float64,nb.int64,nb.int64),nopython=True)
def BITS(data, prior_m, tail_size, blur_size=0):
    # jump template
    """
    tmp_jump = np.ones(tmp_size,dtype=nb.float64)
    for i in range(tmp_size):
        if i >= tmp_size / 2.:
            tmp_jump[i] = 1.
        else:
            tmp_jump[i] = 0.
    """
    # blurred template: flat tail, optional linear rise of length blur_size, flat tail
    tt = tail_size // 2
    temp = np.zeros(blur_size + 2 * tt,dtype=nb.float64)
    tmp_size = blur_size + 2 * tt
    ind = np.arange(tt, tt+blur_size, 1)
    temp[ind] = np.linspace(0., 1., len(ind))
    temp[-tt:] = 1

    # calculate evidences
    ev_jump = np.empty(data.shape[0] - tmp_size)
    ev_flat = np.empty(data.shape[0] - tmp_size)

    for i in range(0, data.shape[0] - tmp_size):
        sub = data[i:i+tmp_size]
        #print(type(sub[0]))
        ev_jump[i] = evidence_template(sub, prior_m, temp)
        ev_flat[i] = evidence_flat(sub)

    # use BMS to calculate posteriors
    #ev_line = np.ones_like(ev_step) * -np.inf
    bayes_jump = ev_flat - ev_jump
    bayes_flat = ev_jump - ev_flat

    prob_jump = 1. / (1. + np.exp(bayes_jump))
    prob_flat = 1. / (1. + np.exp(bayes_flat))

    # taking out repeats
    jump = np.where(prob_jump>=0.85)[0]

    if len(jump) == 0:
        return jump
    elif len(jump) == 1:
        transitions_jump = jump
    else:
        if np.min(np.diff(jump))<=tmp_size:
            jump_split = np.split(jump, np.argwhere(np.diff(jump)>=tmp_size).flatten()+1)
            ind = 0
            transitions_jump = np.ones(len(jump_split),dtype=nb.int64)
            for i in jump_split:
                transitions_jump[ind] = i[np.argmax(prob_jump[i])]
                ind += 1
        else:
            transitions_jump = jump ## all detections are already well separated

    return transitions_jump + round(tmp_size / 2)
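Likewise, a single BITS call on the same trace (using the same prior and the tail_size of 55 that the timing loop below uses) should find the same transitions with far less work:

## one BITS pass over the trace from earlier; tail_size=55, no blur
tran_BITS = BITS(data, 10**8, 55, blur_size=0)
print(tran_BITS, tjumps) ## the recovered indices should agree with the true jump times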
Let’s take a look at how both of these methods scale.
import datetime

BITS_times = []
full_times = []

dp = [300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000]
for i in dp:
    print('%d Datapoints complete'%(i))
    ## generate data
    data, tjumps, snr = data_generator(num_points=i, num_jumps=2, jump_size=80, noise_size=20,
                                       drift_size=1, blur=0, seed=76)

    timers_bits = np.zeros(10)
    timers_full = np.zeros(10)
    for j in range(10):
        #print('iteration %d'%(j))
        ## time BITS
        start = datetime.datetime.now()
        tran_BITS = BITS(data, 10**8, 55, blur_size=0)
        finish = datetime.datetime.now()
        if (tjumps == tran_BITS).all():
            timers_bits[j] = (finish-start).total_seconds()
        else:
            print('failed to find all transitions - BITS')
        ## time full shape analysis
        start = datetime.datetime.now()
        tran_full = full_shape(data, 10**8)
        finish = datetime.datetime.now()
        if (tjumps == tran_full).all():
            timers_full[j] = (finish-start).total_seconds()
        else:
            print('failed to find all transitions - full')

    BITS_times.append(np.median(timers_bits))
    full_times.append(np.median(timers_full))
300 Datapoints complete
600 Datapoints complete
900 Datapoints complete
1200 Datapoints complete
1500 Datapoints complete
1800 Datapoints complete
2100 Datapoints complete
2400 Datapoints complete
2700 Datapoints complete
3000 Datapoints complete
plt.figure(figsize=(5,4))
plt.plot(dp, BITS_times, 'o', label='BITS')
plt.plot(dp, full_times, 'o', label='Full Dataset Template')
plt.legend(loc=0)
#plt.title('Linear Axes')
plt.xlabel('N')
plt.xlim(200,3100)
plt.ylabel('Run Time (s)')
plt.savefig('/home/anjali/Documents/BITS_figures/2Features_linear.pdf')
plt.show()
We note rapid, super-linear growth for the full dataset template curve, while the BITS times grow roughly linearly. Let’s investigate further with a log-log plot.
from scipy.optimize import curve_fit

## log plotting
plt.figure(figsize=(5,4))
plt.plot(np.log(dp), np.log(BITS_times), 'o', label='BITS')
plt.plot(np.log(dp), np.log(full_times), 'o', label='Full Dataset Template')
plt.legend(loc=0)

## curve-fitting
def linear(x, m, b):
    return m*x + b

popt, pcov = curve_fit(linear, np.log(dp), np.log(BITS_times))
plt.plot(np.log(dp), linear(np.log(dp), popt[0], popt[1]), 'k--')
print('BITS fit: y = %.3f*m + %.3f'%(np.round(popt[0],3), np.round(popt[1],3)))

popt, pcov = curve_fit(linear, np.log(dp), np.log(full_times))
plt.plot(np.log(dp), linear(np.log(dp), popt[0], popt[1]), 'k--')
print('Full Dataset Template fit: y = %.3f*m + %.3f'%(np.round(popt[0],3), np.round(popt[1],3)))

## finishing up plot
#plt.title('Log-Log Plot');
plt.xlabel('Log(N)')
plt.ylabel('Log(Run Time) (s)')
plt.savefig('/home/anjali/Documents/BITS_figures/2Features_loglog.pdf')
BITS fit: y = 1.009*m + -14.301
Full Dataset Template fit: y = 2.908*m + -18.446
After fitting, we see that our Big-\(\mathcal{O}\) predictions are empirically confirmed. The BITS slope is \(\approx 1\) which matches \(\mathcal{O}(CN) = \mathcal{O}(N)\), while the full dataset slope is \(\approx 3\) which matches \(\mathcal{O}(N^{R+1}) = \mathcal{O}(N^{3})\).
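If desired, the fitted slopes and intercepts can be read back as empirical power laws, runtime \(\approx e^{b} N^{m}\); the short conversion below (using the fit values printed above) is purely illustrative.

## convert the log-log fits back into power laws: runtime ~ exp(b) * N**m
for name, (m_fit, b_fit) in [('BITS', (1.009, -14.301)),
                             ('Full Dataset Template', (2.908, -18.446))]:
    print('%s: runtime ~ %.2e * N^%.2f seconds'%(name, np.exp(b_fit), m_fit))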