MushroomRL/mushroom-rl

View on GitHub
mushroom_rl/environments/ship_steering.py

Summary

Maintainability
B
4 hrs
Test Coverage
B
89%
import numpy as np

from mushroom_rl.core import Environment, MDPInfo
from mushroom_rl.rl_utils import spaces
from mushroom_rl.utils.angles import normalize_angle
from mushroom_rl.utils.viewer import Viewer


class ShipSteering(Environment):
    """
    The Ship Steering environment as presented in:
    "Hierarchical Policy Gradient Algorithms". Ghavamzadeh M. and Mahadevan S.. 2013.

    """
    def __init__(self, small=True, n_steps_action=3):
        """
        Constructor.

        Args:
             small (bool, True): whether to use a small state space or not.
             n_steps_action (int, 3): number of integration intervals for each
                                      step of the env.

        """
        # MDP parameters
        self.field_size = 150 if small else 1000
        low = np.array([0, 0, -np.pi, -np.pi / 12.])
        high = np.array([self.field_size, self.field_size, np.pi, np.pi / 12.])
        self.omega_max = np.array([np.pi / 12.])
        self._v = 3.
        self._T = 5.
        self._gate_s = np.empty(2)
        self._gate_e = np.empty(2)
        self._gate_s[0] = 100 if small else 350
        self._gate_s[1] = 120 if small else 400
        self._gate_e[0] = 120 if small else 450
        self._gate_e[1] = 100 if small else 400
        self._out_reward = -100
        self._success_reward = 0
        self._small = small
        self._state = None
        self.n_steps_action = n_steps_action

        # MDP properties
        dt = .2
        observation_space = spaces.Box(low=low, high=high)
        action_space = spaces.Box(low=-self.omega_max, high=self.omega_max)
        horizon = 5000
        gamma = .99
        mdp_info = MDPInfo(observation_space, action_space, gamma, horizon, dt)

        # Visualization
        self._viewer = Viewer(self.field_size, self.field_size,
                              background=(66, 131, 237))

        super().__init__(mdp_info)

    def reset(self, state=None):
        if state is None:
            if self._small:
                self._state = np.zeros(4)
                self._state[2] = np.pi/2
            else:
                low = self.info.observation_space.low
                high = self.info.observation_space.high
                self._state = (high-low)*np.random.rand(4) + low
        else:
            self._state = state

        return self._state, {}

    def step(self, action):

        r = self._bound(action[0], -self.omega_max, self.omega_max)

        new_state = self._state

        for _ in range(self.n_steps_action):
            state = new_state
            new_state = np.empty(4)
            new_state[0] = state[0] + self._v * np.cos(state[2]) * self.info.dt
            new_state[1] = state[1] + self._v * np.sin(state[2]) * self.info.dt
            new_state[2] = normalize_angle(state[2] + state[3] * self.info.dt)
            new_state[3] = state[3] + (r - state[3]) * self.info.dt / self._T

            if new_state[0] > self.field_size or new_state[1] > self.field_size or new_state[0] < 0 or new_state[1] < 0:
                new_state[0] = self._bound(new_state[0], 0, self.field_size)
                new_state[1] = self._bound(new_state[1], 0, self.field_size)

                reward = self._out_reward
                absorbing = True
                break

            elif self._through_gate(state[:2], new_state[:2]):
                reward = self._success_reward
                absorbing = True
                break
            else:
                reward = -1
                absorbing = False

        self._state = new_state

        return self._state, reward, absorbing, {}

    def render(self, record=False):
        self._viewer.line(self._gate_s, self._gate_e,
                          width=3)

        boat = [
            [-4, -4],
            [-4, 4],
            [4, 4],
            [8, 0.0],
            [4, -4]
        ]
        self._viewer.polygon(self._state[:2], self._state[2], boat,
                             color=(32, 193, 54))

        frame = self._viewer.get_frame() if record else None

        self._viewer.display(self.info.dt)

        return frame

    def stop(self):
        self._viewer.close()

    def _through_gate(self, start, end):
        r = self._gate_e - self._gate_s
        s = end - start
        den = self._cross_2d(vecr=r, vecs=s)

        if den == 0:
            return False

        t = self._cross_2d((start - self._gate_s), s) / den
        u = self._cross_2d((start - self._gate_s), r) / den

        return 1 >= u >= 0 and 1 >= t >= 0

    @staticmethod
    def _cross_2d(vecr, vecs):
        return vecr[0] * vecs[1] - vecr[1] * vecs[0]