How can I merge / align point clouds from scans (Lidar) to reconstruct an object in 3D in Python?

320 views Asked by At

I am attempting to align two point clouds, one representing the upper portion of a rock and the other the lower part. These point clouds share a common section, which is the side of the rock. My objective is to merge or align these point clouds so that the common side is properly matched, and the object is accurately reconstructed and displayed in 3D. This is done well in revoScan5 but I would like to automate the process of alignment using Python. I would like the two point clouds to align like this

Thanks in advance for your help!

To begin, I downsampled the point clouds and removed outliers. Then, I rotated and translated the bottom scan to align it roughly in order to facilitate matching between the top and bottom parts.

Here's an image of the source and destination point clouds BEFORE alignement

Then, I tried three different methods to align the two point clouds:

1. The global registration method (Ransac and ICP) with open3d (http://www.open3d.org/docs/0.7.0/tutorial/Advanced/global_registration.html#global-registration)

I tried varying parameters but I keep obtaining a low fitness score and a bad matching of the two point clouds, i.e. the script is not able to reproduce the rock in 3D. I have tried with different rocks but so far I did not succeed.

Here's an image of the result after Ransac

Here's an image of the result after ICP

def execute_global_registration(source_down, target_down, source_fpfh,
                                target_fpfh, voxel_size):
    distance_threshold = voxel_size * 1.5
    print(":: RANSAC registration on downsampled point clouds.")
    print("   Since the downsampling voxel size is %.3f," % voxel_size)
    print("   we use a liberal distance threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3, [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                distance_threshold)
        ], o3d.pipelines.registration.RANSACConvergenceCriteria(1000000000, 0.999))
    return result

def draw_registration_result(source, target, transformation, window_title):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target_temp], window_name=window_title)
    
#draw_registration_result(source, target, np.identity(4))

def preprocess_point_cloud(pcd, voxel_size):
    #print(":: Downsample with a voxel size %.3f." % voxel_size)
    #pcd_down = pcd.voxel_down_sample(voxel_size) --> already downsampled in previous section

    radius_normal = voxel_size * 1 #initially 2
    print(":: Estimate normal with search radius %.3f." % radius_normal)
    pcd.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=500))

    radius_feature = voxel_size * 5
    print(":: Compute FPFH feature with search radius %.3f." % radius_feature)
    pcd_fpfh =o3d.pipelines.registration.compute_fpfh_feature(pcd, o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    return pcd, pcd_fpfh

def refine_registration(source, target, source_fpfh, target_fpfh, voxel_size):
    distance_threshold = voxel_size * 0.6
    result = o3d.pipelines.registration.registration_icp(
        source, target, distance_threshold, result_ransac.transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane())
    return result

source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)


result_ransac = execute_global_registration(source_down, target_down,
                                            source_fpfh, target_fpfh,
                                            voxel_size)

o3d.visualization.draw_geometries([source, target], window_name='Before registration')
print('\n')
print(result_ransac)
draw_registration_result(source, target, result_ransac.transformation, 'Ransac result')

result_icp = refine_registration(source, target, source_fpfh, target_fpfh,
                                 voxel_size)
print('\n')
print(result_icp)
draw_registration_result(source, target, result_icp.transformation, 'ICP result')

2. ICP

This method uses the vertices. After ICP, the source point cloud is more misaligned than before compared to the destination. Here's an image of the result after ICP

import numpy as np
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt

# Function to find the nearest neighbors between source and destination point clouds
def find_nearest_neighbors(src, dst):
    neigh = NearestNeighbors(n_neighbors=1)
    neigh.fit(dst)
    distances, indices = neigh.kneighbors(src, return_distance=True)
    return distances.flatten(), indices.flatten()

# Function to compute the best-fit transform between the aligned source and destination points
def best_fit_transform(src, dst, weights=None):
    centroid_src = np.average(src, axis=0, weights=weights)
    centroid_dst = np.average(dst, axis=0, weights=weights)
    src_centered = src - centroid_src
    dst_centered = dst - centroid_dst
    H = np.dot((src_centered * weights[:, None]).T, dst_centered)
    U, _, Vt = np.linalg.svd(H)
    R = np.dot(Vt.T, U.T)
    t = centroid_dst - np.dot(centroid_src, R)
    return R, t

# The main ICP algorithm
def icp(src, dst, max_iterations=10000, tolerance=1e-6, reject_ratio=0.2):
    prev_error = 0
    for i in range(max_iterations):
        distances, indices = find_nearest_neighbors(src, dst)
        sorted_idx = np.argsort(distances)
        reject_count = int(reject_ratio * len(sorted_idx))
        src_filtered = src[sorted_idx[:-reject_count]]
        indices_filtered = indices[sorted_idx[:-reject_count]]
        dst_matched = dst[indices_filtered]
        weights = np.exp(-distances[sorted_idx[:-reject_count]] ** 2)
        R, t = best_fit_transform(src_filtered, dst_matched, weights)
        src = np.dot(src, R.T) + t
        mean_error = np.mean(distances)
        if np.abs(prev_error - mean_error) < tolerance:
            break
        prev_error = mean_error
    return src

