Spaces:
Sleeping
Sleeping
| import logging | |
| import math | |
| import random | |
| from collections import namedtuple | |
| import cv2 | |
| import numpy as np | |
| from scipy.spatial.transform import Rotation | |
| _logger = logging.getLogger(__name__) | |
| _logger.setLevel(logging.DEBUG) | |
| def kabsch(pts1, pts2, estimate_scale=False): | |
| c_pts1 = pts1 - pts1.mean(axis=0) | |
| c_pts2 = pts2 - pts2.mean(axis=0) | |
| covariance = np.matmul(c_pts1.T, c_pts2) / c_pts1.shape[0] | |
| U, S, VT = np.linalg.svd(covariance) | |
| d = np.sign(np.linalg.det(np.matmul(VT.T, U.T))) | |
| correction = np.eye(3) | |
| correction[2, 2] = d | |
| if estimate_scale: | |
| pts_var = np.mean(np.linalg.norm(c_pts2, axis=1) ** 2) | |
| scale_factor = pts_var / np.trace(S * correction) | |
| else: | |
| scale_factor = 1.0 | |
| R = scale_factor * np.matmul(np.matmul(VT.T, correction), U.T) | |
| t = pts2.mean(axis=0) - np.matmul(R, pts1.mean(axis=0)) | |
| T = np.eye(4) | |
| T[:3, :3] = R | |
| T[:3, 3] = t | |
| return T, scale_factor | |
| def get_inliers(h_T, poses_gt, poses_est, inlier_threshold_t, inlier_threshold_r): | |
| # h_T aligns ground truth poses with estimates poses | |
| poses_gt_transformed = h_T @ poses_gt | |
| # calculate differences in position and rotations | |
| translations_delta = poses_gt_transformed[:, :3, 3] - poses_est[:, :3, 3] | |
| rotations_delta = poses_gt_transformed[:, :3, :3] @ poses_est[:, :3, :3].transpose( | |
| [0, 2, 1] | |
| ) | |
| # translation inliers | |
| inliers_t = np.linalg.norm(translations_delta, axis=1) < inlier_threshold_t | |
| # rotation inliers | |
| inliers_r = Rotation.from_matrix(rotations_delta).magnitude() < ( | |
| inlier_threshold_r / 180 * math.pi | |
| ) | |
| # intersection of both | |
| return np.logical_and(inliers_r, inliers_t) | |
| def print_hyp(hypothesis, hyp_name): | |
| h_translation = np.linalg.norm(hypothesis["transformation"][:3, 3]) | |
| h_angle = ( | |
| np.linalg.norm( | |
| Rotation.from_matrix(hypothesis["transformation"][:3, :3]).as_rotvec() | |
| ) | |
| * 180 | |
| / math.pi | |
| ) | |
| print( | |
| f"{hyp_name}: score={hypothesis['score']}, translation={h_translation:.2f}m, " | |
| f"rotation={h_angle:.1f}deg." | |
| ) | |
| def estimated_alignment( | |
| pose_est, | |
| pose_gt, | |
| inlier_threshold_t=0.05, | |
| inlier_threshold_r=5, | |
| ransac_iterations=1000, | |
| refinement_max_hyp=12, | |
| refinement_max_it=8, | |
| estimate_scale=False, | |
| ): | |
| n_pose = len(pose_est) | |
| ransac_hypotheses = [] | |
| for i in range(ransac_iterations): | |
| min_sample_size = 3 | |
| samples = random.sample(range(n_pose), min_sample_size) | |
| h_pts1 = pose_gt[samples, :3, 3] | |
| h_pts2 = pose_est[samples, :3, 3] | |
| h_T, h_scale = kabsch(h_pts1, h_pts2, estimate_scale=estimate_scale) | |
| inliers = get_inliers( | |
| h_T, pose_gt, pose_est, inlier_threshold_t, inlier_threshold_r | |
| ) | |
| if inliers[samples].sum() >= 3: | |
| # only keep hypotheses if minimal sample is all inliers | |
| ransac_hypotheses.append( | |
| { | |
| "transformation": h_T, | |
| "inliers": inliers, | |
| "score": inliers.sum(), | |
| "scale": h_scale, | |
| } | |
| ) | |
| if len(ransac_hypotheses) == 0: | |
| print( | |
| f"Did not fine a single valid RANSAC hypothesis, abort alignment estimation." | |
| ) | |
| return None, 1 | |
| # sort according to score | |
| ransac_hypotheses = sorted( | |
| ransac_hypotheses, key=lambda x: x["score"], reverse=True | |
| ) | |
| # for hyp_idx, hyp in enumerate(ransac_hypotheses): | |
| # print_hyp(hyp, f"Hypothesis {hyp_idx}") | |
| # create shortlist of best hypotheses for refinement | |
| # print(f"Starting refinement of {refinement_max_hyp} best hypotheses.") | |
| ransac_hypotheses = ransac_hypotheses[:refinement_max_hyp] | |
| # refine all hypotheses in the short list | |
| for ref_hyp in ransac_hypotheses: | |
| # print_hyp(ref_hyp, "Pre-Refinement") | |
| # refinement loop | |
| for ref_it in range(refinement_max_it): | |
| # re-solve alignment on all inliers | |
| h_pts1 = pose_gt[ref_hyp["inliers"], :3, 3] | |
| h_pts2 = pose_est[ref_hyp["inliers"], :3, 3] | |
| h_T, h_scale = kabsch(h_pts1, h_pts2, estimate_scale) | |
| # calculate new inliers | |
| inliers = get_inliers( | |
| h_T, pose_gt, pose_est, inlier_threshold_t, inlier_threshold_r | |
| ) | |
| # check whether hypothesis score improved | |
| refined_score = inliers.sum() | |
| if refined_score > ref_hyp["score"]: | |
| ref_hyp["transformation"] = h_T | |
| ref_hyp["inliers"] = inliers | |
| ref_hyp["score"] = refined_score | |
| ref_hyp["scale"] = h_scale | |
| # print_hyp(ref_hyp, f"Refinement interation {ref_it}") | |
| else: | |
| # print(f"Stopping refinement. Score did not improve: New score={refined_score}, " | |
| # f"Old score={ref_hyp['score']}") | |
| break | |
| # re-sort refined hyotheses | |
| ransac_hypotheses = sorted( | |
| ransac_hypotheses, key=lambda x: x["score"], reverse=True | |
| ) | |
| # for hyp_idx, hyp in enumerate(ransac_hypotheses): | |
| # print_hyp(hyp, f"Hypothesis {hyp_idx}") | |
| return ransac_hypotheses[0]["transformation"], ransac_hypotheses[0]["scale"] | |
| def eval_pose_ransac(gt, est, t_thres=0.05, r_thres=5, aligned=True, save_dir=None): | |
| if aligned: | |
| alignment_transformation, alignment_scale = estimated_alignment( | |
| est, | |
| gt, | |
| inlier_threshold_t=0.05, | |
| inlier_threshold_r=5, | |
| ransac_iterations=1000, | |
| refinement_max_hyp=12, | |
| refinement_max_it=8, | |
| estimate_scale=True, | |
| ) | |
| if alignment_transformation is None: | |
| _logger.info( | |
| f"Alignment requested but failed. Setting all pose errors to {math.inf}." | |
| ) | |
| else: | |
| alignment_transformation = np.eye(4) | |
| alignment_scale = 1.0 | |
| # Evaluation Loop | |
| rErrs = [] | |
| tErrs = [] | |
| accuracy = 0 | |
| r_acc_5 = 0 | |
| r_acc_2 = 0 | |
| r_acc_1 = 0 | |
| t_acc_15 = 0 | |
| t_acc_10 = 0 | |
| t_acc_5 = 0 | |
| t_acc_2 = 0 | |
| t_acc_1 = 0 | |
| acc_10 = 0 | |
| acc_5 = 0 | |
| acc_2 = 0 | |
| acc_1 = 0 | |
| for pred_pose, gt_pose in zip(est, gt): | |
| if alignment_transformation is not None: | |
| # Apply alignment transformation to GT pose | |
| gt_pose = alignment_transformation @ gt_pose | |
| # Calculate translation error. | |
| t_err = float(np.linalg.norm(gt_pose[0:3, 3] - pred_pose[0:3, 3])) | |
| # Correct translation scale with the inverse alignment scale (since we align GT with estimates) | |
| t_err = t_err / alignment_scale | |
| # Rotation error. | |
| gt_R = gt_pose[0:3, 0:3] | |
| out_R = pred_pose[0:3, 0:3] | |
| r_err = np.matmul(out_R, np.transpose(gt_R)) | |
| # Compute angle-axis representation. | |
| r_err = cv2.Rodrigues(r_err)[0] | |
| # Extract the angle. | |
| r_err = np.linalg.norm(r_err) * 180 / math.pi | |
| else: | |
| pose_gt = None | |
| t_err, r_err = math.inf, math.inf | |
| # _logger.info(f"Rotation Error: {r_err:.2f}deg, Translation Error: {t_err * 100:.1f}cm") | |
| # Save the errors. | |
| rErrs.append(r_err) | |
| tErrs.append(t_err * 100) # in cm | |
| # Check various thresholds. | |
| if r_err < r_thres and t_err < t_thres: | |
| accuracy += 1 | |
| if r_err < 5: | |
| r_acc_5 += 1 | |
| if r_err < 2: | |
| r_acc_2 += 1 | |
| if r_err < 1: | |
| r_acc_1 += 1 | |
| if t_err < 0.15: | |
| t_acc_15 += 1 | |
| if t_err < 0.10: | |
| t_acc_10 += 1 | |
| if t_err < 0.05: | |
| t_acc_5 += 1 | |
| if t_err < 0.02: | |
| t_acc_2 += 1 | |
| if t_err < 0.01: | |
| t_acc_1 += 1 | |
| if r_err < 10 and t_err < 0.10: | |
| acc_10 += 1 | |
| if r_err < 5 and t_err < 0.05: | |
| acc_5 += 1 | |
| if r_err < 2 and t_err < 0.02: | |
| acc_2 += 1 | |
| if r_err < 1 and t_err < 0.01: | |
| acc_1 += 1 | |
| total_frames = len(rErrs) | |
| assert total_frames == len(est) | |
| # Compute median errors. | |
| tErrs.sort() | |
| rErrs.sort() | |
| median_idx = total_frames // 2 | |
| median_rErr = rErrs[median_idx] | |
| median_tErr = tErrs[median_idx] | |
| # Compute final precision. | |
| accuracy = accuracy / total_frames * 100 | |
| r_acc_5 = r_acc_5 / total_frames * 100 | |
| r_acc_2 = r_acc_2 / total_frames * 100 | |
| r_acc_1 = r_acc_1 / total_frames * 100 | |
| t_acc_15 = t_acc_15 / total_frames * 100 | |
| t_acc_10 = t_acc_10 / total_frames * 100 | |
| t_acc_5 = t_acc_5 / total_frames * 100 | |
| t_acc_2 = t_acc_2 / total_frames * 100 | |
| t_acc_1 = t_acc_1 / total_frames * 100 | |
| acc_10 = acc_10 / total_frames * 100 | |
| acc_5 = acc_5 / total_frames * 100 | |
| acc_2 = acc_2 / total_frames * 100 | |
| acc_1 = acc_1 / total_frames * 100 | |
| # _logger.info("===================================================") | |
| # _logger.info("Test complete.") | |
| # _logger.info(f'Accuracy: {accuracy:.1f}%') | |
| # _logger.info(f"Median Error: {median_rErr:.1f}deg, {median_tErr:.1f}cm") | |
| # print("===================================================") | |
| # print("Test complete.") | |
| with open(save_dir, "w") as f: | |
| f.write(f"Accuracy: {accuracy:.1f}%\n\n") | |
| f.write(f"Median Error: {median_rErr:.1f}deg, {median_tErr:.1f}cm\n") | |
| f.write(f"R acc 5: {r_acc_5:.1f}%\n") | |
| f.write(f"R acc 2: {r_acc_2:.1f}%\n") | |
| f.write(f"R acc 1: {r_acc_1:.1f}%\n") | |
| f.write(f"T acc 15: {t_acc_15:.1f}%\n") | |
| f.write(f"T acc 10: {t_acc_10:.1f}%\n") | |
| f.write(f"T acc 5: {t_acc_5:.1f}%\n") | |
| f.write(f"T acc 2: {t_acc_2:.1f}%\n") | |
| f.write(f"T acc 1: {t_acc_1:.1f}%\n") | |
| f.write(f"Acc 10: {acc_10:.1f}%\n") | |
| f.write(f"Acc 5: {acc_5:.1f}%\n") | |
| f.write(f"Acc 2: {acc_2:.1f}%\n") | |
| f.write(f"Acc 1: {acc_1:.1f}%\n") | |