# Capstone Project – Car Racing Using DQN

In the last few chapters, we learned how deep Q learning works by approximating the q function with a neural network. Following this, we saw various improvements to the **Deep Q Network** (**DQN**), such as double Q learning, the dueling network architecture, and the deep recurrent Q network. We have seen how the DQN uses a replay buffer to store the agent's experience and trains the network with mini-batches of samples drawn from that buffer. We have also implemented a DQN for playing Atari games and a **Deep Recurrent Q Network** (**DRQN**) for playing Doom. In this chapter, we step into the detailed implementation of a dueling DQN, which is essentially the same as a regular DQN except that the final fully connected layer is broken down into two streams, namely a value stream and an advantage stream, and these two streams are combined to compute the Q function. We will see how to train a dueling DQN agent to win a car racing game.

In this chapter, you will learn how to implement the following:

* Environment wrapper functions
* The dueling network
* Replay memory
* Training the network
* Car racing

# Environment wrapper functions

The credit for the code used in this chapter goes to Giacomo Spigler's [GitHub repository](https://github.com/spiglerg/DQN_DDQN_Dueling_and_DDPG_Tensorflow). Throughout this chapter, the code is explained line by line. For the complete structured code, check out the above GitHub repository.

First, we import all the necessary libraries:

```py
import numpy as np
import tensorflow as tf
import gym
from gym.spaces import Box
from scipy.misc import imresize
import random
import cv2
import time
import logging
import os
import sys
```

We define the `EnvWrapper` class and define some of the environment wrapper functions:

```py
class EnvWrapper:
```

We define the `__init__` method and initialize the variables:

```py
    def __init__(self, env_name, debug=False):
```

Initialize the `gym` environment:

```py
        self.env = gym.make(env_name)
```

Get the `action_space`:

```py
        self.action_space = self.env.action_space
```

Get the `observation_space`:

```py
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 4))
```

Initialize `frame_num` for storing the frame count:

```py
        self.frame_num = 0
```

Initialize `monitor` for recording the game screen:

```py
        self.monitor = self.env.monitor
```

Initialize `frames`:

```py
        self.frames = np.zeros((84, 84, 4), dtype=np.uint8)
```

Initialize a Boolean called `debug` which, when set to `true`, displays the last few frames:

```py
        self.debug = debug

        if self.debug:
            cv2.startWindowThread()
            cv2.namedWindow("Game")
```

Next, we define a function called `step`, which takes an action as input, performs it in the environment, and returns the preprocessed frame of the next state:

```py
    def step(self, a):
        ob, reward, done, xx = self.env.step(a)
        return self.process_frame(ob), reward, done, xx
```

We define a function called `reset` for resetting the environment; after resetting, it returns the preprocessed game screen:

```py
    def reset(self):
        self.frame_num = 0
        return self.process_frame(self.env.reset())
```

Next, we define another function for rendering the environment:

```py
    def render(self):
        return self.env.render()
```

Now, we define the `process_frame` function for preprocessing the frame:

```py
    def process_frame(self, frame):

        # convert the image to gray
        state_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # change the size
        state_resized = cv2.resize(state_gray, (84, 110))

        # crop to 84x84
        gray_final = state_resized[16:100, :]

        if self.frame_num == 0:
            self.frames[:, :, 0] = gray_final
            self.frames[:, :, 1] = gray_final
            self.frames[:, :, 2] = gray_final
            self.frames[:, :, 3] = gray_final
        else:
            self.frames[:, :, 3] = self.frames[:, :, 2]
            self.frames[:, :, 2] = self.frames[:, :, 1]
            self.frames[:, :, 1] = self.frames[:, :, 0]
            self.frames[:, :, 0] = gray_final

        # Next we increment the frame_num counter
        self.frame_num += 1

        if self.debug:
            cv2.imshow('Game', gray_final)

        return self.frames.copy()
```

After preprocessing, our game screen looks like the following:

![](img/00406.gif)

# Dueling network

Now, we build our dueling DQN; we first build three convolutional layers, followed by two fully connected layers, and the final fully connected layer is split into two separate layers for the value stream and the advantage stream. We then use an aggregate layer, which combines the value stream and the advantage stream, to compute the q value (a small numerical sketch of this aggregation follows the list below). The dimensions of these layers are as follows:

* **Layer 1**: 32 `8x8` filters with stride 4 + ReLU
* **Layer 2**: 64 `4x4` filters with stride 2 + ReLU
* **Layer 3**: 64 `3x3` filters with stride 1 + ReLU
* **Layer 4a**: fully connected layer with 512 units + ReLU
* **Layer 4b**: fully connected layer with 512 units + ReLU
* **Layer 5a**: fully connected layer with 1 unit (state value)
* **Layer 5b**: fully connected layer with one unit per action (advantage values)
* **Layer 6**: aggregation of `V(s)` and `A(s, a)`
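Before wiring this up in TensorFlow, here is a minimal NumPy sketch (an illustration only, not part of the chapter's code; the array values are made up) of how layer 6 aggregates the two streams. The mean advantage is subtracted so that the advantage stream only has to learn relative preferences between actions:

```py
import numpy as np

# Hypothetical network outputs for a batch of 2 states and 3 actions
value = np.array([[1.0], [0.5]])                 # V(s), shape (batch, 1)
advantage = np.array([[2.0, 0.0, 1.0],
                      [0.3, 0.9, 0.0]])          # A(s, a), shape (batch, actions)

# Q(s, a) = V(s) + (A(s, a) - mean over actions of A(s, a'))
q_values = value + (advantage - advantage.mean(axis=1, keepdims=True))
print(q_values)  # the greedy action per row is unchanged by the mean subtraction
```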
```py
class QNetworkDueling(QNetwork):
```

We define the `__init__` method to initialize all the layers:

```py
    def __init__(self, input_size, output_size, name):
        self.name = name
        self.input_size = input_size
        self.output_size = output_size

        with tf.variable_scope(self.name):

            # Three convolutional layers
            self.W_conv1 = self.weight_variable([8, 8, 4, 32])
            self.B_conv1 = self.bias_variable([32])
            self.stride1 = 4

            self.W_conv2 = self.weight_variable([4, 4, 32, 64])
            self.B_conv2 = self.bias_variable([64])
            self.stride2 = 2

            self.W_conv3 = self.weight_variable([3, 3, 64, 64])
            self.B_conv3 = self.bias_variable([64])
            self.stride3 = 1

            # Two fully connected layers
            self.W_fc4a = self.weight_variable([7*7*64, 512])
            self.B_fc4a = self.bias_variable([512])

            self.W_fc4b = self.weight_variable([7*7*64, 512])
            self.B_fc4b = self.bias_variable([512])

            # Value stream
            self.W_fc5a = self.weight_variable([512, 1])
            self.B_fc5a = self.bias_variable([1])

            # Advantage stream
            self.W_fc5b = self.weight_variable([512, self.output_size])
            self.B_fc5b = self.bias_variable([self.output_size])
```

We define the `__call__` method and perform the convolution operations:

```py
    def __call__(self, input_tensor):
        if type(input_tensor) == list:
            input_tensor = tf.concat(1, input_tensor)

        with tf.variable_scope(self.name):

            # Perform convolution on the three layers
            self.h_conv1 = tf.nn.relu( tf.nn.conv2d(input_tensor, self.W_conv1, strides=[1, self.stride1, self.stride1, 1], padding='VALID') + self.B_conv1 )

            self.h_conv2 = tf.nn.relu( tf.nn.conv2d(self.h_conv1, self.W_conv2, strides=[1, self.stride2, self.stride2, 1], padding='VALID') + self.B_conv2 )

            self.h_conv3 = tf.nn.relu( tf.nn.conv2d(self.h_conv2, self.W_conv3, strides=[1, self.stride3, self.stride3, 1], padding='VALID') + self.B_conv3 )

            # Flatten the convolutional output
            self.h_conv3_flat = tf.reshape(self.h_conv3, [-1, 7*7*64])

            # Fully connected layers
            self.h_fc4a = tf.nn.relu(tf.matmul(self.h_conv3_flat, self.W_fc4a) + self.B_fc4a)
            self.h_fc4b = tf.nn.relu(tf.matmul(self.h_conv3_flat, self.W_fc4b) + self.B_fc4b)

            # Compute the value stream and the advantage stream
            self.h_fc5a_value = tf.identity(tf.matmul(self.h_fc4a, self.W_fc5a) + self.B_fc5a)
            self.h_fc5b_advantage = tf.identity(tf.matmul(self.h_fc4b, self.W_fc5b) + self.B_fc5b)

            # Combine the value and advantage streams
            self.h_fc6 = self.h_fc5a_value + ( self.h_fc5b_advantage - tf.reduce_mean(self.h_fc5b_advantage, reduction_indices=[1,], keep_dims=True) )

        return self.h_fc6
```

# Replay memory

Now, we build the experience replay buffer, which is used for storing all of the agent's experience. We sample mini-batches of experience from the replay buffer to train the network:

```py
class ReplayMemoryFast:
```

First, we define the `__init__` method and initialize the buffer size:

```py
    def __init__(self, memory_size, minibatch_size):

        # max number of samples to store
        self.memory_size = memory_size

        # minibatch size
        self.minibatch_size = minibatch_size

        self.experience = [None]*self.memory_size
        self.current_index = 0
        self.size = 0
```

Next, we define the `store` function for storing the experience:

```py
    def store(self, observation, action, reward, newobservation, is_terminal):
```

Store the experience as a tuple (current state, `action`, `reward`, next state, is it a terminal state):

```py
        self.experience[self.current_index] = (observation, action, reward, newobservation, is_terminal)
        self.current_index += 1
        self.size = min(self.size+1, self.memory_size)
```

If the index is greater than the memory size, then we flush the index by subtracting the memory size:

```py
        if self.current_index >= self.memory_size:
            self.current_index -= self.memory_size
```

Next, we define a `sample` function, which is used for sampling a mini-batch of experience:

```py
    def sample(self):
        if self.size < self.minibatch_size:
            return []

        # First we randomly sample some indices
        samples_index = np.floor(np.random.random((self.minibatch_size,))*self.size)

        # select the experiences at the sampled indices
        samples = [self.experience[int(i)] for i in samples_index]

        return samples
```
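As a quick sanity check of how this buffer behaves, here is a hypothetical usage sketch (not part of the chapter's code): we store a few dummy transitions and draw a mini-batch once enough samples are available.

```py
import numpy as np

# Assumes ReplayMemoryFast is defined as above
memory = ReplayMemoryFast(memory_size=1000, minibatch_size=4)

for t in range(10):
    state = np.zeros((84, 84, 4), dtype=np.uint8)       # dummy stacked frames
    next_state = np.zeros((84, 84, 4), dtype=np.uint8)
    memory.store(state, 0, 1.0, next_state, False)       # (s, a, r, s', done)

batch = memory.sample()   # list of (s, a, r, s', done) tuples
print(len(batch))         # 4, once at least minibatch_size samples are stored
```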
# Training the network

Now, we will see how to train the network. First, we define the `DQN` class and initialize all the variables in the `__init__` method:

```py
class DQN(object):
    def __init__(self, state_size,
                       action_size,
                       session,
                       summary_writer = None,
                       exploration_period = 1000,
                       minibatch_size = 32,
                       discount_factor = 0.99,
                       experience_replay_buffer = 10000,
                       target_qnet_update_frequency = 10000,
                       initial_exploration_epsilon = 1.0,
                       final_exploration_epsilon = 0.05,
                       reward_clipping = -1,
                        ):
```

Initialize all the variables:

```py
        self.state_size = state_size
        self.action_size = action_size

        self.session = session
        self.exploration_period = float(exploration_period)
        self.minibatch_size = minibatch_size
        self.discount_factor = tf.constant(discount_factor)
        self.experience_replay_buffer = experience_replay_buffer
        self.summary_writer = summary_writer
        self.reward_clipping = reward_clipping

        self.target_qnet_update_frequency = target_qnet_update_frequency
        self.initial_exploration_epsilon = initial_exploration_epsilon
        self.final_exploration_epsilon = final_exploration_epsilon
        self.num_training_steps = 0
```

Initialize the primary dueling DQN by creating an instance of our `QNetworkDueling` class:

```py
        self.qnet = QNetworkDueling(self.state_size, self.action_size, "qnet")
```

Similarly, initialize the target dueling DQN:

```py
        self.target_qnet = QNetworkDueling(self.state_size, self.action_size, "target_qnet")
```

Next, initialize the optimizer as an `RMSPropOptimizer`:

```py
        self.qnet_optimizer = tf.train.RMSPropOptimizer(learning_rate=0.00025, decay=0.99, epsilon=0.01)
```

Now, initialize the `experience_replay_buffer` by creating an instance of our `ReplayMemoryFast` class:

```py
        self.experience_replay = ReplayMemoryFast(self.experience_replay_buffer, self.minibatch_size)

        # Setup the computational graph
        self.create_graph()
```

Next, we define the `copy_to_target_network` function, which is used for copying the weights from the primary network to the target network:

```py
    @staticmethod
    def copy_to_target_network(source_network, target_network):
        target_network_update = []

        for v_source, v_target in zip(source_network.variables(), target_network.variables()):

            # update the target network
            update_op = v_target.assign(v_source)
            target_network_update.append(update_op)

        return tf.group(*target_network_update)
```

Now, we define the `create_graph` function and build our computational graph:

```py
    def create_graph(self):
```

We compute the `q_values` and select the action that has the maximum `q` value:

```py
        with tf.name_scope("pick_action"):

            # placeholder for the state
            self.state = tf.placeholder(tf.float32, (None,)+self.state_size , name="state")

            # placeholder for the q values
            self.q_values = tf.identity(self.qnet(self.state) , name="q_values")

            # placeholder for the predicted actions
            self.predicted_actions = tf.argmax(self.q_values, dimension=1 , name="predicted_actions")

            # plot a histogram to track the max q values
            tf.histogram_summary("Q values", tf.reduce_mean(tf.reduce_max(self.q_values, 1)))
            # save max q-values to track learning
```

Next, we compute the target future rewards:

```py
        with tf.name_scope("estimating_future_rewards"):
            self.next_state = tf.placeholder(tf.float32, (None,)+self.state_size , name="next_state")
            self.next_state_mask = tf.placeholder(tf.float32, (None,) , name="next_state_mask")
            self.rewards = tf.placeholder(tf.float32, (None,) , name="rewards")

            self.next_q_values_targetqnet = tf.stop_gradient(self.target_qnet(self.next_state), name="next_q_values_targetqnet")

            self.next_q_values_qnet = tf.stop_gradient(self.qnet(self.next_state), name="next_q_values_qnet")

            self.next_selected_actions = tf.argmax(self.next_q_values_qnet, dimension=1)
            self.next_selected_actions_onehot = tf.one_hot(indices=self.next_selected_actions, depth=self.action_size)

            self.next_max_q_values = tf.stop_gradient( tf.reduce_sum( tf.mul( self.next_q_values_targetqnet, self.next_selected_actions_onehot ) , reduction_indices=[1,] ) * self.next_state_mask )

            self.target_q_values = self.rewards + self.discount_factor*self.next_max_q_values
```
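Note that this target follows the double DQN idea: the online network picks the greedy next action, while the target network supplies its value. The following NumPy sketch (illustrative only; the toy values and names are hypothetical and not part of the graph above) mirrors the same computation for a mini-batch of two transitions:

```py
import numpy as np

gamma = 0.99
rewards = np.array([1.0, 0.0])
next_state_mask = np.array([1.0, 0.0])          # 0 masks out terminal next states

next_q_online = np.array([[1.0, 3.0, 2.0],      # used only to pick the next action
                          [0.5, 0.2, 0.1]])
next_q_target = np.array([[0.9, 2.5, 2.2],      # used to evaluate that action
                          [0.4, 0.3, 0.2]])

selected = next_q_online.argmax(axis=1)                        # [1, 0]
next_max_q = next_q_target[np.arange(2), selected] * next_state_mask
target_q = rewards + gamma * next_max_q
print(target_q)                                                # [3.475, 0.0]
```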
Next, we perform the optimization using the RMSProp optimizer:

```py
        with tf.name_scope("optimization_step"):
            self.action_mask = tf.placeholder(tf.float32, (None, self.action_size) , name="action_mask")

            self.y = tf.reduce_sum( self.q_values * self.action_mask , reduction_indices=[1,])

            ## ERROR CLIPPING
            self.error = tf.abs(self.y - self.target_q_values)

            quadratic_part = tf.clip_by_value(self.error, 0.0, 1.0)
            linear_part = self.error - quadratic_part

            self.loss = tf.reduce_mean( 0.5*tf.square(quadratic_part) + linear_part )

            # optimize the gradients
            qnet_gradients = self.qnet_optimizer.compute_gradients(self.loss, self.qnet.variables())

            for i, (grad, var) in enumerate(qnet_gradients):
                if grad is not None:
                    qnet_gradients[i] = (tf.clip_by_norm(grad, 10), var)

            self.qnet_optimize = self.qnet_optimizer.apply_gradients(qnet_gradients)
```

Copy the primary network weights to the target network:

```py
        with tf.name_scope("target_network_update"):
            self.hard_copy_to_target = DQN.copy_to_target_network(self.qnet, self.target_qnet)
```

We define the `store` function, which is used for storing all the experience in the `experience_replay_buffer`:

```py
    def store(self, state, action, reward, next_state, is_terminal):

        # reward clipping
        if self.reward_clipping > 0.0:
            reward = np.clip(reward, -self.reward_clipping, self.reward_clipping)

        self.experience_replay.store(state, action, reward, next_state, is_terminal)
```

We define an `action` function for selecting actions using a decaying epsilon-greedy policy:

```py
    def action(self, state, training = False):

        if self.num_training_steps > self.exploration_period:
            epsilon = self.final_exploration_epsilon
        else:
            epsilon = self.initial_exploration_epsilon - float(self.num_training_steps) * (self.initial_exploration_epsilon - self.final_exploration_epsilon) / self.exploration_period

        if not training:
            epsilon = 0.05

        if random.random() <= epsilon:
            action = random.randint(0, self.action_size-1)
        else:
            action = self.session.run(self.predicted_actions, {self.state:[state] } )[0]

        return action
```
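The schedule above anneals epsilon linearly from the initial value to the final value over the exploration period and then holds it constant. As a standalone illustration (a hypothetical helper, not part of the `DQN` class), the same schedule can be written as:

```py
def linear_epsilon(step, exploration_period=1000000,
                   initial_epsilon=1.0, final_epsilon=0.1):
    """Linearly anneal epsilon over exploration_period steps, then hold it."""
    if step > exploration_period:
        return final_epsilon
    return initial_epsilon - step * (initial_epsilon - final_epsilon) / exploration_period

print(linear_epsilon(0))         # 1.0
print(linear_epsilon(500000))    # 0.55
print(linear_epsilon(2000000))   # 0.1
```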
Now, we define a `train` function to train our network:

```py
    def train(self):
```

Copy the primary network weights to the target network:

```py
        if self.num_training_steps == 0:
            print "Training starts..."
            self.qnet.copy_to(self.target_qnet)
```

Sample experience from the replay memory:

```py
        minibatch = self.experience_replay.sample()
```

Get the states, actions, rewards, and next states from the `minibatch`:

```py
        batch_states = np.asarray( [d[0] for d in minibatch] )

        actions = [d[1] for d in minibatch]
        batch_actions = np.zeros( (self.minibatch_size, self.action_size) )
        for i in xrange(self.minibatch_size):
            batch_actions[i, actions[i]] = 1

        batch_rewards = np.asarray( [d[2] for d in minibatch] )
        batch_newstates = np.asarray( [d[3] for d in minibatch] )

        batch_newstates_mask = np.asarray( [not d[4] for d in minibatch] )
```

Perform the training operation:

```py
        scores, _, = self.session.run([self.q_values, self.qnet_optimize],
                                      { self.state: batch_states,
                                        self.next_state: batch_newstates,
                                        self.next_state_mask: batch_newstates_mask,
                                        self.rewards: batch_rewards,
                                        self.action_mask: batch_actions} )
```

Update the target network weights:

```py
        if self.num_training_steps % self.target_qnet_update_frequency == 0:
            self.session.run( self.hard_copy_to_target )

            print 'mean maxQ in minibatch: ',np.mean(np.max(scores,1))

            str_ = self.session.run(self.summarize, { self.state: batch_states,
                                                      self.next_state: batch_newstates,
                                                      self.next_state_mask: batch_newstates_mask,
                                                      self.rewards: batch_rewards,
                                                      self.action_mask: batch_actions})

            self.summary_writer.add_summary(str_, self.num_training_steps)

        self.num_training_steps += 1
```

# Car racing

So far, we have seen how to build a dueling DQN. Now, we will see how to make use of our dueling DQN when playing the car racing game.

First, let's import our necessary libraries:

```py
import gym
import time
import logging
import os
import sys
import tensorflow as tf
```

Initialize all of the necessary variables:

```py
ENV_NAME = 'Seaquest-v0'
TOTAL_FRAMES = 20000000
MAX_TRAINING_STEPS = 20 * 60 * 60 / 3
TESTING_GAMES = 30
MAX_TESTING_STEPS = 5 * 60 * 60 / 3
TRAIN_AFTER_FRAMES = 50000
epoch_size = 50000
MAX_NOOP_START = 30
LOG_DIR = 'logs'
outdir = 'results'
logger = tf.train.SummaryWriter(LOG_DIR)

# Intialize tensorflow session
session = tf.InteractiveSession()
```

Build the agent (`env` here refers to the wrapped environment created with the `EnvWrapper` class defined at the start of the chapter):

```py
agent = DQN(state_size=env.observation_space.shape,
            action_size=env.action_space.n,
            session=session,
            summary_writer = logger,
            exploration_period = 1000000,
            minibatch_size = 32,
            discount_factor = 0.99,
            experience_replay_buffer = 1000000,
            target_qnet_update_frequency = 20000,
            initial_exploration_epsilon = 1.0,
            final_exploration_epsilon = 0.1,
            reward_clipping = 1.0,
           )

session.run(tf.initialize_all_variables())
logger.add_graph(session.graph)
saver = tf.train.Saver(tf.all_variables())
```

Store the recordings:

```py
env.monitor.start(outdir+'/'+ENV_NAME,force = True, video_callable=multiples_video_schedule)

num_frames = 0
num_games = 0
current_game_frames = 0
init_no_ops = np.random.randint(MAX_NOOP_START+1)
last_time = time.time()
last_frame_count = 0.0
state = env.reset()
```

Now, let's begin training:

```py
while num_frames <= TOTAL_FRAMES+1:
    if test_mode:
        env.render()

    num_frames += 1
    current_game_frames += 1
```

Given the current state, select an action:

```py
    action = agent.action(state, training = True)
```

Perform the action in the environment, receive the `reward`, and move to the `next_state`:

```py
    next_state,reward,done,_ = env.step(action)
```

Store this transition information in the `experience_replay_buffer`:

```py
    if current_game_frames >= init_no_ops:
        agent.store(state,action,reward,next_state,done)

    state = next_state
```

Train the agent:

```py
    if num_frames >= TRAIN_AFTER_FRAMES:
        agent.train()

    if done or current_game_frames > MAX_TRAINING_STEPS:
        state = env.reset()
        current_game_frames = 0
        num_games += 1
        init_no_ops = np.random.randint(MAX_NOOP_START+1)
```

Save the network parameters after every epoch:

```py
    if num_frames % epoch_size == 0 and num_frames > TRAIN_AFTER_FRAMES:
        saver.save(session, outdir+"/"+ENV_NAME+"/model_"+str(num_frames/1000)+"k.ckpt")
        print "epoch: frames=",num_frames," games=",num_games
```

We test the performance of the agent every two epochs:

```py
    if num_frames % (2*epoch_size) == 0 and num_frames > TRAIN_AFTER_FRAMES:
        total_reward = 0
        avg_steps = 0

        for i in xrange(TESTING_GAMES):
            state = env.reset()
            init_no_ops = np.random.randint(MAX_NOOP_START+1)
            frm = 0

            while frm < MAX_TESTING_STEPS:
                frm += 1
                env.render()
                action = agent.action(state, training = False)

                if current_game_frames < init_no_ops:
                    action = 0

                state,reward,done,_ = env.step(action)
                total_reward += reward

                if done:
                    break

            avg_steps += frm

        avg_reward = float(total_reward)/TESTING_GAMES

        str_ = session.run( tf.scalar_summary('test reward ('+str(epoch_size/1000)+'k)', avg_reward) )
        logger.add_summary(str_, num_frames)

        state = env.reset()

env.monitor.close()
```

We can see how the agent learns to win the car racing game, as shown in the following screenshot:

![](img/00407.gif)

# Summary

In this chapter, we learned how to implement a dueling DQN in detail. We started with the basic environment wrapper functions for preprocessing our game screens and then defined the `QNetworkDueling` class. Here, we implemented the dueling Q network, which splits the final fully connected layer of the DQN into a value stream and an advantage stream and then combines these two streams to compute the `q` value. Following this, we saw how to create the replay buffer, which is used to store the experience and sample mini-batches of experience for training the network, and finally, we initialized our car racing environment using OpenAI's Gym and trained our agent. In the next chapter, Chapter 13, *Recent Advancements and Next Steps*, we will look at some of the recent advancements in RL.

# Questions

The question list is as follows:

1. What is the difference between a DQN and a dueling DQN?
2. Write Python code for a replay buffer.
3. What is a target network?
4. Write Python code for a prioritized experience replay buffer.
5. Create a Python function to decay an epsilon-greedy policy.
6. How does a dueling DQN differ from a double DQN?
7. Create a Python function for updating the primary network weights to the target network.

# Further reading

The following links will help you expand your knowledge:

* [**Flappy Bird using DQN**](https://github.com/yenchenlin/DeepLearningFlappyBird)
* [**Super Mario using DQN**](https://github.com/JSDanielPark/tensorflow_dqn_supermario)