import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
import re
import numpy as np
import pandas as pd
from scipy import interpolate
from scipy.ndimage.filters import gaussian_filter1d
from scipy.ndimage.measurements import label
from scipy.signal import argrelextrema
from PyAstronomy import pyasl
from sklearn import metrics
from genepy3d.util.geo import active_brownian_2d, active_brownian_3d, geo_len, norm, angle3points, vector2points, vector_axes_angles, angle2vectors, l2 as distl2
from genepy3d.util import plot as pl
from matplotlib.colors import ListedColormap
[docs]class Curve:
"""Curve in 3D. Also work with 2D curve.
Check the documentation of each function to see if it supports 2D.
We assumme that curve is a parametric function g(t) = (x(t),y(t),z(t)).
If 2D points are passed then we consider as X and Y coordinates and set Z = 0.
Attributes:
coors (array | tuple): list of 3d points.
- if ``array`` is given, then each column is the x, y, z coordinates of a specific point.
- if ``tuple`` is given, then the items are the three arrays of x, y and z coordinates.
curve_id (int): curve ID (optional).
curve_name (str): curve name (optional).
"""
def __init__(self, coors, curve_id=None, curve_name=None):
if isinstance(coors,np.ndarray):
self.coors = coors
elif isinstance(coors,(tuple,list)):
self.coors = np.array(coors).T
self.dim = 3 # default is 3D
# check 2D input
if self.coors.shape[1]==2:
z = np.zeros(self.coors.shape[0])
newcoors = np.array([self.coors[:,0],self.coors[:,1],z]).T
self.coors = newcoors
self.dim = 2 # mark this curve is 2D
self.nb_of_points = self.coors.shape[0]
if curve_id is None:
self.curve_id = 0 # default value
else:
self.curve_id = curve_id
if curve_name is None:
self.curve_name = 'GeNePy3D'
else:
self.curve_name = curve_name
[docs] @classmethod
def from_csv(cls,filepath,column_names=["x","y","z"],scales=(1.,1.,1.),args={}):
"""Constructor from csv file.
Args:
filepath (str): csv file.
column_names (list of str): coordinates columns in csv file.
scales (tuple of float): scales in x, y and z.
args (dict): overried parameters of pandas.read_csv().
Returns:
Points.
"""
# read file
df = pd.read_csv(filepath,**args)
# make lower case
refined_column_names = [name.lower() for name in column_names]
# extract all column names, remove irregular characters (e.g. space)
# treat upper and lower cases as the same
labels = df.columns.values
refined_labels = []
for lbl in labels:
refined_labels.append(lbl.split()[0].lower())
df.columns = refined_labels
if all(lbl in refined_labels for lbl in refined_column_names):
coors = df[refined_column_names].values / np.array(scales)
return cls(coors)
else:
raise Exception("can not find column names in file.")
[docs] @classmethod
def from_text(cls,filepath):
"""Read from text-liked file (.txt, .xyz).
Note:
Assuming the first three columns correspond to x, y, z coordinates.
"""
try:
data = []
f = open(filepath,'r')
for line in f:
tmp = []
elements = re.split(r'\r|\n| |\s', line)
for ile in elements:
if ile!='':
tmp.append(float(ile))
data.append(tmp)
f.close()
data = np.array(data)
return cls(data[:,:3])
except:
raise Exception("failed when importing from text file")
[docs] def compute_derivative(self,deg,dt=1):
"""Derivative using np.gradient.
Support 2D and 3D. If in 2D, then return 0 for the derivative of Z.
Args:
deg (int): derivative degree.
dt (float): delta t.
Returns:
array of float where each row is the derivative in x, y and z at a specific point.
"""
dx, dy, dz = self.coors[:,0].copy(), self.coors[:,1].copy(), self.coors[:,2].copy()
for i in range(deg):
dx = np.gradient(dx,dt,edge_order=1)
dy = np.gradient(dy,dt,edge_order=1)
dz = np.gradient(dz,dt,edge_order=1)
return np.array([dx, dy, dz]).T
# def get_norm(self):
# """Norms of points on curve.
# NOTE not useful, will remove.
# Returns:
# array of float.
# """
# if self.norm is None:
# self.norm = norm(self.coors)
# return self.norm
[docs] def compute_length(self):
"""Length of curve.
Support 2D and 3D.
Returns:
float.
"""
return geo_len(self.coors)
[docs] def compute_orientation(self,u=None):
"""Orientation vectors and angles w.r.t. a vector u for every point on the curve.
Args:
u (array (float)): vector used to compute angles. If None then return angles with three x, y and z axes.
"""
vecs = []
angles = []
for i in range(self.coors.shape[0]):
if i==0:
a = self.coors[i]
b = self.coors[i+1]
elif i==(self.coors.shape[0]-1):
a = self.coors[i-1]
b = self.coors[i]
else:
a = self.coors[i-1]
b = self.coors[i+1]
vec = vector2points(a, b)
vecs.append(vec)
if u is None:
# angles w.r.t. three axes x, y and z
angles.append(vector_axes_angles(vec))
else:
angles.append(angle2vectors(vec,u))
return (np.array(vecs),np.array(angles))
[docs] def compute_curvature(self):
"""Curvatures of curve.
Support 2D and 3D.
Returns:
array of float.
"""
d1 = self.compute_derivative(1)
d2 = self.compute_derivative(2)
d1norm = norm(d1)
cp = np.cross(d1,d2)
cpnorm = norm(cp)
res = np.zeros(len(cpnorm))
idx = np.argwhere(d1norm!=0).flatten()
if len(idx)!=0:
res[idx] = cpnorm[idx]/(d1norm[idx]**3)
return res
[docs] def compute_torsion(self):
"""Torsions of curve.
Only in 3D.
Returns:
array of float.
"""
d1 = self.compute_derivative(1)
d2 = self.compute_derivative(2)
d3 = self.compute_derivative(3)
cp = np.cross(d1,d2)
cpnorm = norm(cp)
res = np.zeros(len(cpnorm))
idx = np.argwhere(cpnorm!=0).flatten()
if len(idx)!=0:
res[idx] = np.sum(cp[idx]*d3[idx],axis=1)/(cpnorm[idx]**2)
return res
[docs] def compute_wiggliness(self):
"""Wiggliness of curve.
Support 2D and 3D.
Wiggliness = Length / Distance
Returns:
array of float.
"""
dist = np.sqrt(np.sum((self.coors[0]-self.coors[-1])**2))
if dist == 0:
wiggliness = 0.
else:
wiggliness = self.compute_length()*1./dist
return wiggliness
[docs] def compute_tortuosity(self):
"""Tortuosity of curve.
Same as compute_wiggliness()
Tortuosity = Length / Distance
Returns:
array of float.
"""
return self.compute_wiggliness()
# def compute_curviness(self):
# """Curviness of curve.
# Curviness is computed from local maxima of curvatures.
# Returns:
# array of float.
# """
# kappa = self.compute_curvature()
# extid = argrelextrema(kappa,np.greater)[0] # get indices of local maxima of curvatures
# k = len(extid)
# if k==0:
# curviness = 0
# else:
# curviness = (1./(1.+abs(k)))*np.sum(kappa[extid])
# return curviness
def _extract_plane(self,tor_thr,cur_thr,ext_thr):
"""Compute indices where the curve is locally planar or linear based on torsions and curvatures.
Used in ``scale_space()``.
Args:
tor_thr (float): torsion threshold.
cur_thr (float): curvature threshold.
ext_thr (float): threshold for penalize extreme values of torsions (this is not useful).
Returns:
array of int where 1 indicate *planar or linear* point, and 0 otherwise.
"""
tor = self.compute_torsion()
cur = self.compute_curvature()
T = np.zeros(len(tor),dtype=np.uint)
idx1 = np.argwhere((np.abs(tor) <= tor_thr)|(np.abs(tor) >= ext_thr)).flatten()
# idx1 = np.argwhere((np.abs(tor) <= tor_thr)).flatten()
idx2 = np.argwhere(cur <= cur_thr).flatten()
idx = np.union1d(idx1,idx2)
if len(idx)!=0:
T[idx] = 1 # plane flag
return T
[docs] def convolve_gaussian(self,sigma,mo="nearest",kerlen=4.0):
"""Get curve at a specific scale *sigma* by Gaussian convolution.
Support 2D and 3D.
Args:
sigma (float or list of floats): scale.
mo (str): mode used to treat the border effect (see gaussian_filter1d() for detail).
kerlen (float): specify the length of Gaussian kernel (see gaussian_filter1d() for detail).
Returns:
Curve.
"""
if np.issubdtype(type(sigma),np.integer) | np.issubdtype(type(sigma),np.floating):
sigx, sigy, sigz = sigma, sigma, sigma
else:
try:
sigx, sigy, sigz = sigma[0], sigma[1], sigma[2]
except:
raise Exception('sigma must be a single number of a list')
# if sigma == 0:
# return self
x, y, z = self.coors[:,0], self.coors[:,1], self.coors[:,2]
if sigx == 0:
xs = x
else:
xs = gaussian_filter1d(x,sigx,mode=mo,truncate=kerlen)
if sigy == 0:
ys = y
else:
ys = gaussian_filter1d(y,sigy,mode=mo,truncate=kerlen)
if sigz == 0:
zs = z
else:
zs = gaussian_filter1d(z,sigz,mode=mo,truncate=kerlen)
return Curve(np.array([xs,ys,zs]).T)
[docs] def resample(self,npoints=None,unit_length=None,spline_order=1,return_interp_param=False):
"""Resample the curve using spline interpolation.
Support 2D and 3D.
Args:
npoints (int): number of resampled points.
unit_length (float): if not None, then ``npoints`` is calculated from it.
spline_order (uint): 1 means linear interpolation.
return_interp_param (bool): if True then return interpolation parameters.
Returns:
Curve() resampled curve.
"""
if unit_length is None:
if npoints is None:
n = int(self.coors.shape[0]*2)
else:
n = npoints
else:
n = int(np.round(self.compute_length()/unit_length))
# ignore when the nb. points or resampled one are smaller than spline order
if ((n <= spline_order) | (self.coors.shape[0] <= spline_order)):
new_coors = self.coors
else:
# try to remove duplicates before interpolation
_, uix = np.unique(self.coors,axis=0,return_index=True)
uix = np.sort(uix)
unique_coors = self.coors[uix]
# interpolation
if self.dim == 3:
try:
coef, to = interpolate.splprep([unique_coors[:,0],unique_coors[:,1],unique_coors[:,2]],k=spline_order,s=0)
except:
raise Exception("Given coordinates can not be interpolated by spline.")
ti = np.linspace(0,1,n)
xi, yi, zi = interpolate.splev(ti,coef)
new_coors = np.array([xi,yi,zi]).T
else: # 2D
try:
coef, to = interpolate.splprep([unique_coors[:,0],unique_coors[:,1]],k=spline_order,s=0)
except:
raise Exception("Given coordinates can not be interpolated by spline.")
ti = np.linspace(0,1,n)
xi, yi = interpolate.splev(ti,coef)
zi = [0. for _ in xi]
new_coors = np.array([xi,yi,zi]).T
if return_interp_param==False:
return Curve(new_coors,self.curve_id,self.curve_name)
else:
try:
return (Curve(new_coors,self.curve_id,self.curve_name),to,coef,ti,uix)
except:
return Curve(new_coors,self.curve_id,self.curve_name)
[docs] def denoise(self,return_std=False):
"""Automatically denoise a curve assuming Gaussian noise. Noise variance acrooss the three dimensions is estimated using the beta-sigma method implemented in PyAstronomy [1].
[1] https://www.aanda.org/articles/aa/pdf/2018/01/aa30618-17.pdf
args:
return_std (bool): return the estimated sigma as well (default: False)
return:
denoised curve (if return_std is False)
(denoised curve, estimated sigma) (if return_std is True)
"""
with warnings.catch_warnings():
warnings.filterwarnings('error')
# estimate denoised scale
try:
x, y, z = self.coors[:,0], self.coors[:,1], self.coors[:,2]
max_iter = self.coors.shape[0]//2
beq = pyasl.BSEqSamp()
N, j = 1, 1
xsig, _ = beq.betaSigma(x, N, j, returnMAD=True)
ysig, _ = beq.betaSigma(y, N, j, returnMAD=True)
zsig, _ = beq.betaSigma(z, N, j, returnMAD=True)
estsig = np.round(np.max([xsig,ysig,zsig]),2)
# print(estsig)
s = 0.
step = 1.0
ite = 0
xflag, yflag, zflag = True, True, True
while((xflag | yflag | zflag) & (ite < max_iter)):
crvs = self.convolve_gaussian(s)
xs, ys, zs = crvs.coors[:,0], crvs.coors[:,1], crvs.coors[:,2]
xsig = np.std(xs-x)
ysig = np.std(ys-y)
zsig = np.std(zs-z)
if xsig >= estsig:
xflag = False
if ysig >= estsig:
yflag = False
if zsig >= estsig:
zflag = False
ite = ite + 1
s = s + step
est_noise_scale = s
# print(est_noise_scale)
except:
est_noise_scale = 0
# denoise curve
if est_noise_scale != 0:
crvd = self.convolve_gaussian(est_noise_scale)
# print("ok")
else:
crvd = self
if return_std == False:
return crvd
else:
return (crvd,estsig)
[docs] def scale_space(self,scale_range,features={'curvature','torsion','ridge','valley','planeline','line'},eps_kappa=0.01,eps_tau=0.01,eps_seg=10,mo="nearest",kerlen=4.0):
"""Compute features of a curve across scales, i.e. a scale space of gaussian convolution.
Can compute various features at one time.
Args:
scale_range (array of float): range of scales (sigma of gaussian convolutions).
features (dic): list of features (detail see below).
eps_kappa (float): curvature threshold. (for planeline feature)
eps_tau (float): torsion threshold. (for line and planeline feature)
eps_seg (float): segment length threshold (in number of points).
Returns:
dictionary whose each item is a scale space matrix of a specific feature,
where rows are the scale range and columns are the curve indices.
Note:
we support following features
- curvature: curvature scale space.
- torsion: torsion scale space.
- ridge: local maxima of curvature
- valley: local minima of curvature
- planeline: plane+line scale space.
- line: line scale space.
"""
x, y, z = self.coors[:,0], self.coors[:,1], self.coors[:,2]
xs, ys, zs = x.copy(), y.copy(), z.copy()
npoints = self.coors.shape[0]
# initialize data
data = {}
for f in features:
data[f] = np.zeros((len(scale_range),npoints),dtype=np.float)
for i in range(len(scale_range)):
# scaled curve
if scale_range[i]!=0:
# smooth curve with gaussian func
xs = gaussian_filter1d(x,scale_range[i],mode=mo,truncate=kerlen)
ys = gaussian_filter1d(y,scale_range[i],mode=mo,truncate=kerlen)
zs = gaussian_filter1d(z,scale_range[i],mode=mo,truncate=kerlen)
scaled_curve = Curve(coors=(xs,ys,zs))
cur = scaled_curve.compute_curvature()
tor = scaled_curve.compute_torsion()
# curvature
if 'curvature' in features:
data['curvature'][i,:] = cur
# torsion
if 'torsion' in features:
data['torsion'][i,:] = tor
# ridge
if 'ridge' in features:
extid = argrelextrema(cur,np.greater)[0]
data['ridge'][i,extid] = 1
# valley
if 'valley' in features:
extid = argrelextrema(cur,np.less)[0]
data['valley'][i,extid] = 1
# estimate local planes (include also lines)
if 'planeline' in features:
ext_thr = 1e6 # peak threshold (not useful, so make very large value) => see extract_plane()
if((i==0)&(len(np.argwhere(self._extract_plane(eps_tau,eps_kappa,ext_thr)==1).flatten())==npoints)):
data['planeline'][i,:] = 1
elif((i!=0)&(len(np.argwhere(data['planeline'][i-1,:]==1).flatten())==npoints)):
data['planeline'][i,:] = 1
else:
data['planeline'][i,:] = scaled_curve._extract_plane(eps_tau,eps_kappa,ext_thr)
connected_compo, nb_compo = label(data['planeline'][i,:])
if nb_compo!=0:
for j in range(1,nb_compo+1):
connected_idx = np.argwhere(connected_compo==j).flatten()
if ((len(connected_idx)<npoints)&(len(connected_idx)<=eps_seg)):
data['planeline'][i,connected_idx] = 0 # remove plane artifacts based on segment length threshold
# estimate local lines
if 'line' in features:
idx = np.argwhere(cur <= eps_kappa).flatten()
if((i==0)&(len(np.argwhere(self.compute_curvature() <= eps_kappa).flatten())==npoints)):
data['line'][i,:] = 1
elif((i!=0)&(len(np.argwhere(data["line"][i-1,:]==1).flatten())==npoints)):
data["line"][i,:] = 1
elif len(idx)!=0:
data['line'][i,idx] = 1
connected_compo, nb_compo = label(data['line'][i,:])
if nb_compo!=0:
for j in range(1,nb_compo+1):
connected_idx = np.argwhere(connected_compo==j).flatten()
if ((len(connected_idx)<npoints)&(len(connected_idx)<=eps_seg)):
data['line'][i,connected_idx] = 0 # remove plane artifacts
return data
def _find_best_segments(self,I):
"""Estimate the best combination of planes or lines from indicator I.
This is the support function for ``decompose_intrinsicdim()``.
Args:
I (array of float): scale space (planeline or line).
This can be obtained from ``scale_space()``.
Returns:
dictionary containing estimated segments detail.
"""
nb_seg = [] # nb of segments at each scale
seg_info = [] # begin and end indices of each segment at each scale
seg_len = [] # segment length at each scale
nb_seg_groups = [] # list of various combinations of segments within the scale range
duration = [] # number of scale each combination of segment appear within the scale range
nscale, npoint = I.shape
it = 0 # iterator for nb_seg_groups
# check segments duration
for il in range(nscale):
# get connected components at this scale
connected_compo, nb_compo = label(I[il])
nb_seg.append(nb_compo)
# save segment information
if nb_compo==0:
seg_info.append([])
seg_len.append([])
else:
tmp, tmp2 = [], []
for icompo in range(1,nb_compo+1):
connected_idx = np.argwhere(connected_compo==icompo).flatten()
tmp.append([connected_idx[0],connected_idx[-1]]) # save component indice interval
tmp2.append(len(connected_idx)) # save component length
seg_info.append(tmp)
seg_len.append(tmp2)
# calculate its duration
if len(nb_seg_groups)==0: # this first time
nb_seg_groups.append(nb_compo)
duration.append(1)
elif nb_seg_groups[it]==nb_compo: # update step: compare states between the current scale with the previous scale (it)
duration[it] = duration[it]+1
else: # if different, reset to new combination
it = it + 1
nb_seg_groups.append(nb_compo)
duration.append(1)
# select the interval that show the longest seg life
nb_seg, nb_seg_groups, duration = np.array(nb_seg),np.array(nb_seg_groups),np.array(duration)
tmp_duration = duration.copy()
idx = np.argwhere(nb_seg_groups==0).flatten() # check seg groups that have nothing
if len(idx)!=0:
tmp_duration[idx] = -1 # penalize 0 segments
best_group = np.argmax(tmp_duration) # the best combination is the longest life
# compute max, min and avg scales within the best combination
max_level = int(np.sum(duration[:best_group+1]) - 1)
min_level = int(np.sum(duration[:best_group]))
avg_level = int((max_level + min_level)/2)
# estimate best configuration of segments from the best combination
# NOTE: the nb. of segments is similar across scales within the best combination but with various lengths.
# Thus, select the subinterval of seg_len within the maximal range
subseg = np.array(seg_len[min_level:max_level+1])
max_seg_ids = np.argmax(subseg,axis=0)+min_level # get the maximal length for each component
# combine all segments indices into one array
best_seg_ids = np.zeros(npoint,dtype=np.uint)
for iseg in range(len(max_seg_ids)):
id1, id2 = seg_info[max_seg_ids[iseg]][iseg][0], seg_info[max_seg_ids[iseg]][iseg][1]
best_seg_ids[id1:id2+1] = best_seg_ids[id1:id2+1] + 1
# show intersected segments (marked by value > 1)
conflit_ids = np.zeros(npoint,dtype=np.uint)
idx = np.argwhere(best_seg_ids>1).flatten() # the intersected zones are marked by value > 1.
conflit_ids[idx]=1
if len(idx)!=0:
connected_compo, nb_compo = label(conflit_ids) # get all intersected zones
for icompo in range(1,nb_compo+1):
connected_idx = np.argwhere(connected_compo==icompo).flatten()
best_seg_ids[connected_idx] = 0 # first assign this zone by 0
if len(connected_idx)>2: # if the intersected segment has a least 2 points.
seg_1 = connected_idx[0:len(connected_idx)//2]
seg_2 = connected_idx[(len(connected_idx)//2)+1:]
best_seg_ids[seg_1] = 1
best_seg_ids[seg_2] = 1
# finally, extract again all segments from best_seg_ids
pred_segs = []
connected_compo, nb_compo = label(best_seg_ids)
if nb_compo!=0:
for icompo in range(1,nb_compo+1):
connected_idx = np.argwhere(connected_compo==icompo).flatten()
pred_segs.append([connected_idx[0],connected_idx[-1]])
dic = {}
dic['pred_segs'] = pred_segs
dic['max_level'] = max_level
dic['min_level'] = min_level
dic['avg_level'] = avg_level
dic['nb_seg'] = nb_seg
dic['seg_info'] = seg_info
dic['seg_len'] = seg_len
dic['nb_seg_groups'] = nb_seg_groups
dic['duration'] = duration
return dic
[docs] def decompose_intrinsicdim(self,sig_c,delta_sig=None,eps_seg_len=None,eps_crv_len=None,sig_step=1,eps_kappa=0.01,eps_tau=0.01):
"""Decompose curve into multiscale hierarchichal intrinsic segments.
Args:
sig_c (float): central scale of the searching region.
delta_sig (float): width of the searching interval.
sig_step (int): scale step within the searching region.
eps_kappa (float): curvature threshold.
eps_tau (float): torsion threshold.
eps_seg_len (float): segment length threshold.
eps_crv_len (float): curve length threshold.
Returns:
list of segment indices specifying line, plane+line.
"""
if delta_sig is None:
search_width = 0.3*sig_c # 30% of sig_c
else:
search_width = delta_sig
# setting scale range
l1 = sig_c - search_width
if l1 < 0:
l1 = 0
l2 = sig_c + search_width
scale_range = np.arange(l1,l2+sig_step,sig_step)
# setting minimal segment length
if eps_seg_len is None:
eps_seg = int(np.round(self.coors.shape[0]*0.05)) # 5% of nb. of points
else:
# converting eps_seg_len in nb. of points
if self.compute_length() <= eps_seg_len:
eps_seg = self.coors.shape[0] # all points
else:
eps_seg = int(np.round(self.coors.shape[0]*eps_seg_len*1./self.compute_length())) # fraction of points
# compute planeline and line indicators
data = self.scale_space(scale_range,{'planeline','line'},eps_kappa,eps_tau,eps_seg)
planeline_param = self._find_best_segments(data['planeline'])
min_pl_level = planeline_param['min_level']
max_pl_level = planeline_param['max_level']
line_param = self._find_best_segments(data['line'][min_pl_level:max_pl_level+1])
# post processing for scaled curve whose length <= given threshold : esp_crv_len
plids = planeline_param['pred_segs']
lids = line_param['pred_segs']
# setting minimal curve length
if eps_crv_len is None:
if eps_seg_len is not None:
eps_crv = eps_seg_len
else:
eps_crv = self.compute_length()*0.01 # 1% of curve length
else:
eps_crv = eps_crv_len
if (self.convolve_gaussian(sig_c).compute_length() <= eps_crv):
# then we simply see the curve as a straight line
plids = [[0, len(self.coors)-1]]
lids = [[0, len(self.coors)-1]]
planeline_param['pred_segs'] = plids
line_param['pred_segs'] = lids
self.intrinsic_dims = {}
self.intrinsic_dims['scale_range'] = scale_range
self.intrinsic_dims['planeline_scales'] = data['planeline']
self.intrinsic_dims['line_scales'] = data['line']
self.intrinsic_dims['planeline_param'] = planeline_param
self.intrinsic_dims['line_param'] = line_param
return {'planeline_pred':plids,'line_pred':lids}
[docs] def plot_decomposed_table(self,ax,show_selection=True,show_scales=True,aspect=3,xdiv=50,ydiv=10):
"""Display decomposed intrinsic table.
This is only used after running ``decompose_intrinsicdim()``.
Args:
ax : plot axis.
show_selection (bool): if True, then display selection interval.
show_scales (bool): if True, display scale range in y axis.
aspect (int): aspect ratio between x and y axes.
xdiv (int): to calculate xticks.
ydiv (int): to calculate yticks.
"""
newcolors = np.array([[0.,0.7,0.],[0.9,0.9,0.]])
mycmap = ListedColormap(newcolors)
npoints = len(self.coors)
ax.imshow(self.intrinsic_dims['planeline_scales'],cmap=mycmap,aspect=aspect)
ax.contourf(self.intrinsic_dims['line_scales'], 1, hatches=['', '//// \\\\\\\\'], alpha=0.5) # use this to mark line within plane
if show_scales == True:
ax.set_yticks(np.arange(0,len(self.intrinsic_dims['scale_range']),ydiv))
ax.set_yticklabels((np.round(self.intrinsic_dims['scale_range'][0::ydiv],2)).astype(np.int))
else:
ax.set_yticks(np.arange(0,len(self.intrinsic_dims['scale_range']),ydiv))
if show_selection == True:
min_pl_level = self.intrinsic_dims['planeline_param']['min_level']
max_pl_level = self.intrinsic_dims['planeline_param']['max_level']
min_l_level = self.intrinsic_dims['line_param']['min_level']
max_l_level = self.intrinsic_dims['line_param']['max_level']
ax.hlines(min_pl_level,0,npoints,color='yellow',linewidth=5)
ax.hlines(max_pl_level,0,npoints,color='yellow',linewidth=5)
ax.hlines(min_pl_level+min_l_level,0,npoints,color='blue')
ax.hlines(min_pl_level+max_l_level,0,npoints,color='blue')
ax.set_xlim(0,npoints);
ax.set_xticks(np.arange(0,npoints,xdiv));
ax.invert_yaxis();
ax.grid('on');
ax.set_ylabel('scale')
ax.set_xlabel('u')
[docs] def plot_intrinsicdim(self,ax,projection='3d',scales=(1.,1.,1.),
root_args={"c":"blue","s":100},
dim1d_args={"c":"magenta","lw":1,"alpha":1.},
dim2d_args={"c":"yellow","lw":3,"alpha":0.7},
dim3d_args={"c":"green","lw":3,"alpha":0.7},
plane_color="yellow",
overrided_curve=None):
"""Display curve superimposed by estimated intrinsic dimensions.
This is only used after running ``decompose_intrinsicdim()``.
Args:
ax : plot axis.
projection (str): support *3d, xy, xz, yz* modes.
scales (tuple of float): specify x, y, z scales.
overrided_curve (Curve): superimpose estimated intrinsic dimensions on the overrided_curve.
if None, then a scaled curve computed from ``decompose_intrinsicdim()`` is used as default.
"""
from genepy3d.obj.points import Points
if overrided_curve is not None:
P = overrided_curve.coors
else:
min_pl_level = self.intrinsic_dims['planeline_param']['min_level']
max_pl_level = self.intrinsic_dims['planeline_param']['max_level']
avg_pl_level = int((min_pl_level+max_pl_level)/2)
avg_scale = self.intrinsic_dims['scale_range'][avg_pl_level]
P = self.convolve_gaussian(avg_scale).coors
pl.plot_point(ax,projection,P[0,0],P[0,1],P[0,2],scales,point_args=root_args)
pl.plot_line(ax,projection,P[:,0],P[:,1],P[:,2],scales,line_args=dim3d_args)
pred_pl_ids = self.intrinsic_dims['planeline_param']['pred_segs']
pred_l_ids = self.intrinsic_dims['line_param']['pred_segs']
for i in pred_pl_ids:
idx = range(i[0],i[1]+1)
pl.plot_line(ax,projection,P[idx,0],P[idx,1],P[idx,2],scales,line_args=dim2d_args)
if projection == '3d':
data = P[idx]
c, normal = Points(data).fit_plane()
maxx = np.max(data[:,0])
maxy = np.max(data[:,1])
minx = np.min(data[:,0])
miny = np.min(data[:,1])
pnt = np.array([0.0, 0.0, c])
d = -pnt.dot(normal)
xx, yy = np.meshgrid([minx, maxx], [miny, maxy])
z = (-normal[0]*xx - normal[1]*yy - d)*1. / normal[2]
ax.plot_surface(xx, yy, z, color=plane_color,alpha=0.4)
for i in pred_l_ids:
idx = range(i[0],i[1]+1)
pl.plot_line(ax,projection,P[idx,0],P[idx,1],P[idx,2],scales,line_args=dim1d_args)
if projection != '3d':
ax.axis('equal')
else:
param = pl.fix_equal_axis(self.coors / np.array(scales))
ax.set_xlim(param['xmin'],param['xmax'])
ax.set_ylim(param['ymin'],param['ymax'])
ax.set_zlim(param['zmin'],param['zmax'])
[docs] def compute_local_3d_scale_sigma(self,sig_lst,delta_sig=0,sig_step=1,eps_seg_len=None,eps_crv_len=None,eps_kappa=0.01,eps_tau=0.01,return_dim_results=False):
"""Computing local 3d scale of curve from the list of sigma i.e stdev of Gaussian function.
Local 3D scale: scale at which point transforms from 3D to 2D/1D.
Args:
sig_lst (list (float)): list of sigma values.
delta_sig (float): width of the searching interval.
sig_step (int|float): scale step within the searching region.
eps_seg_len (float): segment length threshold.
eps_crv_len (float): curve length threshold.
eps_kappa (float): curvature threshold.
eps_tau (float): torsion threshold.
return_dim_results (bool): if True, then return intrinsic dim estimation of r_lst
Returns:
list of local 3d scales for every points on curve.
"""
npnt = self.coors.shape[0]
nrad = len(sig_lst)
counter = np.zeros(npnt)
maxlen = np.ones(npnt)*-1.
firstid = np.ones(npnt)*(nrad-1)
if return_dim_results==True:
dim_results = []
# default delta sigma
if delta_sig is None:
delta_sig = 3 * sig_step
# intrinsic dim estimation
for ir in range(nrad):
sc = sig_lst[ir]
res = self.decompose_intrinsicdim(sc,delta_sig,
eps_seg_len,eps_crv_len,sig_step,
eps_kappa,eps_tau)
if return_dim_results==True:
dim_results.append(res)
dimid = []
for item in res['planeline_pred']:
dimid = dimid + list(range(item[0],item[1]+1))
countedid = np.argwhere(counter>0).flatten()
intersectedid = np.intersect1d(countedid,dimid)
diffid = np.setdiff1d(countedid,dimid)
if len(intersectedid)>0:
counter[intersectedid] = counter[intersectedid] + 1.
if len(diffid)>0:
checkid = np.argwhere(counter[diffid]>maxlen[diffid]).flatten()
if len(checkid)>0:
updateid = diffid[checkid]
maxlen[updateid] = counter[updateid]
firstid[updateid] = ir - counter[updateid]
counter[updateid] = 0. # reset counter
uncountedid = np.argwhere(counter==0).flatten()
intersectedid = np.intersect1d(uncountedid,dimid)
if len(intersectedid)>0:
counter[intersectedid] = 1.
# remove peaks
nonvalidid = np.argwhere(maxlen==1.).flatten()
firstid[nonvalidid] = nrad-1
# when the point has 2D/1D until the end of radius
finalcheckid = np.argwhere((counter>maxlen)&(counter!=0)).flatten()
if len(finalcheckid)>0:
firstid[finalcheckid] = nrad - counter[finalcheckid]
if return_dim_results==False:
return list(sig_lst[firstid.astype(np.uint)])
else:
return (list(sig_lst[firstid.astype(np.uint)]),dim_results)
[docs] def compute_local_3d_scale_radius(self,r_lst,eps_seg_len=None,eps_crv_len=None,sig_step=1,eps_kappa=0.01,eps_tau=0.01,return_dim_results=False):
"""Computing local 3d scale of curve from the list of radius of curvatures.
Local 3D scale: scale at which point transforms from 3D to 2D/1D.
Args:
r_lst (list (float)): list of radii of curvatures.
eps_seg_len (float): segment length threshold.
eps_crv_len (float): curve length threshold.
sig_step (int): scale step within the searching region.
eps_kappa (float): curvature threshold.
eps_tau (float): torsion threshold.
return_dim_results (bool): if True, then return intrinsic dim estimation of r_lst
Returns:
list of local 3d scales for every points on curve.
"""
npnt = self.coors.shape[0]
nrad = len(r_lst)
counter = np.zeros(npnt)
maxlen = np.ones(npnt)*-1.
firstid = np.ones(npnt)*(nrad-1)
rstbl = RadiusScaleTable(self,max(r_lst)+1)
if return_dim_results==True:
dim_results = []
# intrinsic dim estimation
for ir in range(nrad):
try:
sc, delta_sig = rstbl.compute_scales_from_radius(r_lst[ir])
except:
raise Exception("failed at ir={}".format(ir))
res = self.decompose_intrinsicdim(sc,delta_sig,
eps_seg_len,eps_crv_len,sig_step,
eps_kappa,eps_tau)
if return_dim_results==True:
dim_results.append(res)
dimid = []
for item in res['planeline_pred']:
dimid = dimid + list(range(item[0],item[1]+1))
countedid = np.argwhere(counter>0).flatten()
intersectedid = np.intersect1d(countedid,dimid)
diffid = np.setdiff1d(countedid,dimid)
if len(intersectedid)>0:
counter[intersectedid] = counter[intersectedid] + 1.
if len(diffid)>0:
checkid = np.argwhere(counter[diffid]>maxlen[diffid]).flatten()
if len(checkid)>0:
updateid = diffid[checkid]
maxlen[updateid] = counter[updateid]
firstid[updateid] = ir - counter[updateid]
counter[updateid] = 0. # reset counter
uncountedid = np.argwhere(counter==0).flatten()
intersectedid = np.intersect1d(uncountedid,dimid)
if len(intersectedid)>0:
counter[intersectedid] = 1.
# remove peaks
nonvalidid = np.argwhere(maxlen==1.).flatten()
firstid[nonvalidid] = nrad-1
# when the point has 2D/1D until the end of radius
finalcheckid = np.argwhere((counter>maxlen)&(counter!=0)).flatten()
if len(finalcheckid)>0:
firstid[finalcheckid] = nrad - counter[finalcheckid]
if return_dim_results==False:
return list(r_lst[firstid.astype(np.uint)])
else:
return (list(r_lst[firstid.astype(np.uint)]),dim_results)
[docs] def compute_local_3d_scale(self,r_lst,eps_seg_len=None,eps_crv_len=None,sig_step=1,eps_kappa=0.01,eps_tau=0.01,return_dim_results=False):
"""Computing local 3d scale of curve. [1]
Local 3D scale: scale at which point transforms from 3D to 2D/1D.
[1] https://doi.org/10.1101/2020.06.01.127035
This function is DEPRECATED. Use compute_local_3d_scale_radius().
Args:
r_lst (list (float)): list of radii of curvatures.
eps_seg_len (float): segment length threshold.
eps_crv_len (float): curve length threshold.
sig_step (int): scale step within the searching region.
eps_kappa (float): curvature threshold.
eps_tau (float): torsion threshold.
return_dim_results (bool): if True, then return intrinsic dim estimation of r_lst
Returns:
list of local 3d scales for every points on curve.
"""
npnt = self.coors.shape[0]
nrad = len(r_lst)
counter = np.zeros(npnt)
maxlen = np.ones(npnt)*-1.
firstid = np.ones(npnt)*(nrad-1)
rstbl = RadiusScaleTable(self,max(r_lst)+1)
if return_dim_results==True:
dim_results = []
# intrinsic dim estimation
for ir in range(nrad):
try:
sc, delta_sig = rstbl.compute_scales_from_radius(r_lst[ir])
except:
raise Exception("failed at ir={}".format(ir))
res = self.decompose_intrinsicdim(sc,delta_sig,
eps_seg_len,eps_crv_len,sig_step,
eps_kappa,eps_tau)
if return_dim_results==True:
dim_results.append(res)
dimid = []
for item in res['planeline_pred']:
dimid = dimid + list(range(item[0],item[1]+1))
countedid = np.argwhere(counter>0).flatten()
intersectedid = np.intersect1d(countedid,dimid)
diffid = np.setdiff1d(countedid,dimid)
if len(intersectedid)>0:
counter[intersectedid] = counter[intersectedid] + 1.
if len(diffid)>0:
checkid = np.argwhere(counter[diffid]>maxlen[diffid]).flatten()
if len(checkid)>0:
updateid = diffid[checkid]
maxlen[updateid] = counter[updateid]
firstid[updateid] = ir - counter[updateid]
counter[updateid] = 0. # reset counter
uncountedid = np.argwhere(counter==0).flatten()
intersectedid = np.intersect1d(uncountedid,dimid)
if len(intersectedid)>0:
counter[intersectedid] = 1.
# remove peaks
nonvalidid = np.argwhere(maxlen==1.).flatten()
firstid[nonvalidid] = nrad-1
# when the point has 2D/1D until the end of radius
finalcheckid = np.argwhere((counter>maxlen)&(counter!=0)).flatten()
if len(finalcheckid)>0:
firstid[finalcheckid] = nrad - counter[finalcheckid]
if return_dim_results==False:
return list(r_lst[firstid.astype(np.uint)])
else:
return (list(r_lst[firstid.astype(np.uint)]),dim_results)
def _find_ridge(self,idx,C,step):
"""Find ridge starting from index idx at level 0 (noiseless case) in scale space C.
The interval size to find the next closest index of upper level is specified by step.
This is the support function for ``principal_turns()``.
Args:
idx (int): index to start identifying the ridge.
C (array of float): matrix of scale space where searching for the ridge.
step (int): threshold used to search the next closest index at the upper levels.
Returns:
array specifying the indices of ridge and a warning flag.
Note:
If warning flag = 1, then there's error when finding ridge.
It could be due to the conflicts with anothers ridges during ridge evolution.
"""
check_idx = idx
ridge_idx = []
ridge_idx.append((0,check_idx)) # add first ridge index
warning = 0
m, n = C.shape[0], C.shape[1]
for i in range(1,m):
subC = np.zeros(2*step+1)
subC_start, subC_stop = 0, 2*step
C_start, C_stop = check_idx-step, check_idx+step
# check border condition
if (check_idx-step)<0:
C_start = 0
subC_start = 0 - (check_idx-step)
if (check_idx+step)>(n-1):
C_stop = n-1
subC_stop = (2*step) - ((check_idx+step) - (n-1))
# extract local maxima at the upper level between interval specified by step
subC[subC_start:(subC_stop+1)] = C[i,C_start:(C_stop+1)]
candidate_idx = np.argwhere(subC!=0).flatten()
# select the suitable index based on distance
if len(candidate_idx)==0:
break
elif len(candidate_idx)==1:
check_idx = check_idx + candidate_idx[0] - step
ridge_idx.append((i,check_idx))
else:
darr = np.abs(candidate_idx-step)
smallest_idx = np.argwhere(darr==min(darr)).flatten()
if len(smallest_idx)==1:
check_idx = check_idx + candidate_idx[smallest_idx[0]] - step
ridge_idx.append((i,check_idx))
else:
warning = 1
break
return (np.array(ridge_idx), warning)
def _removepnt_byangle(self,breakid,angle_thr):
"""Remove indices based on angle threshold.
This is support function for ``principal_turns()``.
Args:
breakid (array of int): list of potential principal indices.
angle_thr (float): angle threshold in degree.
Returns:
new break indices.
"""
x, y, z = self.coors[:,0], self.coors[:,1], self.coors[:,2]
newbreakid = breakid.copy()
idx = 2
while(idx<len(newbreakid)):
ia = newbreakid[idx-2]
ib = newbreakid[idx-1]
ic = newbreakid[idx]
a = np.array([x[ia],y[ia],z[ia]])
b = np.array([x[ib],y[ib],z[ib]])
c = np.array([x[ic],y[ic],z[ic]])
if np.degrees(angle3points(a,b,c))<=angle_thr:
newbreakid = np.delete(newbreakid,(idx-1))
else:
idx = idx + 1
return newbreakid
[docs] def main_turns(self,nbscales=None,search_step=5,eps_kappa=None,ridgelength_thr=0.1,angle_thr=20,min_dist=None):
"""Compute the main turns of curve.
Args:
nbscales (int): number of scales used to check the curve evolution.
search_step (int): number of up-scale levels to search for the next point of ridge.
eps_kappa (float): curvature threshold.
ridgelength_thr (float): ridge length threshold (percentage compared to the longest ridge).
angle_thr (float): angle threshold in degree.
min_dist (float): minimal distance between two main turns.
Returns:
indices of main turns.
"""
# local maximal curvatures indices at the original scale
# extid = argrelextrema(crvr.compute_curvature(),np.greater)[0]
if nbscales is None:
ref_r = self.compute_length()/3.
rstbl = RadiusScaleTable(self,ref_r)
_nbscales, _ = rstbl.compute_scales_from_radius()
else:
_nbscales = nbscales
# scale space
data = self.scale_space(range(int(_nbscales)),features={'curvature','ridge'})
# setting kappa threshold
if eps_kappa is None:
kappa_max = np.max(self.compute_curvature())
kappa_thr = kappa_max*0.05 # 1/20 of maximal curvature
else:
kappa_thr = eps_kappa
# identify ridges in curvature scale space
ridge_lst, length_lst, warning_lst = [],[],[]
try:
start_idx = np.argwhere((data['ridge'][0,:]!=0)&(data['curvature'][0,:]>kappa_thr)).flatten()
except:
return [] # there is no data for ridge and curvature
for idx in start_idx:
ridge,warning = self._find_ridge(idx,data['ridge'],search_step)
ridge_lst.append(ridge)
length_lst.append(len(ridge))
warning_lst.append(warning)
length_arr = np.array(length_lst)
if (len(length_arr)==0):
return []
# select ridges based on ridge length
p = max(length_arr)*ridgelength_thr
selected_ridges = length_arr>=p
selected_idx = start_idx[selected_ridges]
# print(selected_idx)
# remove indices based on angle criteria
npoints = self.coors.shape[0]
breakid = np.sort(np.array([0]+selected_idx.tolist()+[npoints-1]))
newbreakid = self._removepnt_byangle(breakid,angle_thr)
# print(newbreakid)
# remove points very close to two sides (to compensate the error? but not know where...)
# Modify into distance threshold
if len(newbreakid)>2:
if(newbreakid[1]<(1/50.)*npoints):
newbreakid = np.delete(newbreakid,1)
if len(newbreakid)>2:
idx = len(newbreakid)-2
if((npoints-1-newbreakid[idx])<(1/50.)*npoints):
newbreakid = np.delete(newbreakid,idx)
selected_idx = newbreakid[1:-1]
# remove turns whose distance is too small
if (min_dist != None) & (len(selected_idx)>1):
check_idx = 0
while(check_idx<len(selected_idx)-1):
ix1, ix2 = selected_idx[check_idx], selected_idx[check_idx+1]
coors = self.coors[ix1:ix2+1]
if(geo_len(coors)<=min_dist):
selected_idx = np.delete(selected_idx,check_idx+1)
else:
check_idx += 1
# ignored_idx = np.setdiff1d(extid,selected_idx)
# self.ppt = {} # main turns results
# self.ppt['scale_range'] = scale_range
# self.ppt['curvature_scales'] = data['curvature']
# self.ppt['ridge_scales'] = data['ridge']
# self.ppt['ridge_lst'] = ridge_lst
# self.ppt['ridge_ids'] = start_idx
# self.ppt['selected_ridge_ids'] = start_idx[selected_ridges]
# self.ppt['ppt_ids'] = selected_idx
# self.ppt['excluded_ids'] = ignored_idx
return selected_idx
[docs] def plot_ridge_map(self,ax,show_scales=True):
"""Display ridge finding from principal turns computation.
This is only used after running ``main_turns()``.
Args:
ax : plot axis.
show_scales (bool): if True, display scale range in y axis.
"""
ridge_ids = self.ppt['ridge_ids']
ridge_lst = self.ppt['ridge_lst']
selected_ridge_lst = self.ppt['selected_ridge_ids']
ppt_ids = self.ppt['ppt_ids']
scale_range = self.ppt['scale_range']
for i in range(len(ridge_ids)):
rid = ridge_ids[i]
r = ridge_lst[i]
if rid in selected_ridge_lst:
if rid in ppt_ids:
ax.plot(r[:,1],r[:,0],c='green')
else:
ax.plot(r[:,1],r[:,0],c='magenta')
else:
ax.plot(r[:,1],r[:,0],c='red')
ax.set_ylim(0,len(scale_range))
ax.set_yticks(range(len(scale_range)))
if show_scales == True:
ax.set_yticklabels(scale_range)
[docs] def to_points(self):
"""Convert Curve object to Points object.
"""
from genepy3d.obj.points import Points
return Points(self.coors)
[docs] def plot(self,ax,projection="3d",scales=(1.,1.,1.),show_root=True,root_args={"color":"red"},line_args={'c':'blue',"lw":1},point_args=None):
"""Plot curve.
"""
line_pl = None
if line_args is not None:
line_pl = pl.plot_line(ax,projection,self.coors[:,0],self.coors[:,1],self.coors[:,2],scales,line_args=line_args)
point_pl = None
if point_args is not None:
point_pl = pl.plot_point(ax,projection,self.coors[:,0],self.coors[:,1],self.coors[:,2],scales,point_args=point_args)
root_pl = None
if show_root == True:
root_pl = pl.plot_point(ax,projection,self.coors[0,0],self.coors[0,1],self.coors[0,2],scales,point_args=root_args)
return (line_pl, point_pl, root_pl)
# Default parameters for simulation model
DEFAULT_PARAMS_3D = {
'n': [np.uint32(3*1e4)],
'v': [1*1e-6],
'omega': [2*np.pi],
'zoom': range(5,11,1)
}
DEFAULT_PARAMS_PLANE = {
'n': [np.uint32(3*1e4)],
'v': [i*1e-6 for i in range(10,12,1)],
'omega': [0]
}
DEFAULT_PARAMS_LINE = {
'length': range(100,151,10)
}
[docs]class SimuIntrinsic:
"""Generate curve in 3D with random intrinsic dimension segments.
The process is simulated based on 2D and 3D Brownian motion.
Attributes:
seg_lbl (array of int): list of intrinsic dimensions, 0 for 3d, 1 for 2d, 2 for 1d.
npoints (int): number of points on simulated curve.
sigma (float): for adding gaussian noise.
params (array of dic | dic): simulation parameters.
random_state (int): seed number used to memorize the simulated curve.
max_seg (int): maximal number of segments.
if this is set, then number of simulated intrinsic segments will be <= max_seg.
nb_seg (int): number of segments.
if this is set, then number of simulated intrinsic segments will be fixed at nb_seg.
remove_zero_replica (bool): if True, then replace the '0,0,..' sequence into only one '0'.
Note:
specify either seg_lbl, max_seg or nb_seg whose priority order is seg_lbl, then max_seg and then nb_seg.
"""
def __init__(self,seg_lbl=[0,1,2],npoints=1000,sigma=1.,random_state=None,params={},max_seg=None,nb_seg=None,remove_zero_replica=True,spline_order=1):
self.npoints = npoints
self.sigma = sigma
self.random_state = random_state
self.spline_order = spline_order
# generate segment labels
if nb_seg is not None:
self.seg_lbl = self._gen_seg_lbl(nb_seg=nb_seg,remove_zero_replica=remove_zero_replica)
elif max_seg is not None:
self.seg_lbl = self._gen_seg_lbl(nb_seg=max_seg,is_max=True,remove_zero_replica=remove_zero_replica)
elif seg_lbl is not None:
if remove_zero_replica == True:
# remove "0,0,.." pattern
if remove_zero_replica==True:
ilbl = 0
while (ilbl<len(seg_lbl)):
if(seg_lbl[ilbl]!=0):
ilbl = ilbl + 1
elif(ilbl+1)<len(seg_lbl):
if seg_lbl[ilbl+1]==0:
seg_lbl.pop(ilbl)
else:
ilbl = ilbl + 1
else:
ilbl = ilbl + 1
self.seg_lbl = seg_lbl
else:
raise Exception("must provide seg_lbl or nb_seg or max_seg.")
# generate segments parameters
self.seg_param = self._gen_seg_para(params)
def _gen_seg_lbl(self,nb_seg,is_max=False,remove_zero_replica=True):
"""Generate a list of random segment labels: 0 is pure 3d, 1 is plane and 2 is line.
This is the internal method used in ``__init__()``.
Args:
nb_seg (int): number of segments.
is_max (bool): if True generate number of segments <= nb_seg.
remove_zero_replica (bool): if True, then replace the '0,0,..' sequence into only one '0'.
Returns:
seg_lbl (array of int): list of intrinsic segments labels.
"""
# init random seed
if self.random_state is not None:
np.random.seed(self.random_state)
# init a random nb. of segment
if is_max==False:
ns = nb_seg
else:
ns = np.random.permutation(range(1,nb_seg+1))[0]
# init a random nb. of plane and line segments <= nb_seg
nb_planes_lines = np.random.permutation(range(1,ns+1))[0]
seg_lbl = np.zeros(ns,dtype=np.uint8)
ids = np.random.permutation(ns)[:nb_planes_lines] # random set of plane and line indices
seg_lbl[ids] = np.random.randint(1,3,len(ids)) # random assign plane or line labels
# convert to list
seg_lbl = seg_lbl.tolist()
# remove "0,0,.." pattern
if remove_zero_replica==True:
ilbl = 0
while (ilbl<len(seg_lbl)):
if(seg_lbl[ilbl]!=0):
ilbl = ilbl + 1
elif(ilbl+1)<len(seg_lbl):
if seg_lbl[ilbl+1]==0:
seg_lbl.pop(ilbl)
else:
ilbl = ilbl + 1
else:
ilbl = ilbl + 1
return seg_lbl
def _gen_seg_para(self,params):
"""Generate parameters for each segment from ``seg_lbl``.
This is the internal method used in ``__init()__``.
Args:
params (array of dic | dic): ranges of segment parameters.
Returns:
seg_para (array of dic): list of segments parameters.
"""
if self.random_state is not None:
np.random.seed(self.random_state)
params_3d = DEFAULT_PARAMS_3D
params_plane = DEFAULT_PARAMS_PLANE
params_line = DEFAULT_PARAMS_LINE
if isinstance(params,dict):
if '3d' in params.keys():
params_3d = params['3d']
if 'plane' in params.keys():
params_plane = params['plane']
if 'line' in params.keys():
params_line = params['line']
seg_para = []
for i in range(len(self.seg_lbl)):
ilbl = self.seg_lbl[i]
record = {}
if ilbl==0: # pure 3d segment
try:
_para = params[i]['n']
except:
_para = params_3d['n']
idx = np.random.permutation(range(len(_para)))[0]
record['n'] = _para[idx]
try:
_para = params[i]['v']
except:
_para = params_3d['v']
idx = np.random.permutation(range(len(_para)))[0]
record['v'] = _para[idx]
try:
_para = params[i]['omega']
except:
_para = params_3d['omega']
idx = np.random.permutation(range(len(_para)))[0]
record['omega'] = _para[idx]
try:
_para = params[i]['zoom']
except:
_para = params_3d['zoom']
idx = np.random.permutation(range(len(_para)))[0]
record['zoom'] = _para[idx]
elif ilbl==1: # plane
try:
_para = params[i]['n']
except:
_para = params_plane['n']
idx = np.random.permutation(range(len(_para)))[0]
record['n'] = _para[idx]
try:
_para = params[i]['v']
except:
_para = params_plane['v']
idx = np.random.permutation(range(len(_para)))[0]
record['v'] = _para[idx]
try:
_para = params[i]['omega']
except:
_para = params_plane['omega']
idx = np.random.permutation(range(len(_para)))[0]
record['omega'] = _para[idx]
else: # line
try:
_para = params[i]['length']
except:
_para = params_line['length']
idx = np.random.permutation(range(len(_para)))[0]
record['length'] = _para[idx]
seg_para.append(record)
return seg_para
[docs] def gen(self):
"""Simulate a 3D curve with random intrinsic lines, planes and 3D.
Returns:
itself.
Note:
generated curve for noiseless case can contain duplicata points.
"""
from genepy3d.obj.points import Points
tmp = []
ipl, iln = 0,0
seg_len = np.zeros(len(self.seg_param))
# read each segment para
for iseg in range(len(self.seg_lbl)):
lbl = self.seg_lbl[iseg]
par = self.seg_param[iseg]
if lbl==0: # 3d segment
# generate 3d active brownian process
P,_ = active_brownian_3d(n=par['n'],v=par['v'],omega=par['omega'],seed_point=self.random_state)
P = P * 1e6 # to micron
# interpolation
coef, to = interpolate.splprep([P[:,0],P[:,1],P[:,2]],k=self.spline_order,s=0)
ti = np.linspace(0,1,1000)
xi, yi, zi = interpolate.splev(ti,coef)
Pi = np.array([xi,yi,zi]).T
# smooth a bit
Ps = Curve(Pi).convolve_gaussian(1.0).coors
# random transformation
phi, theta, psi = (((2*np.pi) - (0.)) * np.random.random(3) + (0.))
zx,zy,zz = par['zoom'], par['zoom'], par['zoom']
Pt = Points(Ps).transform(phi,theta,psi,0.,0.,0.,zx,zy,zz).coors
tmp.append(Pt)
seg_len[iseg] = geo_len(Pt)
elif lbl==1: # plane segment
ipl = ipl + 1 # plane numerator
# generate 2d active brownian process
P,_ = active_brownian_2d(n=par['n'],v=par['v'],seed_point=self.random_state)
P = P * 1e6 # to micron
P = np.append(P,np.zeros((P.shape[0],1)),axis=1) # add z coordinates
# interpolation
coef, to = interpolate.splprep([P[:,0],P[:,1],P[:,2]],k=self.spline_order,s=0)
ti = np.linspace(0,1,1000)
xi, yi, zi = interpolate.splev(ti,coef)
Pi = np.array([xi,yi,zi]).T
# random transformation
alpha = 2*np.pi/3.
# alpha = (((np.pi/2.) - (2*np.pi/3.)) * np.random.random(1) + (2*np.pi/3.))[0]
beta = (((2*np.pi) - (0.)) * np.random.random(1) + (0.))[0]
if (ipl%2)!=0:
phi, theta, psi = alpha,0.,beta
else:
phi, theta, psi = 0.,alpha,beta
Pt = Points(Pi).transform(phi,theta,psi,0.,0.,0.).coors
tmp.append(Pt)
seg_len[iseg] = geo_len(Pt)
else: # line
iln = iln + 1 # line numerator
# a simple line in z axis
P = np.array([np.zeros(par['length']),np.zeros(par['length']),np.arange(par['length'])]).T
# random transformation
alpha = -np.pi/3.
# alpha = (((-np.pi/4.) - (-np.pi/3.)) * np.random.random(1) + (-np.pi/3.))[0]
beta = (((2*np.pi) - (0.)) * np.random.random(1) + (0.))[0]
# print(beta)
if (iln%2)==0:
phi, theta, psi = alpha,0.,beta
else:
phi, theta, psi = 0.,alpha,beta
Pt = Points(P).transform(phi,theta,psi,0.,0.,0.).coors
tmp.append(Pt)
seg_len[iseg] = geo_len(Pt)
# concatenate segments into curve
Pcrv = []
break_ids = np.zeros(len(self.seg_param)+1,dtype=np.int16)
plane_ids = []
line_ids = []
s = 0
full_len = np.sum(seg_len)
for iseg in range(len(self.seg_param)):
lbl = self.seg_lbl[iseg]
# compute proportion of nb. of. points w.r.t. segment length
if iseg!=(len(self.seg_param)-1):
npoints_seg = int((seg_len[iseg]*1./full_len)*self.npoints)
s = s + npoints_seg
else:
npoints_seg = self.npoints - s
# interpolation
P = tmp[iseg]
coef, to = interpolate.splprep([P[:,0],P[:,1],P[:,2]],k=self.spline_order,s=0)
ti = np.linspace(0,1,npoints_seg)
xi, yi, zi = interpolate.splev(ti,coef)
Pi = np.array([xi,yi,zi]).T
# concatenation
if iseg==0:
Pcrv = Pcrv + Pi.tolist()
else:
dx, dy, dz = Pcrv[-1] - Pi[0]
Pt = Points(Pi).transform(0.,0.,0.,dx,dy,dz).coors
Pcrv = Pcrv + Pt.tolist()
# save break index
break_ids[iseg+1] = len(Pcrv)
if lbl==1: # plane
plane_ids.append([len(Pcrv)-npoints_seg,len(Pcrv)-1])
elif lbl==2: # line
line_ids.append([len(Pcrv)-npoints_seg,len(Pcrv)-1])
# add noise to 3d curve
Pcrv = np.array(Pcrv)
Pcrv_noise = Pcrv + np.random.randn(len(Pcrv),3)*self.sigma
self.coors = Pcrv
self.coors_noise = Pcrv_noise
self.break_ids = break_ids
self.plane_ids = plane_ids
self.line_ids = line_ids
return self
[docs] def add_noise(self,sigma):
"""Add Gaussian noise to curve.
Args:
sigma (float): std of Gaussian kernel.
Returns:
itself.
"""
self.sigma = sigma
self.coors_noise = self.coors + np.random.randn(len(self.coors),3)*self.sigma
return self
[docs] def get_curve(self,noisy=True):
"""Return curve with/without noise.
Args:
noisy (bool): if True, then return noisy curve, and False otherwise.
Returns:
Curve
"""
if noisy==True:
return Curve(self.coors_noise)
else:
return Curve(self.coors)
[docs] def evaluate(self,pred_line_ids,pred_plane_ids,metric="f1"):
"""Evaluate accuracies of intrinsic dimension estimation.
Args:
pred_line_ids (list): list of predicted lines indices.
pred_plane_ids (list): list of predicted planes indices.
metric (str): accuracy metric, we support: *f1, jaccard, balanced* as in scikit learn.
Returns:
A tuple including
- [line_acc, line_pre, line_rec]: line accuracy, precision and recall.
- [plane_acc, plane_pre, plane_rec]: plane accuracy, precision and recall.
- [nonplane_acc, nonplane_pre, nonplane_rec]: 3D accuracy, precision and recall.
"""
true_line_ids = self.line_ids
true_plane_ids = self.plane_ids
n = self.npoints
def _get_segs(P):
"""Get list of local planes or lines for a given scale of the planes or lines evolution data.
"""
seg_list = []
connected_compo, nb_compo = label(P)
if nb_compo != 0:
for i_compo in range(1,nb_compo+1):
idx = np.argwhere(connected_compo==i_compo).flatten()
seg_list.append([idx[0],idx[-1]])
return seg_list
# line score
if((len(true_line_ids)==0) | (len(pred_line_ids)==0)):
line_acc = -1 # not consider this case for simplcity
line_pre = -1
line_rec = -1
else:
acc_line = []
pre_line, rec_line = [], []
for true_idx in true_line_ids:
true_tmp = np.zeros(n,dtype=np.uint)
true_tmp[true_idx[0]:true_idx[-1]+1]=1
acc = 0.
pre, rec = 0., 0.
for pred_idx in pred_line_ids:
pred_tmp = np.zeros(n,dtype=np.uint)
pred_tmp[pred_idx[0]:pred_idx[-1]+1]=1
pre_tmp = metrics.precision_score(true_tmp,pred_tmp)
if pre_tmp > pre:
pre = pre_tmp
rec_tmp = metrics.recall_score(true_tmp,pred_tmp)
if rec_tmp > rec:
rec = rec_tmp
if metric=="f1":
acc_tmp = metrics.f1_score(true_tmp,pred_tmp)
elif metric=="jaccard":
acc_tmp = metrics.jaccard_score(true_tmp,pred_tmp)
else:
acc_tmp = metrics.balanced_accuracy_score(true_tmp,pred_tmp)
if acc_tmp > acc:
acc = acc_tmp
acc_line.append(acc)
pre_line.append(pre)
rec_line.append(rec)
line_acc = np.mean(np.array(acc_line))
line_pre = np.mean(np.array(pre_line))
line_rec = np.mean(np.array(rec_line))
# plane score
if((len(true_plane_ids)==0) | (len(pred_plane_ids)==0)):
plane_acc = -1 # not consider this case for simplcity
plane_pre = -1
plane_rec = -1
else:
acc_plane = []
pre_plane, rec_plane = [], []
for true_idx in true_plane_ids:
true_tmp = np.zeros(n,dtype=np.uint)
true_tmp[true_idx[0]:true_idx[-1]+1]=1
acc = 0.
pre, rec = 0., 0.
for pred_idx in pred_plane_ids:
pred_tmp = np.zeros(n,dtype=np.uint)
pred_tmp[pred_idx[0]:pred_idx[-1]+1]=1
pre_tmp = metrics.precision_score(true_tmp,pred_tmp)
if pre_tmp > pre:
pre = pre_tmp
rec_tmp = metrics.recall_score(true_tmp,pred_tmp)
if rec_tmp > rec:
rec = rec_tmp
if metric=="f1":
acc_tmp = metrics.f1_score(true_tmp,pred_tmp)
elif metric=="jaccard":
acc_tmp = metrics.jaccard_score(true_tmp,pred_tmp)
else:
acc_tmp = metrics.balanced_accuracy_score(true_tmp,pred_tmp)
if acc_tmp > acc:
acc = acc_tmp
acc_plane.append(acc)
pre_plane.append(pre)
rec_plane.append(rec)
plane_acc = np.mean(np.array(acc_plane))
plane_pre = np.mean(np.array(pre_plane))
plane_rec = np.mean(np.array(rec_plane))
# 3d score
tmp = np.ones(n,dtype=np.uint)
for ip in (true_plane_ids+true_line_ids):
tmp[ip[0]:ip[-1]+1]=0
true_nonplane_ids = _get_segs(tmp)
# print(true_nonplane_ids)
tmp = np.ones(n,dtype=np.uint)
for ip in (pred_plane_ids+pred_line_ids):
tmp[ip[0]:ip[-1]+1]=0
pred_nonplane_ids = _get_segs(tmp)
# print(pred_nonplane_ids)
if((len(true_nonplane_ids)==0) | (len(pred_nonplane_ids)==0)):
nonplane_acc = -1 # not consider this case for simplcity
nonplane_pre = -1
nonplane_rec = -1
else:
acc_nonplane = []
pre_nonplane, rec_nonplane = [], []
for true_idx in true_nonplane_ids:
true_tmp = np.zeros(n,dtype=np.uint)
true_tmp[true_idx[0]:true_idx[-1]+1]=1
acc = 0.
pre, rec = 0., 0.
for pred_idx in pred_nonplane_ids:
pred_tmp = np.zeros(n,dtype=np.uint)
pred_tmp[pred_idx[0]:pred_idx[-1]+1]=1
pre_tmp = metrics.precision_score(true_tmp,pred_tmp)
if pre_tmp > pre:
pre = pre_tmp
rec_tmp = metrics.recall_score(true_tmp,pred_tmp)
if rec_tmp > rec:
rec = rec_tmp
if metric=="f1":
acc_tmp = metrics.f1_score(true_tmp,pred_tmp)
elif metric=="jaccard":
acc_tmp = metrics.jaccard_score(true_tmp,pred_tmp)
else:
acc_tmp = metrics.balanced_accuracy_score(true_tmp,pred_tmp)
if acc_tmp > acc:
acc = acc_tmp
acc_nonplane.append(acc)
pre_nonplane.append(pre)
rec_nonplane.append(rec)
nonplane_acc = np.mean(np.array(acc_nonplane))
nonplane_pre = np.mean(np.array(pre_nonplane))
nonplane_rec = np.mean(np.array(rec_nonplane))
return ([line_acc, line_pre, line_rec],
[plane_acc, plane_pre, plane_rec],
[nonplane_acc, nonplane_pre, nonplane_rec])
[docs] def plot(self,ax,projection='3d',noisy=True,scales=(1.,1.,1.),show_interpoints=False):
"""Plot simulated curve with its intrinsic segments.
Args:
ax: plot axis.
projection (str): support *3d, xy, xz, yz* modes.
noisy (bool): plot noisy curve or noiseless curve.
scales (tuple of float): specify x, y and z scales.
show_interpoints (bool): if True, points in between intrinsic segments are displayed.
"""
from genepy3d.obj.points import Points
if noisy==True:
P = self.coors_noise
else:
P = self.coors
pl.plot_point(ax,projection,P[0,0],P[0,1],P[0,2],scales,point_args={'c':'blue','s':50})
pl.plot_line(ax,projection,P[:,0],P[:,1],P[:,2],scales,line_args={'c':'green','alpha':0.7})
if show_interpoints==True:
if len(self.break_ids)>2:
pl.plot_point(ax,projection,P[self.break_ids[1:-1],0],P[self.break_ids[1:-1],1],P[self.break_ids[1:-1],2],scales,point_args={'c':'cyan','s':20})
for pid in self.plane_ids:
pl.plot_line(ax,projection,P[pid[0]:pid[1],0],P[pid[0]:pid[1],1],P[pid[0]:pid[1],2],scales,line_args={'c':'yellow','alpha':0.7})
if projection == '3d':
data = P[pid[0]:pid[1]]
c, normal = Points(data).fit_plane()
maxx = np.max(data[:,0])
maxy = np.max(data[:,1])
minx = np.min(data[:,0])
miny = np.min(data[:,1])
pnt = np.array([0.0, 0.0, c])
d = -pnt.dot(normal)
xx, yy = np.meshgrid([minx, maxx], [miny, maxy])
z = (-normal[0]*xx - normal[1]*yy - d)*1. / normal[2]
ax.plot_surface(xx, yy, z, color='yellow',alpha=0.4)
for lid in self.line_ids:
pl.plot_line(ax,projection,P[lid[0]:lid[1],0],P[lid[0]:lid[1],1],P[lid[0]:lid[1],2],scales,line_args={'c':(0.85,0.25,0.6),'alpha':1.0})
if projection != '3d':
ax.axis('equal')
else:
param = pl.fix_equal_axis(self.coors_noise / np.array(scales))
ax.set_xlim(param['xmin'],param['xmax'])
ax.set_ylim(param['ymin'],param['ymax'])
ax.set_zlim(param['zmin'],param['zmax'])
[docs]class RadiusScaleTable:
"""Compute scales table of a curve from given radius of curvature.
Attributes:
crv (Curve): curve to compute radius scale table.
r (float): reference radius of curvature.
mo (str): method used in gaussian convolution to treat the extreme values.
kerlen (float): gaussian kernel length.
"""
def __init__(self, crv, r, mo=None, kerlen=None):
self.crv = crv
assert r > 0, "reference radius must be positive"
self.kappa = 1./r
if mo is None:
self.mo = "nearest"
else:
self.mo = mo
if kerlen is None:
self.kerlen = 4.0
else:
self.kerlen = kerlen
self.kappa_tbl = self.build_scales_from_radius()
[docs] def build_scales_from_radius(self, max_iter=None):
"""Compute scales range from given reference radius.
Support func of intrinsic dimensions decomposition.
Args:
max_iter (int): number of convolution iteration. If None, it will be set = haft of number of points.
Retuns:
kappa_tbl (array (float)): curvature table.
"""
x, y, z = self.crv.coors[:,0], self.crv.coors[:,1], self.crv.coors[:,2]
xs, ys, zs = x.copy(), y.copy(), z.copy()
# npoints = self.crv.coors.shape[0]
if max_iter is None:
num_max = 1e4 # need to check this point
else:
num_max = max_iter
ite = 1
sig = 0
kappa_tbl = []
flag = True
# compute kappa table
while(flag):
if sig!=0:
# smooth curve with gaussian func
xs = gaussian_filter1d(x,sig,mode=self.mo,truncate=self.kerlen)
ys = gaussian_filter1d(y,sig,mode=self.mo,truncate=self.kerlen)
zs = gaussian_filter1d(z,sig,mode=self.mo,truncate=self.kerlen)
scaled_curve = Curve(coors=(xs,ys,zs))
cur = scaled_curve.compute_curvature()
kappa_tbl.append(cur)
sig += 1. # sigma step = 1
ix = np.argwhere(cur > self.kappa).flatten()
if len(ix)==0:
flag = False
if ite > num_max:
flag = False
else:
ite += 1
kappa_tbl = np.array(kappa_tbl).T # row is scale, col is point index
return kappa_tbl
[docs] def compute_scales_from_radius(self,r=None):
"""Compute scales range from given reference radius.
Support func of intrinsic dimensions decomposition.
Args:
r (float): reference radius.
Retuns:
sig_c (float): central value of sigma
delta_sig (float): std value of sigma
"""
if r is None:
check_kappa = self.kappa
else:
assert r > 0, "radius r must be positive"
assert r <= (1./self.kappa), "radius r must be <= {}".format(1./self.kappa)
check_kappa = 1./r # inverse of radius
npoints = self.crv.coors.shape[0]
# estimate sig_c and delta_sig
if(len(self.kappa_tbl[self.kappa_tbl<=check_kappa])==0):
raise Exception("failed to compute curvature table.")
selected_sig = []
for i in range(npoints):
ix = np.argwhere(self.kappa_tbl[i]<=check_kappa).flatten()
if len(ix)!=0:
selected_sig.append(ix[0]) # take the first scale
if len(selected_sig)!=0:
sig_c = np.mean(selected_sig) # compute sig_c
delta_sig = np.std(selected_sig) # compute delta_c
else:
raise Exception("failed to compute curvature table.")
if sig_c == 0:
delta_sig = 0
return (sig_c, delta_sig)
[docs]class RadiusScaleMap:
"""Compute mapping between curvature radius and scale.
This class is used to estimate a scale of interest for a given curvature radius.
The scale of interest is then used in ``decompose_intrinsicdim()`` of Curve to estimate the intrinsic segments.
Attributes:
l (int): sampled unit length.
rmin (int): min radius.
rmax (int): max radius.
smin (int): min scale.
smax (int): max scale.
mo (str): method used in gaussian convolution to treat the extreme values.
kerlen (float): gaussian kernel length.
save_circles (bool): if True, then save the scaled circle for each scale.
Note:
This class is DEPRECATED.
"""
def __init__(self,l=1,rmin=1,rmax=100,smin=0,smax=200,mo="nearest",kerlen=4.0,save_circles=False):
tol = 0.
rlst = np.arange(rmin,rmax+1)
slst = np.arange(smin,smax+1)
# compute sampled points
nrlst = (np.round((np.pi * rlst)/l)).astype(np.int)
ix = np.argwhere(nrlst<3).flatten()
nrlst[ix] = 3
rtab = np.zeros((len(rlst),len(slst)))
if save_circles==True:
circles = {}
for i in range(len(rlst)):
theta = np.linspace(0,np.pi,nrlst[i])
x = rlst[i] * np.cos(theta)
y = rlst[i] * np.sin(theta)
if save_circles==True:
tmp = {}
tmp[-1]= np.array([x,y]).T
for j in range(len(slst)):
if slst[j]==0:
xs, ys = x.copy(), y.copy()
else:
xs = gaussian_filter1d(x,slst[j],mode=mo,truncate=kerlen)
ys = gaussian_filter1d(y,slst[j],mode=mo,truncate=kerlen)
ps = np.array([xs,ys]).T
if save_circles==True:
tmp[j] = ps
kap = self._kappa(ps)
if kap <= tol:
rtab[i,j] = -1
else:
rtab[i,j] = 1 / kap
if save_circles==True:
circles[i] = tmp
# assign infinite rtab by the maximum
ix = np.argwhere(rtab==-1)
if len(ix)!=0:
rtab_max = np.max(rtab)
rtab[ix[:,0],ix[:,1]] = rtab_max
self.rlst = rlst
self.slst = slst
self.rtab = rtab
if save_circles==True:
self.circles = circles
def _kappa(self,p):
"""Compute curvatures of 2D curve.
This is the support function, the inverse of kappa is the curvature radius.
Args:
p (array of float): list of points on curve.
Returns:
array of float.
"""
def _derv(deg,p,dt=1):
dx, dy = p[:,0].copy(), p[:,1].copy()
for i in range(deg):
dx = np.gradient(dx,dt,edge_order=1)
dy = np.gradient(dy,dt,edge_order=1)
return np.array([dx,dy]).T
d1 = _derv(1,p)
d2 = _derv(2,p)
nomi = np.abs(d1[:,0]*d2[:,1] - d1[:,1]*d2[:,0])
deno = (d1[:,0]**2 + d1[:,1]**2)**(1.5)
res = np.zeros(len(deno))
idx = np.argwhere(deno!=0).flatten()
if len(idx)!=0:
res[idx] = nomi[idx]/deno[idx]
return np.mean(res)
[docs] def get_scale(self,r):
"""Estimate the scale of interest corresponding to given curvature radius.
Args:
r (float): curvature radius.
Returns:
A tuple including
- sig_c (float): scale of interest.
- delta_c (float): parameter specifying searching region around sig_c.
- rmask (array of float): binary mask of radius-scale map.
"""
rmask = (self.rtab >= r)*1. # create mask from lookup table
ids = np.argwhere(self.rlst<r).flatten() # find radius smaller than the referenced radius
if len(ids)==0: # if all is impossible
sig_c, delta_c = 0, 0
else:
selected_sig = []
for i in ids:
# check if all equal to 0, exclude it
if(len(np.argwhere(rmask[i]==0).flatten())<len(rmask[i])):
selected_sig.append(np.argmax(rmask[i]))
if len(selected_sig)!=0:
sig_c = np.median(selected_sig) # compute sig_c
delta_c = np.std(selected_sig) # compute delta_c
else:
sig_c, delta_c = -1, -1
return (sig_c,delta_c,rmask)
[docs]def decompose_intrinsicdim_sequential(p,eps_line,eps_plane,eps_seg):
"""Decompose sequentially intrinsic segments with priority line to plane to 3d.
from paper of Jianyu Yang: https://www.sciencedirect.com/science/article/pii/S1047320316300438.
This method was selected to compare with our decompose_intrinsicdim().
Args:
p (array of float): list of 3D points on curve.
eps_line (float): line threshold.
eps_plane (float): plane_threshold.
eps_seg (float): segment length threshold.
Returns:
list of line and plane indices.
"""
# support functions
# ---
def determinant(a,b,c,d):
ab = b - a
ac = c - a
ad = d - a
m = np.array([[ab[0], ab[1], ab[2]],
[ac[0], ac[1], ac[2]],
[ad[0], ad[1], ad[2]]])
return abs((1./6)*np.linalg.det(m))
def label_line(p,eps,seg_len):
n = len(p)
lbl = np.zeros(n,dtype=np.uint)
for i in range(1,n-1):
p1 = p[i]-p[i-1]
p2 = p[i+1]-p[i]
if((norm(p1)==0) | (norm(p2)==0)):
lbl[i]=0 # collapse points
else:
delta = 1. - ((np.dot(p1,p2)) / (norm(p1)*norm(p2)))
if delta <= eps:
lbl[i] = 1 # line
else:
lbl[i] = 0
# remove small segment
connected_compo, nb_compo = label(lbl)
if nb_compo!=0:
for j in range(1,nb_compo+1):
connected_idx = np.argwhere(connected_compo==j).flatten()
if len(connected_idx)<=seg_len:
lbl[connected_idx] = 0
return lbl
def label_plane(p,lbl,eps,seg_len):
n = len(p)
lbl2 = lbl.copy()
for i in range(1,n-2):
if((lbl2[i]==0) & (lbl2[i+1]==0)):
delta = determinant(p[i-1],p[i],p[i+1],p[i+2])
if delta <= eps:
lbl2[i] = 2 # plane
lbl2[i+1] = 2
# remove small segment
idx = np.argwhere(lbl2==2).flatten()
lbltmp = np.zeros(n,dtype=np.uint)
lbltmp[idx]=1
connected_compo, nb_compo = label(lbltmp)
if nb_compo!=0:
for j in range(1,nb_compo+1):
connected_idx = np.argwhere(connected_compo==j).flatten()
if len(connected_idx)<=seg_len:
lbl2[connected_idx] = 0
return lbl2
def get_segments(lbl):
data = []
for ilbl in [0,1,2]:
segs = []
indicator = np.zeros(len(lbl))
tmp = np.argwhere(lbl==ilbl).flatten()
indicator[tmp]=1
connected_compo, nb_compo = label(indicator)
if nb_compo!=0:
for j in range(1,nb_compo+1):
idx = list(np.argwhere(connected_compo==j).flatten())
if len(idx)!=0:
segs.append([idx[0],idx[-1]])
data.append(segs)
return data
# ---
lbl1 = label_line(p,eps_line,eps_seg)
lbl2 = label_plane(p,lbl1,eps_plane,eps_seg)
segs = get_segments(lbl2)
return segs
[docs]def align(target, ref):
"""Align target curve to reference curve.
This func is used to align two curves that minimize their spatial differences.
The func is useful as a preparation before computing the distance between the two curves.
E.g. we provided emd() in Points class to compute the Earth Mover's Distance between two point clouds.
Args:
target (Curve): target curve.
ref (Curve): reference curve.
Return:
aligned target curve.
"""
c1 = ref.coors
c2 = target.coors
# shifting
c1 = c1 - c1[0]
c2 = c2 - c2[0]
# rotating based on two base lines
v1 = c1[-1] - c1[0]
v1 = v1 / np.sqrt(np.sum(v1**2))
v2 = c2[-1] - c2[0]
v2 = v2 / np.sqrt(np.sum(v2**2))
u = np.cross(v1,v2)
u = u / np.sqrt(np.sum(u**2))
ux, uy, uz = u[0], u[1], u[2]
theta = np.arccos(np.dot(v1,v2))
stheta = np.sin(theta)
ctheta = np.cos(theta)
R = np.array([
[ctheta + ux**2*(1-ctheta), ux*uy*(1-ctheta)-uz*stheta, ux*uz*(1-ctheta)+uy*stheta],
[uy*ux*(1-ctheta)+uz*stheta, ctheta + uy**2*(1-ctheta), uy*uz*(1-ctheta)-ux*stheta],
[uz*ux*(1-ctheta)-uy*stheta, uz*uy*(1-ctheta)+ux*stheta, ctheta + uz**2*(1-ctheta)]
])
c2 = np.matmul(c2,R) # rotate c2 to c1 based on base line vectors v2 to v1
c2 = c2 + ref.coors[0]
return Curve(c2)