I am attempting to align two point clouds, one representing the upper portion of a rock and the other the lower part. These point clouds share a common section, which is the side of the rock. My objective is to merge or align these point clouds so that the common side is properly matched, and the object is accurately reconstructed and displayed in 3D. This is done well in revoScan5 but I would like to automate the process of alignment using Python. I would like the two point clouds to align like this
Thanks in advance for your help!
To begin, I downsampled the point clouds and removed outliers. Then, I rotated and translated the bottom scan to align it roughly in order to facilitate matching between the top and bottom parts.
Here's an image of the source and destination point clouds BEFORE alignement
Then, I tried three different methods to align the two point clouds:
1. The global registration method (Ransac and ICP) with open3d (http://www.open3d.org/docs/0.7.0/tutorial/Advanced/global_registration.html#global-registration)
I tried varying parameters but I keep obtaining a low fitness score and a bad matching of the two point clouds, i.e. the script is not able to reproduce the rock in 3D. I have tried with different rocks but so far I did not succeed.
Here's an image of the result after Ransac
Here's an image of the result after ICP
def execute_global_registration(source_down, target_down, source_fpfh,
target_fpfh, voxel_size):
distance_threshold = voxel_size * 1.5
print(":: RANSAC registration on downsampled point clouds.")
print(" Since the downsampling voxel size is %.3f," % voxel_size)
print(" we use a liberal distance threshold %.3f." % distance_threshold)
result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
source_down, target_down, source_fpfh, target_fpfh, True,
distance_threshold,
o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
3, [
o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
0.9),
o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
distance_threshold)
], o3d.pipelines.registration.RANSACConvergenceCriteria(1000000000, 0.999))
return result
def draw_registration_result(source, target, transformation, window_title):
source_temp = copy.deepcopy(source)
target_temp = copy.deepcopy(target)
source_temp.paint_uniform_color([1, 0.706, 0])
target_temp.paint_uniform_color([0, 0.651, 0.929])
source_temp.transform(transformation)
o3d.visualization.draw_geometries([source_temp, target_temp], window_name=window_title)
#draw_registration_result(source, target, np.identity(4))
def preprocess_point_cloud(pcd, voxel_size):
#print(":: Downsample with a voxel size %.3f." % voxel_size)
#pcd_down = pcd.voxel_down_sample(voxel_size) --> already downsampled in previous section
radius_normal = voxel_size * 1 #initially 2
print(":: Estimate normal with search radius %.3f." % radius_normal)
pcd.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=500))
radius_feature = voxel_size * 5
print(":: Compute FPFH feature with search radius %.3f." % radius_feature)
pcd_fpfh =o3d.pipelines.registration.compute_fpfh_feature(pcd, o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
return pcd, pcd_fpfh
def refine_registration(source, target, source_fpfh, target_fpfh, voxel_size):
distance_threshold = voxel_size * 0.6
result = o3d.pipelines.registration.registration_icp(
source, target, distance_threshold, result_ransac.transformation,
o3d.pipelines.registration.TransformationEstimationPointToPlane())
return result
source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
result_ransac = execute_global_registration(source_down, target_down,
source_fpfh, target_fpfh,
voxel_size)
o3d.visualization.draw_geometries([source, target], window_name='Before registration')
print('\n')
print(result_ransac)
draw_registration_result(source, target, result_ransac.transformation, 'Ransac result')
result_icp = refine_registration(source, target, source_fpfh, target_fpfh,
voxel_size)
print('\n')
print(result_icp)
draw_registration_result(source, target, result_icp.transformation, 'ICP result')
2. ICP
This method uses the vertices. After ICP, the source point cloud is more misaligned than before compared to the destination. Here's an image of the result after ICP
import numpy as np
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt
# Function to find the nearest neighbors between source and destination point clouds
def find_nearest_neighbors(src, dst):
neigh = NearestNeighbors(n_neighbors=1)
neigh.fit(dst)
distances, indices = neigh.kneighbors(src, return_distance=True)
return distances.flatten(), indices.flatten()
# Function to compute the best-fit transform between the aligned source and destination points
def best_fit_transform(src, dst, weights=None):
centroid_src = np.average(src, axis=0, weights=weights)
centroid_dst = np.average(dst, axis=0, weights=weights)
src_centered = src - centroid_src
dst_centered = dst - centroid_dst
H = np.dot((src_centered * weights[:, None]).T, dst_centered)
U, _, Vt = np.linalg.svd(H)
R = np.dot(Vt.T, U.T)
t = centroid_dst - np.dot(centroid_src, R)
return R, t
# The main ICP algorithm
def icp(src, dst, max_iterations=10000, tolerance=1e-6, reject_ratio=0.2):
prev_error = 0
for i in range(max_iterations):
distances, indices = find_nearest_neighbors(src, dst)
sorted_idx = np.argsort(distances)
reject_count = int(reject_ratio * len(sorted_idx))
src_filtered = src[sorted_idx[:-reject_count]]
indices_filtered = indices[sorted_idx[:-reject_count]]
dst_matched = dst[indices_filtered]
weights = np.exp(-distances[sorted_idx[:-reject_count]] ** 2)
R, t = best_fit_transform(src_filtered, dst_matched, weights)
src = np.dot(src, R.T) + t
mean_error = np.mean(distances)
if np.abs(prev_error - mean_error) < tolerance:
break
prev_error = mean_error
return src
# Function to plot point clouds
def plot_point_clouds(src, dst, title):
fig = plt.figure(figsize=(15, 10))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(src[:, 0], src[:, 1], src[:, 2], c='r', marker='o', label='Source', s=s)
ax.scatter(dst[:, 0], dst[:, 1], dst[:, 2], c='b', marker='^', label='Destination', s=s)
ax.set_title(title)
ax.legend()
plt.show()
# Extract the vertices (3D points)
vertices_source = np.asarray(source.points)
# Extract the vertices (3D points)
vertices_target = np.asarray(target.points)
# Plot point clouds before ICP
plot_point_clouds(vertices_source, vertices_target, 'Before ICP')
# Apply ICP to align the source point cloud to the destination
aligned_src = icp(vertices_source, vertices_target)
# Plot point clouds after ICP
plot_point_clouds(aligned_src, vertices_target, 'After ICP')
3. CPD
Last, I tried using the CPD method. However, I get this error LinAlgError: SVD did not converge
indicating that the model did not converge. I increased the iteration number (max_iterations) a lot but it still does not converge.
# Function to compute Gaussian kernel matrix between two sets of vectors
def gaussian_kernel(x, y, beta):
"""Computes Gaussian kernel between x and y with width beta"""
return np.exp(-beta * np.sum((x[:, None] - y[None, :]) ** 2, axis=2))
# Main function for Coherent Point Drift (CPD) algorithm
def cpd_rigid(X, Y, max_iterations=200, tolerance=1e-6, w=0.0, beta=2.0):
"""Performs rigid CPD between source X and target Y"""
N, D = X.shape # Number of points and dimensions in source
M = Y.shape[0] # Number of points in target
# Initialize sigma^2
sigma2 = np.sum((X - np.mean(X, axis=0)) ** 2) + np.sum((Y - np.mean(Y, axis=0)) ** 2)
sigma2 /= (D * (N + M))
TY = np.copy(Y) # Transformed target point cloud
P = np zeros((M, N)) # Matrix for storing correspondence probabilities
prev_error = 0 # Variable to store error from previous iteration
# Iterative optimization
for i in range(max_iterations):
# E-step: Compute correspondence probabilities using Gaussian kernel
P = gaussian_kernel(X, TY, 1 / (2 * sigma2))
C = np.sum(P, axis=0)
P /= C[None, :] # Normalize probabilities
# M-step: Update the transformation parameters
Np = np.sum(P) # Normalization factor
muX = np.sum(np.dot(P, X), axis=0) / Np # Weighted mean of source points
muY = np.sum(TY * np.sum(P, axis=1)[:, None], axis=0) / Np # Weighted mean of target points
# Centered matrices
X_hat = X - muX
Y_hat = TY - muY
# Compute new transformation using SVD
A = np.dot(X_hat.T, np.dot(np.diag(np.sum(P, axis=0)), Y_hat)) / Np
U, _, V = np.linalg.svd(A)
R = np.dot(U, V)
t = muX - np.dot(R, muY)
# Update the model to transform the target point cloud
TY = np.dot(R, Y.T).T + t
# Update sigma^2
error = np.sum(np.sum(P * np.sum((X[:, None] - TY[None, :]) ** 2, axis=2), axis=0)) / Np
sigma2 = (1 - w) * error / (D * Np)
# Check for convergence based on the change in error
if np.abs(prev_error - error) < tolerance:
break
prev_error = error
return TY
# Function to plot point clouds
def plot_point_clouds(src, dst, title):
fig = plt.figure(figsize=(15, 10))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(src[:, 0], src[:, 1], src[:, 2], c='r', marker='o', label='Source', s=s)
ax.scatter(dst[:, 0], dst[:, 1], dst[:, 2], c='b', marker='^', label='Destination', s=s)
ax.set_title(title)
ax.legend()
plt.show()
# Assuming you have matrices vertices_s and vertices_t
num_rows_to_remove = vertices_t.shape[0] - vertices_s.shape[0]
# Slice vertices_t to remove the extra rows
vertices_t_trimmed = vertices_t[:-num_rows_to_remove, :]
# Plot point clouds before alignment
plot_point_clouds(vertices_s, vertices_t_trimmed, 'Before CPD')
# Perform CPD alignment
aligned_X = cpd_rigid(vertices_s, vertices_t_trimmed)
# Plot point clouds after alignment
plot_point_clouds(aligned_X, vertices_t_trimmed, 'After CPD')