# Function to plot point clouds
def plot_point_clouds(src, dst, title):
    fig = plt.figure(figsize=(15, 10))
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(src[:, 0], src[:, 1], src[:, 2], c='r', marker='o', label='Source', s=s)
    ax.scatter(dst[:, 0], dst[:, 1], dst[:, 2], c='b', marker='^', label='Destination', s=s)
    ax.set_title(title)
    ax.legend()
    plt.show()

# Extract the vertices (3D points)
vertices_source = np.asarray(source.points)

# Extract the vertices (3D points)
vertices_target = np.asarray(target.points)

# Plot point clouds before ICP
plot_point_clouds(vertices_source, vertices_target, 'Before ICP')

# Apply ICP to align the source point cloud to the destination
aligned_src = icp(vertices_source, vertices_target)

# Plot point clouds after ICP
plot_point_clouds(aligned_src, vertices_target, 'After ICP')

3. CPD

Last, I tried using the CPD method. However, I get this error LinAlgError: SVD did not converge indicating that the model did not converge. I increased the iteration number (max_iterations) a lot but it still does not converge.

# Function to compute Gaussian kernel matrix between two sets of vectors
def gaussian_kernel(x, y, beta):
    """Computes Gaussian kernel between x and y with width beta"""
    return np.exp(-beta * np.sum((x[:, None] - y[None, :]) ** 2, axis=2))

# Main function for Coherent Point Drift (CPD) algorithm
def cpd_rigid(X, Y, max_iterations=200, tolerance=1e-6, w=0.0, beta=2.0):
    """Performs rigid CPD between source X and target Y"""
    N, D = X.shape  # Number of points and dimensions in source
    M = Y.shape[0]  # Number of points in target

    # Initialize sigma^2
    sigma2 = np.sum((X - np.mean(X, axis=0)) ** 2) + np.sum((Y - np.mean(Y, axis=0)) ** 2)
    sigma2 /= (D * (N + M))

    TY = np.copy(Y)  # Transformed target point cloud
    P = np zeros((M, N))  # Matrix for storing correspondence probabilities
    prev_error = 0  # Variable to store error from previous iteration

    # Iterative optimization
    for i in range(max_iterations):
        # E-step: Compute correspondence probabilities using Gaussian kernel
        P = gaussian_kernel(X, TY, 1 / (2 * sigma2))
        C = np.sum(P, axis=0)
        P /= C[None, :]  # Normalize probabilities

        # M-step: Update the transformation parameters
        Np = np.sum(P)  # Normalization factor
        muX = np.sum(np.dot(P, X), axis=0) / Np  # Weighted mean of source points
        muY = np.sum(TY * np.sum(P, axis=1)[:, None], axis=0) / Np  # Weighted mean of target points

        # Centered matrices
        X_hat = X - muX
        Y_hat = TY - muY

        # Compute new transformation using SVD
        A = np.dot(X_hat.T, np.dot(np.diag(np.sum(P, axis=0)), Y_hat)) / Np
        U, _, V = np.linalg.svd(A)
        R = np.dot(U, V)
        t = muX - np.dot(R, muY)

        # Update the model to transform the target point cloud
        TY = np.dot(R, Y.T).T + t

        # Update sigma^2
        error = np.sum(np.sum(P * np.sum((X[:, None] - TY[None, :]) ** 2, axis=2), axis=0)) / Np
        sigma2 = (1 - w) * error / (D * Np)

        # Check for convergence based on the change in error
        if np.abs(prev_error - error) < tolerance:
            break
        prev_error = error

    return TY

# Function to plot point clouds
def plot_point_clouds(src, dst, title):
    fig = plt.figure(figsize=(15, 10))
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(src[:, 0], src[:, 1], src[:, 2], c='r', marker='o', label='Source', s=s)
    ax.scatter(dst[:, 0], dst[:, 1], dst[:, 2], c='b', marker='^', label='Destination', s=s)
    ax.set_title(title)
    ax.legend()
    plt.show()

# Assuming you have matrices vertices_s and vertices_t
num_rows_to_remove = vertices_t.shape[0] - vertices_s.shape[0]

# Slice vertices_t to remove the extra rows
vertices_t_trimmed = vertices_t[:-num_rows_to_remove, :]

# Plot point clouds before alignment
plot_point_clouds(vertices_s, vertices_t_trimmed, 'Before CPD')

# Perform CPD alignment
aligned_X = cpd_rigid(vertices_s, vertices_t_trimmed)

# Plot point clouds after alignment
plot_point_clouds(aligned_X, vertices_t_trimmed, 'After CPD')
0

There are 0 answers