# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import parl import queue import six import threading import time import numpy as np from actor import Actor from opensim_model import OpenSimModel from opensim_agent import OpenSimAgent from parl.utils import logger, ReplayMemory, summary, get_gpu_count from parl.utils.window_stat import WindowStat from parl.remote.client import get_global_client from parl.utils import machine_info ACT_DIM = 22 VEL_DIM = 19 OBS_DIM = 98 + VEL_DIM GAMMA = 0.96 TAU = 0.001 ACTOR_LR = 3e-5 CRITIC_LR = 3e-5 BATCH_SIZE = 128 NOISE_DECAY = 0.999998 class TransitionExperience(object): """ A transition of state, or experience""" def __init__(self, obs, action, reward, info, **kwargs): """ kwargs: whatever other attribute you want to save""" self.obs = obs self.action = action self.reward = reward self.info = info for k, v in six.iteritems(kwargs): setattr(self, k, v) class ActorState(object): """Maintain incomplete trajectories data of actor.""" def __init__(self): self.memory = [] # list of Experience self.ident = np.random.randint(int(1e18)) self.last_target_changed_steps = 0 def reset(self): self.memory = [] self.last_target_changed_steps = 0 def update_last_target_changed(self): self.last_target_changed_steps = len(self.memory) class Learner(object): def __init__(self, args): if machine_info.is_gpu_available(): assert get_gpu_count() == 1, 'Only support training in single GPU,\ Please set environment variable: `export CUDA_VISIBLE_DEVICES=[GPU_ID_TO_USE]` .' else: cpu_num = os.environ.get('CPU_NUM') assert cpu_num is not None and cpu_num == '1', 'Only support training in single CPU,\ Please set environment variable: `export CPU_NUM=1`.' model = OpenSimModel(OBS_DIM, VEL_DIM, ACT_DIM) algorithm = parl.algorithms.DDPG( model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR) self.agent = OpenSimAgent(algorithm, OBS_DIM, ACT_DIM) self.rpm = ReplayMemory(args.rpm_size, OBS_DIM, ACT_DIM) if args.restore_rpm_path is not None: self.rpm.load(args.restore_rpm_path) if args.restore_model_path is not None: self.restore(args.restore_model_path) # add lock between training and predicting self.model_lock = threading.Lock() # add lock when appending data to rpm or writing scalars to summary self.memory_lock = threading.Lock() self.ready_actor_queue = queue.Queue() self.total_steps = 0 self.noiselevel = 0.5 self.critic_loss_stat = WindowStat(500) self.env_reward_stat = WindowStat(500) self.shaping_reward_stat = WindowStat(500) self.max_env_reward = 0 # thread to keep training learn_thread = threading.Thread(target=self.keep_training) learn_thread.setDaemon(True) learn_thread.start() self.create_actors() def create_actors(self): """Connect to the cluster and start sampling of the remote actor. """ parl.connect(args.cluster_address, ['official_obs_scaler.npz']) for i in range(args.actor_num): logger.info('Remote actor count: {}'.format(i + 1)) remote_thread = threading.Thread(target=self.run_remote_sample) remote_thread.setDaemon(True) remote_thread.start() # There is a memory-leak problem in osim-rl package. # So we will dynamically add actors when remote actors killed due to excessive memory usage. time.sleep(10 * 60) parl_client = get_global_client() while True: if parl_client.actor_num < args.actor_num: logger.info( 'Dynamic adding acotr, current actor num:{}'.format( parl_client.actor_num)) remote_thread = threading.Thread(target=self.run_remote_sample) remote_thread.setDaemon(True) remote_thread.start() time.sleep(5) def _new_ready_actor(self): """ The actor is ready to start new episode, but blocking until training thread call actor_ready_event.set() """ actor_ready_event = threading.Event() self.ready_actor_queue.put(actor_ready_event) logger.info( "[new_avaliabe_actor] approximate size of ready actors:{}".format( self.ready_actor_queue.qsize())) actor_ready_event.wait() def run_remote_sample(self): remote_actor = Actor( difficulty=args.difficulty, vel_penalty_coeff=args.vel_penalty_coeff, muscle_penalty_coeff=args.muscle_penalty_coeff, penalty_coeff=args.penalty_coeff, only_first_target=args.only_first_target) actor_state = ActorState() while True: obs = remote_actor.reset() actor_state.reset() while True: actor_state.memory.append( TransitionExperience( obs=obs, action=None, reward=None, info=None, timestamp=time.time())) action = self.pred_batch(obs) # For each target, decay noise as the steps increase. step = len( actor_state.memory) - actor_state.last_target_changed_steps current_noise = self.noiselevel * (0.98**(step - 1)) noise = np.zeros((ACT_DIM, ), dtype=np.float32) if actor_state.ident % 3 == 0: if step % 5 == 0: noise = np.random.randn(ACT_DIM) * current_noise elif actor_state.ident % 3 == 1: if step % 5 == 0: noise = np.random.randn(ACT_DIM) * current_noise * 2 action += noise action = np.clip(action, -1, 1) obs, reward, done, info = remote_actor.step(action) reward_scale = (1 - GAMMA) info['shaping_reward'] *= reward_scale actor_state.memory[-1].reward = reward actor_state.memory[-1].info = info actor_state.memory[-1].action = action if 'target_changed' in info and info['target_changed']: actor_state.update_last_target_changed() if done: self._parse_memory(actor_state, last_obs=obs) break self._new_ready_actor() def _parse_memory(self, actor_state, last_obs): mem = actor_state.memory n = len(mem) episode_shaping_reward = np.sum( [exp.info['shaping_reward'] for exp in mem]) episode_env_reward = np.sum([exp.info['env_reward'] for exp in mem]) episode_time = time.time() - mem[0].timestamp episode_rpm = [] for i in range(n - 1): episode_rpm.append([ mem[i].obs, mem[i].action, mem[i].info['shaping_reward'], mem[i + 1].obs, False ]) episode_rpm.append([ mem[-1].obs, mem[-1].action, mem[-1].info['shaping_reward'], last_obs, not mem[-1].info['timeout'] ]) with self.memory_lock: self.total_steps += n self.add_episode_rpm(episode_rpm) if actor_state.ident % 3 == 2: # trajectory without noise self.env_reward_stat.add(episode_env_reward) self.shaping_reward_stat.add(episode_shaping_reward) self.max_env_reward = max(self.max_env_reward, episode_env_reward) if self.env_reward_stat.count > 500: summary.add_scalar('recent_env_reward', self.env_reward_stat.mean, self.total_steps) summary.add_scalar('recent_shaping_reward', self.shaping_reward_stat.mean, self.total_steps) if self.critic_loss_stat.count > 500: summary.add_scalar('recent_critic_loss', self.critic_loss_stat.mean, self.total_steps) summary.add_scalar('episode_length', n, self.total_steps) summary.add_scalar('max_env_reward', self.max_env_reward, self.total_steps) summary.add_scalar('ready_actor_num', self.ready_actor_queue.qsize(), self.total_steps) summary.add_scalar('episode_time', episode_time, self.total_steps) self.noiselevel = self.noiselevel * NOISE_DECAY def learn(self): start_time = time.time() for T in range(args.train_times): [states, actions, rewards, new_states, dones] = self.rpm.sample_batch(BATCH_SIZE) with self.model_lock: critic_loss = self.agent.learn(states, actions, rewards, new_states, dones) self.critic_loss_stat.add(critic_loss) logger.info( "[learn] time consuming:{}".format(time.time() - start_time)) def keep_training(self): episode_count = 1000000 for T in range(episode_count): if self.rpm.size() > BATCH_SIZE * args.warm_start_batchs: self.learn() logger.info( "[keep_training/{}] trying to acq a new env".format(T)) # Keep training and predicting balance # After training, wait for a ready actor, and make the actor start new episode ready_actor_event = self.ready_actor_queue.get() ready_actor_event.set() if np.mod(T, 100) == 0: logger.info("saving models") self.save(T) if np.mod(T, 10000) == 0: logger.info("saving rpm") self.save_rpm() def save_rpm(self): save_path = os.path.join(logger.get_dir(), "rpm.npz") self.rpm.save(save_path) def save(self, T): save_path = os.path.join( logger.get_dir(), 'model_every_100_episodes/episodes-{}'.format(T)) self.agent.save(save_path) def restore(self, model_path): logger.info('restore model from {}'.format(model_path)) self.agent.restore(model_path) def add_episode_rpm(self, episode_rpm): for x in episode_rpm: self.rpm.append( obs=x[0], act=x[1], reward=x[2], next_obs=x[3], terminal=x[4]) def pred_batch(self, obs): batch_obs = np.expand_dims(obs, axis=0) with self.model_lock: action = self.agent.predict(batch_obs.astype('float32')) action = np.squeeze(action, axis=0) return action if __name__ == '__main__': from train_args import get_args args = get_args() if args.logdir is not None: logger.set_dir(args.logdir) learner = Learner(args)