src/amberdriver/drive_to_point/drive_to_point.py
import logging
import logging.config
import threading
import time
import math
import traceback
import os
from ambercommon.common import runtime
from amberclient.common.listener import Listener
from amberdriver.drive_support import drive_support_logic
from amberdriver.drive_to_point import drive_to_point_logic
from amberdriver.tools import logic, config
__author__ = 'paoolo'
pwd = os.path.dirname(os.path.abspath(__file__))
logging.config.fileConfig('%s/drive_to_point.ini' % pwd)
config.add_config_ini('%s/drive_to_point.ini' % pwd)
LOGGER_NAME = 'DriveToPoint'
MAX_SPEED = float(config.DRIVE_TO_POINT_MAX_SPEED)
ROBO_WIDTH = float(config.ROBO_WIDTH)
MAXIMUM_TIME_DRIVE_TO = float(config.MAXIMUM_TIME_DRIVE_TO)
class ScanHandler(Listener):
def __init__(self, driver):
self.__drive_to_point = driver
def handle(self, response):
self.__drive_to_point.set_scan(response)
class DriveToPoint(object):
TIMESTAMP_FIELD = 4
def __init__(self, driver_proxy, location_proxy, hokuyo_proxy):
self.__driver_proxy = driver_proxy
self.__location_proxy = location_proxy
self.__hokuyo_proxy = hokuyo_proxy
self.__scan_analyzer = logic.ScanAnalyzer()
self.__hokuyo_listener = ScanHandler(self)
hokuyo_proxy.subscribe(self.__hokuyo_listener)
self.__scan = None
self.__next_targets, self.__visited_targets = [], []
self.__current_location, self.__next_targets_timestamp = None, 0.0
self.__targets_lock = threading.Condition()
self.__is_active = True
self.__driving_allowed = False
self.__locator = drive_to_point_logic.Locator()
self.__speeds_filter = logic.LowPassFilter(0.5, 0.0, 0.0)
self.__logger = logging.getLogger(LOGGER_NAME)
runtime.add_shutdown_hook(self.stop)
def set_scan(self, scan):
self.__scan = self.__scan_analyzer(scan)
def stop(self):
self.__is_active = False
def set_targets(self, targets):
self.__targets_lock.acquire()
try:
self.__next_targets = targets
self.__next_targets_timestamp = time.time()
self.__driving_allowed = len(targets) > 0
self.__visited_targets = []
finally:
self.__targets_lock.notify_all()
self.__targets_lock.release()
def get_next_targets_and_location(self):
self.__targets_lock.acquire()
try:
return self.__next_targets[:], self.__current_location
finally:
self.__targets_lock.release()
def get_next_target_and_location(self):
self.__targets_lock.acquire()
try:
_next_target = self.__next_targets[0] if len(self.__next_targets) > 0 else (0, 0, 0)
return _next_target, self.__current_location
finally:
self.__targets_lock.release()
def get_visited_targets_and_location(self):
self.__targets_lock.acquire()
try:
return self.__visited_targets[:], self.__current_location
finally:
self.__targets_lock.release()
def get_visited_target_and_location(self):
self.__targets_lock.acquire()
try:
_visited_target = self.__visited_targets[-1] if len(self.__visited_targets) > 0 else (0, 0, 0)
return _visited_target, self.__current_location
finally:
self.__targets_lock.release()
def location_loop(self):
while self.__is_active:
time.sleep(0.09)
current_location = self.__location_proxy.get_location()
current_location = current_location.get_location()
self.__current_location = current_location
self.__locator.update_absolute_location(current_location)
def measuring_loop(self):
while self.__is_active:
time.sleep(0.09)
motors_speed = self.__driver_proxy.get_current_motors_speed()
speed_left = logic.average(motors_speed.get_front_left_speed(),
motors_speed.get_rear_left_speed())
speed_right = logic.average(motors_speed.get_front_right_speed(),
motors_speed.get_rear_right_speed())
self.__locator.calculate_relative_location(speed_left, speed_right)
def driving_loop(self):
driving = False
while self.__is_active:
target = self.__get_next_target()
while self.__is_active and target is not None:
driving = True
self.__drive_to(target, self.__next_targets_timestamp)
self.__add_target_to_visited(target)
target = self.__get_next_target()
if driving:
self.__logger.warning('Next targets list is empty, stop driving.')
self.__stop()
driving = False
time.sleep(0.09)
def __get_next_target(self):
self.__targets_lock.acquire()
try:
return self.__next_targets[0]
except IndexError:
return None
finally:
self.__targets_lock.release()
def __drive_to(self, target, next_targets_timestamp):
self.__logger.info('Drive to %s', str(target))
location = self.__locator.get_absolute_location()
start_time = time.time()
while not DriveToPoint.target_reached(location, target) and \
self.__driving_allowed and self.__is_active and \
not self.__next_targets_timestamp > next_targets_timestamp and \
start_time + MAXIMUM_TIME_DRIVE_TO > time.time():
drive_angle, drive_distance = DriveToPoint.__compute_drive_angle_distance(location, target)
if drive_angle is not None and drive_distance is not None:
scan = self.__scan
scan_distance = drive_support_logic.get_distance(scan, drive_angle)
if drive_distance > scan_distance:
temporary_target = DriveToPoint.__find_temporary_target(scan, drive_distance, drive_angle,
location, target)
self.__logger.warn('Try to drive to temporary target: %s', str(temporary_target))
drive_angle, drive_distance = DriveToPoint.__compute_drive_angle_distance(location,
temporary_target)
if drive_angle is not None and drive_distance is not None:
self.__logger.info('Drive angle: %f, distance: %f', drive_angle, drive_distance)
self.__send_commands(drive_angle, drive_distance)
else:
self.__send_commands(drive_angle, drive_distance)
self.__logger.info('Drive angle: %f, distance: %f', drive_angle, drive_distance)
time.sleep(0.07)
location = self.__locator.get_absolute_location()
if start_time + MAXIMUM_TIME_DRIVE_TO < time.time():
self.__logger.warn('Target %s not reachable', str(target))
else:
self.__logger.info('Target %s reached', str(target))
def __add_target_to_visited(self, target):
self.__targets_lock.acquire()
try:
self.__next_targets.remove(target)
self.__visited_targets.append(target)
except ValueError:
self.__logger.warning('Target %s is not in next targets list, not added to visited targets list.',
str(target))
finally:
self.__targets_lock.release()
def __stop(self):
self.__driver_proxy.send_motors_command(0, 0, 0, 0)
@staticmethod
def target_reached(location, target):
target_x, target_y, target_radius = target
location_x, location_y = location.x, location.y
try:
diff_x = location_x - target_x
diff_y = location_y - target_y
return math.pow(diff_x, 2) + math.pow(diff_y, 2) < math.pow(target_radius, 2)
except TypeError:
traceback.print_exc()
return False
@staticmethod
def __find_temporary_target(scan, drive_distance, drive_angle, location, current_target):
best_current_temporary_distance = None
best_temporary_x, best_temporary_y = None, None
for angle, distance in sorted(scan.points, key=lambda (a, _): a):
if distance > drive_distance:
for _d in logic.drange(ROBO_WIDTH, distance, 100.0):
temporary_x, temporary_y = logic.convert_polar_to_grid(_d - 1.5 * ROBO_WIDTH,
angle + location.angle)
if DriveToPoint.__check_temporary_target(scan, (temporary_x, temporary_y)):
temporary_x, temporary_y = temporary_x / 1000.0 + location.x, temporary_y / 1000.0 + location.y
current_temporary_distance = math.sqrt(math.pow(current_target[0] - temporary_x, 2.0) +
math.pow(current_target[1] - temporary_y, 2.0))
condition = abs(temporary_x - location.x) > abs(current_target[0] - location.x) or \
abs(temporary_y - location.y) > abs(current_target[1] - location.y)
if (best_current_temporary_distance is None or
current_temporary_distance < best_current_temporary_distance) and \
condition:
best_temporary_x, best_temporary_y = temporary_x, temporary_y
best_current_temporary_distance = current_temporary_distance
if best_temporary_x is not None and best_temporary_y is not None:
return best_temporary_x, best_temporary_y, ROBO_WIDTH * 1.3 / 1000.0
return None
@staticmethod
def __check_temporary_target(scan, point):
for alpha in logic.drange(0, 1.5707963267948966, 0.17453292519943295):
x, y = logic.convert_polar_to_grid(ROBO_WIDTH * 0.35, alpha)
_a, _d = logic.convert_grid_to_polar(x + point[0], y + point[1])
s_d = drive_support_logic.get_distance(scan, _a)
if 100.0 < s_d < _d:
return False
_a, _d = logic.convert_grid_to_polar(-x + point[0], y + point[1])
s_d = drive_support_logic.get_distance(scan, _a)
if 100.0 < s_d < _d:
return False
_a, _d = logic.convert_grid_to_polar(x + point[0], -y + point[1])
s_d = drive_support_logic.get_distance(scan, _a)
if 100.0 < s_d < _d:
return False
_a, _d = logic.convert_grid_to_polar(-x + point[0], -y + point[1])
s_d = drive_support_logic.get_distance(scan, _a)
if 100.0 < s_d < _d:
return False
return True
@staticmethod
def __compute_drive_angle_distance(location, target):
target_x, target_y, _ = target
location_x, location_y, location_angle = location.x, location.y, location.angle
location_angle = drive_to_point_logic.normalize_angle(location_angle)
if location_x is None or location_y is None or location_angle is None:
return None, None
diff_y = target_y - location_y
diff_x = target_x - location_x
target_angle = math.atan2(target_y - location_y, target_x - location_x)
drive_angle = target_angle - location_angle
drive_angle = drive_to_point_logic.normalize_angle(drive_angle)
drive_angle = -drive_angle # mirrored map
drive_distance = math.sqrt(diff_y * diff_y + diff_x * diff_x) * 1000.0
return drive_angle, drive_distance
def __compute_speed(self, drive_angle, drive_distance):
alpha = 0.17453292519943295
beta = 0.5235987755982988
if abs(drive_angle) < alpha: # 10st
# drive normal
left, right = MAX_SPEED, MAX_SPEED
elif abs(drive_angle) > beta: # 60st
# rotate in place
left = -MAX_SPEED
right = MAX_SPEED
if drive_angle < 0:
left, right = right, left
else:
# drive on turn
left = MAX_SPEED - DriveToPoint.compute_change(drive_angle, math.pi / beta)
right = MAX_SPEED + DriveToPoint.compute_change(drive_angle, math.pi / beta)
_left, _right = self.__speeds_filter(abs(left), abs(right))
left, right = logic.sign(left) * _left, logic.sign(right) * _right
return left, right
@staticmethod
def compute_change(drive_angle, driving_alpha):
return driving_alpha * drive_angle / math.pi * MAX_SPEED
def __send_commands(self, drive_angle, drive_distance):
left, right = self.__compute_speed(drive_angle, drive_distance)
current_timestamp = time.time()
scan_factor = logic.compute_data_trust(self.__scan.timestamp / 1000.0, current_timestamp)
location_factor = logic.compute_data_trust(self.__locator.get_last_update_timestamp(), current_timestamp)
left = left * scan_factor * location_factor
right = right * scan_factor * location_factor
left, right = int(left), int(right)
self.__driver_proxy.send_motors_command(left, right, left, right)