OpenAI Gym from scratch

From environment development to a trained network.

Jonathas Figueiredo
Towards Data Science


There is a lot of work and many tutorials out there explaining how to use the OpenAI Gym toolkit and how to use Keras and TensorFlow to train agents on existing OpenAI Gym environments. However, in this tutorial I will explain how to create an OpenAI Gym environment from scratch and train an agent on it.

I will also explain how to create a simulator in order to develop the environment. The tutorial is divided into 4 sections: Problem statement, Simulator, Gym environment and Training.

1 — Problem statement

The problem proposed here is based on my final-year graduation project. The objective is to create an artificial-intelligence agent to control the navigation of a ship along a channel.

Nowadays, navigation in restricted waters such as channels and ports is basically based on the pilot's knowledge of environmental conditions such as wind and water current at a given location.

Figure 1: Ship accident

Humans still make mistakes that sometimes cost billions of dollars, and AI is a possible alternative that could be applied to navigation to reduce the number of accidents.

2 — Simulator

In order to create an AI agent to control a ship, we need an environment where the AI agent can perform navigation experiments and learn from its own mistakes how to navigate correctly along a channel. Moreover, because we cannot use a real ship to train the AI agent, the best alternative is a simulator that mimics the dynamic behavior of a real-world ship. For this purpose we could use existing commercial software (a paid option), but in this tutorial we're going to create our own ship simulator.

To do so, some hypotheses are adopted: the ship is a rigid body; the only external forces acting on the ship are the water-resistance forces (no wind, no water current); and the propulsion and rudder forces are used to control the direction and the velocity of the ship. The complete equations that govern the dynamics of the ship are complex and can be found in reference [1]. Here we're going to use a very simple 3-DOF model, presented below:

In this diagram, u is the longitudinal velocity of the ship with respect to a frame fixed on the ship's center of gravity (CG), v is the transverse (drift) velocity and dψ/dt is the angular velocity with respect to the fixed reference; ψ is the attack angle of the ship measured with respect to the fixed frame OXY. The velocities U, V (fixed frame) are linked to u, v via the 2×2 rotation matrix. Φ is the rudder angle measured with respect to the moving frame, as shown in the figure.
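For reference, the link between (U, V) and (u, v) through that 2×2 rotation matrix, with the usual sign convention, is:

\begin{bmatrix} U \\ V \end{bmatrix} = \begin{bmatrix} \cos\psi & -\sin\psi \\ \sin\psi & \cos\psi \end{bmatrix} \begin{bmatrix} u \\ v \end{bmatrix}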

The formulation of the resistance and propulsion forces is out of the scope of this tutorial but, in summary, the resistance forces oppose the ship's motion and are proportional to the ship's velocity. The rudder and propulsion forces are proportional to the parameters Al in [−1, 1] and Ap in [0, 1], which are directly proportional to the rudder angle and the propulsion thrust (Tp), respectively. Note that Al and Ap are the controllable parameters.
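A simple way to write this proportionality, introducing φ_max and T_p,max here purely as illustrative symbols for the maximum rudder angle and the maximum propulsion thrust, is:

\phi = \phi_{max}\, A_l, \qquad T_p = T_{p,max}\, A_p, \qquad A_l \in [-1, 1], \; A_p \in [0, 1]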

Now that we have the model's differential equations, we can use an integrator to build up our simulator. Let's write it down.

  • Step 1: Simulator equation. Writing the equations above and isolating the acceleration terms, we have:
def simulate_scipy(self, t, global_states):
    local_states = self._global_to_local(global_states)
    return self._local_ds_global_ds(global_states[2], self.simulate(local_states))

def simulate(self, local_states):
    """
    :param local_states: Space state
    :return df_local_states
    """
    x1 = local_states[0]  # u
    x2 = local_states[1]  # v
    x3 = local_states[2]  # theta (not used)
    x4 = local_states[3]  # du
    x5 = local_states[4]  # dv
    x6 = local_states[5]  # dtheta

    Frx, Fry, Frz = self.compute_rest_forces(local_states)

    # Propulsion model
    Fpx, Fpy, Fpz = self.compute_prop_forces(local_states)

    # Derivative function
    fx1 = x4
    fx2 = x5
    fx3 = x6

    # main model simple
    fx4 = (Frx + Fpx) / (self.M)
    fx5 = (Fry + Fpy) / (self.M)
    fx6 = (Frz + Fpz) / (self.Iz)

    fx = np.array([fx1, fx2, fx3, fx4, fx5, fx6])
    return fx

In the first six lines we just define the variable names x1 to x6 (beta and alpha are the control constants used for the rudder and propulsion control); after that we compute the resistance and propulsion forces, and finally we isolate the derivative terms fx1, fx2, …, fx6.
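In equation form, with M the mass of the ship and I_z its yaw moment of inertia (the attributes self.M and self.Iz in the code), the derivative vector computed above reads:

\dot{x}_1 = x_4, \qquad \dot{x}_2 = x_5, \qquad \dot{x}_3 = x_6

\dot{x}_4 = \frac{F_{rx} + F_{px}}{M}, \qquad \dot{x}_5 = \frac{F_{ry} + F_{py}}{M}, \qquad \dot{x}_6 = \frac{F_{rz} + F_{pz}}{I_z}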

  • Step 2: The integrator. As the integrator for our simulations we use the 5th-order Runge-Kutta integrator (RK45) from the SciPy library.
from scipy.integrate import RK45

We define a function that uses the scipy RK45 to integrate a function fun using a start point y0. The integration is done from t0 to t_bound, with relative tolerance rtol and absolute tolerance atol.

def scipy_runge_kutta(self, fun, y0, t0=0, t_bound=10):
    return RK45(fun, t0, y0, t_bound, rtol=self.time_span/self.number_iterations, atol=1e-4)

Because we use a global reference frame (OXY) to locate the ship and a local one (oxyz) to integrate the equations, we define a "mask" function to use in the integrator. This function basically transforms the derivative vector returned by the simulate function into the global reference frame.

def simulate_scipy(self, t, global_states):
    local_states = self._global_to_local(global_states)
    return self._local_ds_global_ds(global_states[2], self.simulate(local_states))
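The helper functions _global_to_local and _local_ds_global_ds are not reproduced in this tutorial. As a rough idea of what the second one does, here is a minimal sketch that assumes the conversion is a pure rotation of the position and velocity derivatives by the heading angle (the actual implementation in the repository may differ in details):

import numpy as np

def _local_ds_global_ds(self, theta, local_ds):
    """Hypothetical sketch: rotate a local derivative vector
    [u, v, dtheta, du, dv, ddtheta] into the global frame OXY."""
    c, s = np.cos(theta), np.sin(theta)
    R = np.array([[c, -s],
                  [s,  c]])
    global_ds = np.empty(6)
    global_ds[0:2] = R @ local_ds[0:2]   # position rates expressed in OXY
    global_ds[2] = local_ds[2]           # heading rate is the same in both frames
    global_ds[3:5] = R @ local_ds[3:5]   # acceleration terms rotated the same way
    global_ds[5] = local_ds[5]           # angular acceleration unchanged
    return global_ds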
  • Step 3: The step function

For each step we pass a rudder (angle_level) and a rotational level (rot_level) to control the thrust delivered by the propulsion.

def step(self, angle_level, rot_level):
    self.current_action = np.array([angle_level, rot_level])
    while not (self.integrator.status == 'finished'):
        self.integrator.step()
    self.last_global_state = self.integrator.y
    self.last_local_state = self._global_to_local(self.last_global_state)
    self.integrator = self.scipy_runge_kutta(self.simulate_scipy, self.get_state(), t0=self.integrator.t, t_bound=self.integrator.t + self.time_span)
    return self.last_global_state

In the first line we store the current action vector. Then we integrate with RK45, calling self.integrator.step() until it reaches the end of the time span. Next we update self.last_global_state and self.last_local_state and re-create the integrator for the next interval. Finally we return the global state self.last_global_state.

  • Step 4: The reset function. The reset function sets up the initial conditions of the simulation at each new episode, such as the initial position and velocity. It receives a global state vector and updates self.last_global_state and self.last_local_state.
def reset_start_pos(self, global_vector):
    x0, y0, theta0, vx0, vy0, theta_dot0 = global_vector[0], global_vector[1], global_vector[2], global_vector[3], global_vector[4], global_vector[5]
    self.last_global_state = np.array([x0, y0, theta0, vx0, vy0, theta_dot0])
    self.last_local_state = self._global_to_local(self.last_global_state)
    self.current_action = np.zeros(2)
    self.integrator = self.scipy_runge_kutta(self.simulate_scipy, self.get_state(), t_bound=self.time_span)
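With these four pieces in place the simulator can already be exercised on its own. A minimal usage sketch (the class name ShipSimulator and its constructor call are placeholders, since the constructor is not shown in this tutorial):

import numpy as np

sim = ShipSimulator()  # placeholder name and constructor
# start at x = 0 m, y = 100 m, heading 0 rad, 1 m/s forward speed
sim.reset_start_pos(np.array([0.0, 100.0, 0.0, 1.0, 0.0, 0.0]))

for _ in range(10):
    # half rudder to one side, 20% propulsion, integrated over one time span
    state = sim.step(angle_level=0.5, rot_level=0.2)
    print(state)  # global state [x, y, theta, vx, vy, dtheta]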

The final code can be checked here.

3 — Gym Environment

Once we have our simulator, we can create a Gym environment to train the agent.

3.1 States

The states are the environment variables through which the agent "sees" the world. The agent uses these variables to locate itself in the environment and to decide which actions to take to accomplish the proposed mission. In our problem the mission is stated as:

Use the rudder control to perform a defined linear navigation path along a channel under a given constant propulsion action.

The states chosen for the application of RL in the task are the following:

Here d is the distance from the center of mass of the ship to the guideline; θ is the angle between the longitudinal axis of the ship and the guideline; vx is the horizontal speed of the ship at its center of mass (in the direction of the guideline); vy is the vertical speed of the ship at its center of mass (perpendicular to the guideline); and dθ/dt is the angular velocity of the ship.
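Stacked together, these variables form the observation vector handed to the agent at each step:

s = \left[\, d, \; \theta, \; v_x, \; v_y, \; \dot{\theta} \,\right]

which is exactly the array returned by the convert_state method shown in section 3.4.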

3.2 Reward

The reward function is responsible for punishing the agent if it does not follow the guideline and for rewarding it if it stays on the line without too much wavering. The reward is computed from the state variables d, θ, vx, vy and dθ/dt defined in section 3.1, and its expression, as implemented in the calculate_reward method of section 3.4, is shown below.
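With θ and dθ/dt converted to degrees, the reward reads:

r = \begin{cases} -1000, & \text{if the observation leaves the observation space} \\ 1 - 8\left|\dfrac{\theta}{90}\right| - \left|\dfrac{\dot{\theta}}{20}\right| - 5\,\dfrac{|d|}{150} - \left|\dfrac{v_y}{4}\right| - \dfrac{|v_x - 2|}{2}, & \text{otherwise} \end{cases}

The large negative reward penalizes hitting the limits of the observation space (e.g. the channel borders), while the remaining terms penalize heading error, turning rate, distance from the guideline, transverse speed and deviation from the target forward speed of 2 m/s.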

3.3 Actions

The actions are the input parameters that control the ship's maneuvering. The forces that make the ship controllable are the rudder and propulsion forces. The actions have the vector form Av = [Al, Ap], where Al is the dimensionless rudder command and Ap is the dimensionless propulsion command, such that Al in [-1, 1] and Ap in [0, 1].
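In Gym terms the full action vector could be declared as a Box space. A minimal sketch with these exact bounds (note that in this tutorial only the rudder command is actually learned, while the propulsion level is kept constant in the step function):

from gym import spaces
import numpy as np

# Av = [Al, Ap]: dimensionless rudder command in [-1, 1], propulsion command in [0, 1]
action_space = spaces.Box(low=np.array([-1.0, 0.0]), high=np.array([1.0, 1.0]))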

3.4 Code

Now that we have defined the main aspects of our environment, we can write down the code.

First we define the bounds of our observable state space (the features) as a Box, and we also define the box from which the initial conditions are sampled.

self.observation_space = spaces.Box(low=np.array([0, -np.pi / 2, 0, -4, -0.2]), high=np.array([150, np.pi / 2, 4.0, 4.0, 0.2]))
self.init_space = spaces.Box(low=np.array([0, -np.pi / 15, 1.0, 0.2, -0.01]), high=np.array([30, np.pi / 15, 1.5, 0.3, 0.01]))
self.ship_data = None

After that we define a function to transform the simulator state into the environment (observation) state. Note that we mirror the θ angle, the vy velocity and the angular velocity dθ/dt with respect to the guideline, to make it easier for the AI to learn (it reduces the state space the agent has to explore).

def convert_state(self, state):
    """
    This method generates the features used to build the reward function
    :param state: Global state of the ship
    """
    ship_point = Point((state[0], state[1]))
    side = np.sign(state[1] - self.point_a[1])
    d = ship_point.distance(self.guideline)  # meters
    theta = side * state[2]  # radians
    vx = state[3]  # m/s
    vy = side * state[4]  # m/s
    thetadot = side * state[5]  # degrees/min
    obs = np.array([d, theta, vx, vy, thetadot])
    return obs

Then we define a function to compute the reward as defined before:

def calculate_reward(self, obs):
    d, theta, vx, vy, thetadot = obs[0], obs[1]*180/np.pi, obs[2], obs[3], obs[4]*180/np.pi
    if not self.observation_space.contains(obs):
        return -1000
    else:
        return 1 - 8*np.abs(theta/90) - np.abs(thetadot/20) - 5*np.abs(d)/150 - np.abs(vy/4) - np.abs(vx-2)/2

We also have to define the step function. This function is used by the agent when navigating: at each step the agent chooses an action, the simulator integrates the dynamics for 10 s (our integration time span), and this repeats until the ship reaches the end of the channel or hits the channel edge.

def step(self, action):
    side = np.sign(self.last_pos[1])
    angle_action = action[0] * side
    rot_action = 0.2
    state_prime = self.simulator.step(angle_level=angle_action, rot_level=rot_action)
    # convert simulator states into observable states
    obs = self.convert_state(state_prime)
    # print('Observed state: ', obs)
    dn = self.end(state_prime=state_prime, obs=obs)
    rew = self.calculate_reward(obs=obs)
    self.last_pos = [state_prime[0], state_prime[1], state_prime[2]]
    self.last_action = np.array([angle_action, rot_action])
    info = dict()
    return obs, rew, dn, info

Because we mirror the states, we also have to mirror the rudder action, multiplying it by side. In this tutorial we create a network that controls only the rudder action and keep the propulsion level constant (rot_action = 0.2).

We also create a viewer using the turtle library; you can check its code here. It is used to show the learning process or the performance after training. The viewer is called in the render function.

def render(self, mode='human'):
    if self.viewer is None:
        self.viewer = Viewer()
        self.viewer.plot_boundary(self.borders)
        self.viewer.plot_guidance_line(self.point_a, self.point_b)
    img_x_pos = self.last_pos[0] - self.point_b[0] * (self.last_pos[0] // self.point_b[0])
    if self.last_pos[0] // self.point_b[0] > self.number_loop:
        self.viewer.end_episode()
        self.viewer.plot_position(img_x_pos, self.last_pos[1], self.last_pos[2], 20 * self.last_action[0])
        self.viewer.restart_plot()
        self.number_loop += 1
    else:
        self.viewer.plot_position(img_x_pos, self.last_pos[1], self.last_pos[2], 20 * self.last_action[0])

Finally we define the functions that set up the initial state space and reset the environment; they are used at the beginning of each new episode.

def set_init_space(self, low, high):
    self.init_space = spaces.Box(low=np.array(low), high=np.array(high))

def reset(self):
    init = list(map(float, self.init_space.sample()))
    init_states = np.array([self.start_pos[0], init[0], init[1], init[2] * np.cos(init[1]), init[2] * np.sin(init[1]), 0])
    self.simulator.reset_start_pos(init_states)
    self.last_pos = np.array([self.start_pos[0], init[0], init[1]])
    print('Resetting position')
    state = self.simulator.get_state()
    if self.viewer is not None:
        self.viewer.end_episode()
    return self.convert_state(state)

The complete code can be found here.

4 — Training

To train our agent we use a DDPG (Deep Deterministic Policy Gradient) agent from the Keras-rl project. Details about the DDPG method can be found here.

We then import all the methods used to build our neural network.

from keras.models import Sequential, Model
from keras.layers import Dense, Activation, Flatten, Input, Concatenate
from keras.optimizers import Adam
from rl.agents import DDPGAgent
from rl.memory import SequentialMemory
from rl.random import OrnsteinUhlenbeckProcess
from ship_env import ShipEnv
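Before building the networks we need the environment instance and the number of actions. A minimal sketch, assuming the environment exposes a standard Gym action_space:

env = ShipEnv()
nb_actions = env.action_space.shape[0]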

The neural network used has the following structure (actor-critic structure):

The code that implements this structure is the following:

# Next, we build a very simple model.
actor = Sequential()
actor.add(Flatten(input_shape=(1,) + env.observation_space.shape))
actor.add(Dense(400))
actor.add(Activation('relu'))
actor.add(Dense(300))
actor.add(Activation('relu'))
actor.add(Dense(nb_actions))
actor.add(Activation('softsign'))
print(actor.summary())
action_input = Input(shape=(nb_actions,), name='action_input')
observation_input = Input(shape=(1,) + env.observation_space.shape, name='observation_input')
flattened_observation = Flatten()(observation_input)
x = Concatenate()([action_input, flattened_observation])
x = Dense(400)(x)
x = Activation('relu')(x)
x = Dense(300)(x)
x = Activation('relu')(x)
x = Dense(1)(x)
x = Activation('linear')(x)
critic = Model(inputs=[action_input, observation_input], outputs=x)
print(critic.summary())
# Finally, we configure and compile our agent. You can use every built-in Keras optimizer and
# even the metrics!
memory = SequentialMemory(limit=2000, window_length=1)
random_process = OrnsteinUhlenbeckProcess(size=nb_actions, theta=0.6, mu=0, sigma=0.3)
agent = DDPGAgent(nb_actions=nb_actions, actor=actor, critic=critic, critic_action_input=action_input,
                  memory=memory, nb_steps_warmup_critic=2000, nb_steps_warmup_actor=10000,
                  random_process=random_process, gamma=.99, target_model_update=1e-3)
agent.compile(Adam(lr=0.001, clipnorm=1.), metrics=['mae'])

Finally we train our agent for 300,000 steps, and after training we save the network weights and the training history:

import pickle

# train our agent and store the training history in hist
hist = agent.fit(env, nb_steps=300000, visualize=False, verbose=2, nb_max_episode_steps=1000)
filename = '300kit_rn4_maior2_mem20k_target01_theta3_batch32_adam2'

# we save the history of learning, it can further be used to plot the reward evolution
with open('_experiments/history_ddpg__redetorcs' + filename + '.pickle', 'wb') as handle:
    pickle.dump(hist.history, handle, protocol=pickle.HIGHEST_PROTOCOL)

# After training is done, we save the final weights.
agent.save_weights('h5f_files/ddpg_{}_weights.h5f'.format('600kit_rn4_maior2_mem20k_target01_theta3_batch32_adam2_action_lim_1'), overwrite=True)
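After training, the saved weights can be loaded back and the learned policy evaluated with rendering turned on, using the standard Keras-rl test method (a minimal sketch reusing the file name from the save call above):

# load the trained weights and run a few evaluation episodes with the viewer
agent.load_weights('h5f_files/ddpg_{}_weights.h5f'.format('600kit_rn4_maior2_mem20k_target01_theta3_batch32_adam2_action_lim_1'))
agent.test(env, nb_episodes=5, visualize=True, nb_max_episode_steps=1000)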

Finally after training we have the following result:

The agent has learned how to control the rudder and how to stay on the channel mid-line.

Thanks for reading this far! If you have any questions, just leave a comment below.

More about this project:

In my final-year project I used a more detailed ship model and also let the propulsion action be controlled by the AI agent. The project repository can be found here.

References:

[1] FOSSEN, Thor I. Handbook of marine craft hydrodynamics and motion control. John Wiley & Sons, Ltd, 2011.
