src/feature_tracking_motion_displacement/visual_odometer.py
"""
Note: This code is heavily based upon the C# implementation developed by Dr Rainer Hessmer (http://www.hessmer.org/blog/2010/08/17/monocular-visual-odometry/comment-page-1/)
"""
from __future__ import division
__author__ = 'connorgoddard'
import cv2
import numpy as np
from optical_flow import OpticalFlow
from tracked_feature import TrackedFeature
class VisualOdometer:
_optical_flow = OpticalFlow.load_defaults()
_current_gray_image = None
_prev_gray_image = None
_raw_tracked_feature_points = []
_tracked_features = []
_initial_features_count = 0
_feature_repopulation_threshold = 10
_not_tracked_feature_count = 0
def __init__(self):
pass
def process_image(self, new_image_filepath):
self._prev_gray_image = self._current_gray_image
new_image = cv2.imread(new_image_filepath)
self._current_gray_image = cv2.cvtColor(new_image, cv2.COLOR_BGR2GRAY)
if self._prev_gray_image is None:
self.repopulate_feature_points(self._current_gray_image)
return
# cv2.imshow("prev", self._prev_gray_image)
# cv2.imshow("current", self._current_gray_image)
self.track_features()
if len(self._tracked_features) < self._feature_repopulation_threshold:
print "We need to re-populate features!"
self.repopulate_feature_points(self._current_gray_image)
def track_features(self):
if len(self._raw_tracked_feature_points) == 0:
return
tracked_feature_points, tracking_status_indicators, tracking_errors = self._optical_flow.calc_optical_flow(self._prev_gray_image, self._current_gray_image, np.array(self._raw_tracked_feature_points))
# Clear the current list of tracked feature points ready to replace with the new ones.
self._raw_tracked_feature_points = []
self._raw_tracked_feature_points.extend(tracked_feature_points)
full_history_features_count = 0
unsmooth_features_count = 0
for i in reversed(xrange(len(tracked_feature_points))):
is_tracked = (tracking_status_indicators[i] == 1)
if is_tracked:
existing_tracked_feature = self._tracked_features[i]
existing_tracked_feature.add(tracked_feature_points[i])
if existing_tracked_feature.is_full():
full_history_features_count += 1
if existing_tracked_feature.is_smooth() is False:
unsmooth_features_count += 1
else:
self.remove_tracked_feature(i)
if unsmooth_features_count < (full_history_features_count / 2):
# The majority of features is smooth. We downgrade unsmooth features
self.apply_unsmooth_grades()
def apply_unsmooth_grades(self):
unsmooth_features_out_count = 0
# for i, tracked_feature in enumerate(self._tracked_features):
for i in reversed(xrange(len(self._tracked_features))):
tracked_feature = self._tracked_features[i]
tracked_feature.apply_score_change()
if tracked_feature.is_out():
self.remove_tracked_feature(i)
unsmooth_features_out_count += 1
def remove_tracked_feature(self, index):
self._not_tracked_feature_count += 1
# Remove the feature from the two lists at the specified index.
self._tracked_features.pop(index)
self._raw_tracked_feature_points.pop(index)
def repopulate_feature_points(self, gray_image):
new_raw_tracked_feature_points = self._optical_flow.find_features_to_track(gray_image)
if len(new_raw_tracked_feature_points) == 0:
print "ERROR: No feature points to track."
return
self._raw_tracked_feature_points.extend(new_raw_tracked_feature_points)
for new_feature_point in new_raw_tracked_feature_points:
tracked_feature = TrackedFeature()
tracked_feature.add(new_feature_point)
self._tracked_features.append(tracked_feature)
self._initial_features_count = len(self._tracked_features)
self._feature_repopulation_threshold = (self._initial_features_count * 9) / 10
if self._feature_repopulation_threshold < 100:
self._feature_repopulation_threshold = 100
self._not_tracked_feature_count = 0
def draw_feature_locations_previous_current(tracked_features, image_canvas):
for tracked_feature in tracked_features:
draw_current_feature(tracked_feature.get_value(0), tracked_feature.is_full(), image_canvas)
if tracked_feature.get_full_count() > 1:
draw_previous_feature(tracked_feature.get_value(-1), tracked_feature.is_full(), image_canvas)
cv2.line(image_canvas, (tracked_feature.get_value(-1)[0][0], tracked_feature.get_value(-1)[0][1]), (tracked_feature.get_value(0)[0][0], tracked_feature.get_value(0)[0][1]), (255, 0, 0), 2)
def draw_previous_feature(previous_feature_location, has_full_history, image_canvas):
if has_full_history:
# red
cv2.circle(image_canvas, (previous_feature_location[0][0], previous_feature_location[0][1]), 5, (0, 0, 255), -1)
else:
# yellow
cv2.circle(image_canvas, (previous_feature_location[0][0], previous_feature_location[0][1]), 5, (0, 255, 255), -1)
def draw_current_feature(current_feature_location, has_full_history, image_canvas):
if has_full_history:
# green
cv2.circle(image_canvas, (current_feature_location[0][0], current_feature_location[0][1]), 5, (0, 255, 0), -1)
else:
# orange
cv2.circle(image_canvas, (current_feature_location[0][0], current_feature_location[0][1]), 5, (0, 128, 255), -1)
def main():
images = ["../eval_data/motion_images/wiltshire_outside_10cm/IMG1.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG2.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG3.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG4.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG5.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG6.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG7.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG8.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG9.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG10.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG11.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG12.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG13.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG14.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG15.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG16.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG17.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG18.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG19.JPG",
"../eval_data/motion_images/wiltshire_outside_10cm/IMG20.JPG"]
images2 = ["../eval_data/motion_images/flat_10cm/IMG1.JPG",
"../eval_data/motion_images/flat_10cm/IMG2.JPG",
"../eval_data/motion_images/flat_10cm/IMG3.JPG",
"../eval_data/motion_images/flat_10cm/IMG4.JPG",
"../eval_data/motion_images/flat_10cm/IMG5.JPG",
"../eval_data/motion_images/flat_10cm/IMG6.JPG",
"../eval_data/motion_images/flat_10cm/IMG7.JPG",
"../eval_data/motion_images/flat_10cm/IMG8.JPG",
"../eval_data/motion_images/flat_10cm/IMG9.JPG",
"../eval_data/motion_images/flat_10cm/IMG10.JPG",
"../eval_data/motion_images/flat_10cm/IMG11.JPG",
"../eval_data/motion_images/flat_10cm/IMG12.JPG"]
vo = VisualOdometer()
for image_filepath in images:
new_image = cv2.imread(image_filepath)
vo.process_image(image_filepath)
draw_feature_locations_previous_current(vo._tracked_features, new_image)
cv2.imshow("result", new_image)
cv2.waitKey(2000)
cv2.waitKey()
if __name__ == "__main__":
main()