Source code for genepy3d.objext.simpletracks

"""Methods for working with tracks.
"""

import numpy as np
from numbers import Number
from scipy.interpolate import splev, splrep
from genepy3d.obj import curves
from genepy3d.util import geo

[docs] def merge(tracklst): """Merge tracks. They must be placed in the increasing order of times. Args: tracklst (array of SimpleTrack): a list of SimpleTrack objects. Returns: A merged SimpleTrack object. """ if len(tracklst)==1: return tracklst[0] else: coors, t = [], [] coors.append(tracklst[0].coors) # initialize by the first track t.append(tracklst[0].time) for ix in range(1,len(tracklst)): if (tracklst[ix-1].time[-1] < tracklst[ix].time[0]): # check time order coors.append(tracklst[ix].coors) t.append(tracklst[ix].time) else: raise Exception("Time between tracks {} and {} is not in increasing order".format(ix-1,ix)) coors = np.concatenate(coors,axis=0) t = np.concatenate(t,axis=0) return SimpleTrack(coors,t)
[docs] class SimpleTrack(curves.Curve): """A simple track object. It's simple because it has no division or merge. We can consider it as a Curve object with time attribute. Work in 3D but also support 2D tracks. Please check the documentation of each function to see if it supports 2D. Attributes: coors (array | tuple): array of points. See ``Curve`` for more detail. time (numeric): array of time. Examples: .. code-block:: python import numpy as np from genepy3d.objext.simpletracks import SimpleTrack # Create SimpleTrack from array of points and time coors = np.array([[1,2,3],[1,2,3],[1,2,3]]) t = np.array([1,2,3]) track = SimpleTrack(coors,t) """ def __init__(self, coors, t): super().__init__(coors) self.time = np.array(t)
[docs] def compute_time_gap(self): """Compute the difference between two consecutive times. Work in 3D and 2D. Returns: Array of time gap whose i element is the time gap between (ti, ti-1). Notes: The first element has no time gap and is set as np.nan. """ time_shift = self.time[1:] - self.time[:-1] return np.array([np.nan] + list(time_shift))
[docs] def split(self,max_gap): """Split track if the gap between two consecutive times is larger than max_gap. Work in 3D and 2D. Args: max_gap (float): maximual gap allowed between two consecutive times. Returns: List of subtracks. Examples: .. code-block:: python import numpy as np from genepy3d.objext.simpletracks import SimpleTrack # Create SimpleTrack from array of points and time coors = np.array([[0,0,0],[1,1,1],[2,2,2],[1,1,1],[0,0,0]]) t = np.array([0,1,2,10,11]) track = SimpleTrack(coors,t) subtracks = track.split(max_gap=5) # the track is splitted in two tracks. """ frameshift = self.compute_time_gap() breakidlst = np.argwhere(frameshift>max_gap).flatten() checkidlst = [0] + list(breakidlst) + [self.nb_of_points] subtracks = [] for i in range(len(checkidlst)-1): sub = SimpleTrack(self.coors[checkidlst[i]:checkidlst[i+1]],self.time[checkidlst[i]:checkidlst[i+1]]) sub.dim = self.dim # make sure the subtraj has the same dim with the mother traj subtracks.append(sub) return subtracks
[docs] def convolve_gaussian(self, sigma, mo="nearest", kerlen=4): """Overlay convolve_gaussian() of Curve object. See more details of this function in ``Curve`` class. Work in 3D and 2D. Returns: SimpleTrack object. """ crv_smoothed = super().convolve_gaussian(sigma, mo, kerlen) if self.dim == 2: return SimpleTrack(crv_smoothed.coors[:,[0,1]],self.time) else: return SimpleTrack(crv_smoothed.coors,self.time)
[docs] def denoise(self,sigma_step=.25,max_iter=None,return_sigma=False): """Override the denoise() from Curve. Details see ``Curve.denoise()``. """ res = super().denoise(sigma_step,max_iter,return_sigma) if res is None: raise Exception("Failed to denoise the curve") else: if return_sigma: crv_denoised = res[0] sigmalst = res[1] trk = SimpleTrack(crv_denoised.coors,self.time) trk.dim = self.dim return (trk,sigmalst) else: trk = SimpleTrack(res.coors,self.time) trk.dim = self.dim return trk
[docs] def resample(self, dt, spline_order=1, return_old_indices=False, return_interp_param=False): """The track is resampled by the new time step. Work in 3D and 2D. Args: dt (float|int): resampled time step. spline_order (int): degree of spline interpolation. return_old_indices (bool): if True, return the indices of new sampled time array correspond to the old time array. return_interp_param (bool): if True, return spline parameters. Returns: Resampled SimpleTrack object. Examples: .. code-block:: python import numpy as np from genepy3d.objext.simpletracks import SimpleTrack # Create SimpleTrack from array of points and time coors = np.array([[0,0,0],[1,1,1],[2,2,2],[1,1,1],[0,0,0]]) t = np.array([0,1,2,10,11]) track = SimpleTrack(coors,t) track_new = track.resample(dt=1) # resample with time step = 1 """ # New number of spots n = int((self.time[-1]-self.time[0])/dt) + 1 # New time time_new = np.linspace(self.time[0],self.time[-1],n) # Find from the new time the indices corresponding to the old time if possible if return_old_indices: old_ids = np.ones(len(self.time))*np.nan for ix in range(len(self.time)): ix2 = np.argwhere(self.time[ix]==time_new).flatten() if len(ix2)>0: old_ids[ix] = ix2[0] # Compute spline parameters splx = splrep(self.time,self.coors[:,0],k=spline_order,s=0) sply = splrep(self.time,self.coors[:,1],k=spline_order,s=0) splz = splrep(self.time,self.coors[:,2],k=spline_order,s=0) # Compute new coordinates x_new = splev(time_new,splx) y_new = splev(time_new,sply) z_new = splev(time_new,splz) # New SimpleTrack track_resampled = SimpleTrack(np.array([x_new,y_new,z_new]).T,time_new) track_resampled.dim = self.dim # make sure the resampled one has the same dim property. # Handle returns if (return_old_indices == False) & (return_interp_param == False): return track_resampled else: return_lst = [track_resampled] if return_old_indices: return_lst.append(old_ids) if return_interp_param: return_lst.append((splx,sply,splz)) return return_lst
[docs] def is_moving(self,num_neighbor,displ_tol): """Check if the object is moving at each point. Work in 3D and 2D. Args: num_neighbor (int): number of neighbor frames. The frames are used to check the movement at a frame n are [n-k,n-k-1,...,n,...,n+k-1,n+k], where k is the number of neighbors frames. displ_tol (float): if the displacement between neighbor points is larger than this value, then it's considered as no moving. Returns: Array of boolean with 0 = no moving, 1 = moving. """ imflag = [] frame_ids = np.arange(self.nb_of_points) for ix in frame_ids: ids = np.argwhere((frame_ids>=ix-num_neighbor)&(frame_ids<=ix+num_neighbor)).flatten() x = self.coors[ids,0] y = self.coors[ids,1] z = self.coors[ids,2] # compare min/max versus mean displacement in x, y and z dxmin = np.mean(x) - np.min(x) dxmax = np.max(x) - np.mean(x) dymin = np.mean(y) - np.min(y) dymax = np.max(y) - np.mean(y) dzmin = np.mean(z) - np.min(z) dzmax = np.max(z) - np.mean(z) flag = 0 # is immobile if (dxmin>displ_tol) | (dymin>displ_tol) | (dzmin>displ_tol) | (dxmax>displ_tol) | (dymax>displ_tol) | (dzmax>displ_tol): flag = 1 # is mobile imflag.append(flag) return np.array(imflag)
[docs] def compute_velocity(self): """Velocity at every point of the track. Work in 3D and 2D. Returns: Array of velocities. Notes: The first point has no velocity and is set as np.nan """ velolst = [np.nan] # first frame has no velocity for ix in np.arange(1,self.nb_of_points): delta_t = self.time[ix] - self.time[ix-1] displacement = np.sqrt(np.sum((self.coors[ix] - self.coors[ix-1])**2)) if delta_t == 0: velolst.append(np.nan) else: velolst.append(displacement/delta_t) return np.array(velolst)
[docs] def compute_acceleration(self): """Acceleration at every point of the track. Work in 3D and 2D. Returns: Array of acceleration. Notes: The first two points have no acceleration and are set as np.nan """ velocity = self.compute_velocity() accelst = [np.nan, np.nan] for ix in np.arange(2,self.nb_of_points): delta_t = self.time[ix] - self.time[ix-1] delta_v = velocity[ix] - velocity[ix-1] if delta_t == 0: accelst.append(np.nan) else: accelst.append(delta_v/delta_t) return np.array(accelst)
[docs] def compute_msd(self): """Mean square displacement. Adapt from this post: https://colab.research.google.com/github/kaityo256/zenn-content/blob/main/articles/msd_fft_python/msd_fft_python.ipynb#scrollTo=z7KpcUd8ddZ2 Work in 3D and 2D. Returns: Array of MSD. """ msd = [np.nan] # no msd for the first time r = geo.norm(self.coors) for s in np.arange(1,self.nb_of_points): delta = r[s:] - r[:-s] msd.append(np.average(delta**2)) return np.array(msd)
[docs] def filter_by_time_interval(self,duration,t_begin=None): """Filter the track starting from a `t_begin` and with a given `duration`. If `t_begin` is None, then the first time point is taken. Otherwise, the time point corresponding to the lower bound of the `t_begin` is chosen. When computing the time point corresponding to the duration, it's the time point corresponds to the upper bound of (t_begin + duration). Work in 3D and 2D. Args: duration (numeric, list): duration starting from t_begin. t_begin (numeric): time begin, if None, then take the first time point. Returns: A subtrack. Notes: Duration can be a single value or a list of durations. If the latter one is provided, then return the track with the maximal duration from the list. """ # compute the first time index if t_begin is None: i0 = 0 else: ids = np.argwhere(self.time <= t_begin).flatten() if len(ids) == 0: raise Exception("t_begin is not valid.") i0 = ids[-1] # the lower bound index # check duration input if isinstance(duration,Number): duration_lst = np.array([duration]) elif isinstance(duration,(list,np.ndarray)): duration_lst = np.array(duration) else: raise Exception("duration must be a number of a list/array") # sort duration lst duration_lst = np.sort(duration_lst) # find the last time index i1 = None for item in duration_lst[-1::-1]: # check the longest duration first if t_begin is None: t_end = self.time[0]+item else: t_end = t_begin + item ids = np.argwhere(self.time >= t_end).flatten() if len(ids) == 0: continue else: i1 = ids[0] # the upper bound index break if i1 is None: return None # can not find subtrack satisfying the given duration else: sub = SimpleTrack(self.coors[i0:i1+1],self.time[i0:i1+1]) sub.dim = self.dim return sub
[docs] def filter_by_distance(self,d,t_begin=None): """Filter from a time `t_begin`, a subtrack that is inside a distance `d`. If `t_begin` is None, then the first time point is taken. Otherwise, the time point corresponding to the lower bound of the `t_begin` is chosen. Assuming the track intersects the ball `B` with center given by the position `p` at the time `t_begin` and with radius `d`. The function returns a subtrack starting from `p` and end at the first position intersecting the ball. Work in 3D and 2D. Args: d (numeric): a distance. t_begin (numeric): time begin, if None, then take the first time point. Returns: A subtrack. Notes: The function only returns the portion starting from the time `t_begin`. All the time points within the ball but before `t_begin` are not counted. It must have a least 1 position intersecting the ball. Otherwise it returns None. """ # compute the first time index if t_begin is None: i0 = 0 else: ids = np.argwhere(self.time <= t_begin).flatten() if len(ids) == 0: raise Exception("t_begin is not valid.") i0 = ids[-1] # the lower bound index # compute distances i_target = np.arange(i0,self.nb_of_points) # print(i_target) p_target = self.coors[i_target] i_src = np.ones(len(i_target),dtype=int) * i0 p_src = self.coors[i_src] dlst = np.sum((p_target - p_src)**2,axis=1)**0.5 ilst = np.argwhere(dlst >= d).flatten() if len(ilst)==0: return None else: i1 = i_target[ilst[0]] # the upper bound index # print(i1) sub = SimpleTrack(self.coors[i0:i1+1],self.time[i0:i1+1]) sub.dim = self.dim return sub
[docs] def filter_by_length(self,l,t_begin=None,tolerance=0.5): """Filter the track starting from a `t_begin` having a given length `l`. If `t_begin` is None, then the first time point is taken. Otherwise, the time point corresponding to the lower bound of the `t_begin` is chosen. Work in 3D and 2D. Args: l (numeric): a length. t_begin (numeric): time begin, if None, then take the first time point. tolerance (numeric): allow to accept the result if it is within [l-tolerance,l+tolerance]. This make faster search given provided error. Returns: A subtrack. """ # compute the first time index if t_begin is None: i0 = 0 else: ids = np.argwhere(self.time <= t_begin).flatten() if len(ids) == 0: raise Exception("t_begin is not valid.") i0 = ids[-1] # the lower bound index actual_l = curves.Curve(self.coors[i0:]).compute_length() if (actual_l < (l - tolerance)): return None # actual length is too small to reach the queried length `l` # if (actual_l + tolerance) < l: # return None # queried lenght `l` is greater than actual length # build distances table idlst = np.arange(i0,self.nb_of_points) dlst = np.sum((self.coors[idlst[1:]] - self.coors[idlst[:-1]])**2,axis=1)**0.5 # query length by binary search stop = False i_lb = 0 i_ub = len(idlst) - 1 i1 = None while(not stop): # print(i_lb,i_ub) i_query = int((i_lb + i_ub)/2) l_query = np.sum(dlst[:i_query]) if (l_query >= (l - tolerance)) & (l_query <= (l + tolerance)): i1 = idlst[i_query] stop = True elif (l_query > (l + tolerance)): i_ub = i_query else: i_lb = i_query if (i_lb >= (i_ub-1)): i1 = None stop = True if i1 is None: return None else: sub = SimpleTrack(self.coors[i0:i1+1],self.time[i0:i1+1]) sub.dim = self.dim return sub