Unverified commit 91beee07 authored by: H Hongsheng Zeng, committed by: GitHub

Alphazero baseline for the Kaggle ConnectX competition (#284)

* version 0.1

* fix bug

* Update README.md

* Update README.md

* fix yapf
Co-authored-by: TomorrowIsAnOtherDay <2466956298@qq.com>
Parent 1dc5f30f
# Third party code
#
# The following code is copied or modified from:
# https://github.com/suragnair/alpha-zero-general
from tqdm import tqdm
from parl.utils import logger
class Arena():
"""
    An Arena class where any 2 agents can be pitted against each other.
"""
def __init__(self, player1, player2, game, display=None):
"""
Input:
            player1, player2: two functions that take a board as input and return an action
game: Game object
display: a function that takes board as input and prints it (e.g.
                     display in othello/OthelloGame). It is required for verbose
mode.
        See othello/OthelloPlayers.py in the original alpha-zero-general repo for
        an example, and see pit.py there for pitting human players/other baselines
        against each other.
"""
self.player1 = player1
self.player2 = player2
self.game = game
self.display = display
def playGame(self, verbose=False):
"""
Executes one episode of a game.
Returns:
either
winner: player who won the game (1 if player1, -1 if player2)
or
                a draw result returned from the game (a value that is neither 1, -1, nor 0).
"""
players = [self.player2, None, self.player1]
curPlayer = 1
board = self.game.getInitBoard()
it = 0
while self.game.getGameEnded(board, curPlayer) == 0:
it += 1
if verbose:
assert self.display
print("Turn ", str(it), "Player ", str(curPlayer))
self.display(board)
action = players[curPlayer + 1](self.game.getCanonicalForm(
board, curPlayer))
valids = self.game.getValidMoves(
self.game.getCanonicalForm(board, curPlayer), 1)
if valids[action] == 0:
logger.error('Action {} is not valid!'.format(action))
logger.debug('valids = {}'.format(valids))
assert valids[action] > 0
board, curPlayer = self.game.getNextState(board, curPlayer, action)
if verbose:
assert self.display
print("Game over: Turn ", str(it), "Result ",
str(self.game.getGameEnded(board, 1)))
self.display(board)
return curPlayer * self.game.getGameEnded(board, curPlayer)
def playGames(self, num, verbose=False):
"""
Plays num games in which player1 starts num/2 games and player2 starts
num/2 games.
Returns:
oneWon: games won by player1
twoWon: games won by player2
draws: games won by nobody
"""
num = int(num / 2)
oneWon = 0
twoWon = 0
draws = 0
for _ in tqdm(range(num), desc="Arena.playGames (1)"):
gameResult = self.playGame(verbose=verbose)
if gameResult == 1:
oneWon += 1
elif gameResult == -1:
twoWon += 1
else:
draws += 1
self.player1, self.player2 = self.player2, self.player1
for _ in tqdm(range(num), desc="Arena.playGames (2)"):
gameResult = self.playGame(verbose=verbose)
if gameResult == -1:
oneWon += 1
elif gameResult == 1:
twoWon += 1
else:
draws += 1
return oneWon, twoWon, draws
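# Usage sketch (illustrative addition, not part of the upstream file): pitting two
# random-move players against each other with Arena. `Connect4Game` and the random
# player below are assumptions made for this example only.
if __name__ == "__main__":
    import numpy as np
    from connect4_game import Connect4Game

    def make_random_player(game):
        # return a callable that picks a uniformly random valid column
        def play(canonical_board):
            valids = game.getValidMoves(canonical_board, 1)
            return int(np.random.choice(np.flatnonzero(valids)))
        return play

    demo_game = Connect4Game()
    demo_arena = Arena(
        make_random_player(demo_game),
        make_random_player(demo_game),
        demo_game,
        display=Connect4Game.display)
    print(demo_arena.playGames(10))  # (player1 wins, player2 wins, draws)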
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import threading
import queue
import pickle
from pickle import Pickler, Unpickler
from random import shuffle
from parl.utils import tensorboard
import numpy as np
from tqdm import tqdm
import parl
from parl.utils import logger
from actor import Actor
from utils import split_group, get_test_dataset
from alphazero_agent import create_agent
class Coach():
"""
    This class executes self-play, learning and evaluation.
"""
def __init__(self, game, args):
self.game = game
self.args = args
# neural network of current generation
self.current_agent = create_agent(self.game)
# neural network of previous generation
self.previous_agent = create_agent(self.game)
# history of examples from args.numItersForTrainExamplesHistory latest iterations
self.trainExamplesHistory = []
self.remote_actors_signal_queues = []
self.remote_actors_return_queue = queue.Queue()
self.test_dataset = get_test_dataset()
def _run_remote_tasks(self, signal_queue):
        # The remote actor will actually run on the local machine or on other machines of the xparl cluster
remote_actor = Actor(self.game, self.args)
while True:
# receive running task signal
# signal: specify task type and task input data (optional)
signal = signal_queue.get()
if signal["task"] == "self-play":
episode_num_each_actor = self.args.numEps // self.args.actors_num
result = remote_actor.self_play(
self.current_agent.get_weights(), episode_num_each_actor)
self.remote_actors_return_queue.put({"self-play": result})
elif signal["task"] == "pitting":
games_num_each_actor = self.args.arenaCompare // self.args.actors_num
result = remote_actor.pitting(
self.previous_agent.get_weights(),
self.current_agent.get_weights(), games_num_each_actor)
self.remote_actors_return_queue.put({"pitting": result})
elif signal["task"] == "evaluate_test_dataset":
test_dataset = signal["test_dataset"]
result = remote_actor.evaluate_test_dataset(
self.current_agent.get_weights(), test_dataset)
self.remote_actors_return_queue.put({
"evaluate_test_dataset":
result
})
else:
raise NotImplementedError
def _create_remote_actors(self):
# connect to xparl cluster to submit jobs
parl.connect(self.args.master_address)
for i in range(self.args.actors_num):
signal_queue = queue.Queue()
self.remote_actors_signal_queues.append(signal_queue)
remote_thread = threading.Thread(
target=self._run_remote_tasks, args=(signal_queue, ))
remote_thread.setDaemon(True)
remote_thread.start()
def learn(self):
"""Each iteration:
1. Performs numEps episodes of self-play.
2. Retrains neural network with examples in trainExamplesHistory
(which has a maximum length of numItersForTrainExamplesHistory).
3. Evaluates the new neural network with the test dataset.
4. Pits the new neural network against the old one and accepts it
only if it wins >= updateThreshold fraction of games.
"""
# create remote actors to run tasks (self-play/pitting/evaluate_test_dataset) in parallel.
self._create_remote_actors()
for iteration in range(1, self.args.numIters + 1):
logger.info('Starting Iter #{} ...'.format(iteration))
####################
logger.info('Step1: self-play in parallel...')
iterationTrainExamples = []
# update weights of remote actors to the latest weights, and ask them to run self-play task
for signal_queue in self.remote_actors_signal_queues:
signal_queue.put({"task": "self-play"})
# wait for all remote actors (a total of self.args.actors_num) to return the self-play results
for _ in range(self.args.actors_num):
result = self.remote_actors_return_queue.get()
iterationTrainExamples.extend(result["self-play"])
# save the iteration examples to the history
self.trainExamplesHistory.append(iterationTrainExamples)
if len(self.trainExamplesHistory
) > self.args.numItersForTrainExamplesHistory:
logger.warning("Removing the oldest entry in trainExamples.")
self.trainExamplesHistory.pop(0)
self.saveTrainExamples(iteration) # backup history to a file
####################
logger.info('Step2: train neural network...')
# shuffle examples before training
trainExamples = []
for e in self.trainExamplesHistory:
trainExamples.extend(e)
shuffle(trainExamples)
# training new network, keeping a copy of the old one
self.current_agent.save(
os.path.join(self.args.checkpoint, 'temp.pth.tar'))
self.previous_agent.restore(
os.path.join(self.args.checkpoint, 'temp.pth.tar'))
self.current_agent.learn(trainExamples)
####################
logger.info('Step3: evaluate test dataset in parallel...')
cnt = 0
# update weights of remote actors to the latest weights, and ask them to evaluate assigned test dataset
for i, data in enumerate(
split_group(
self.test_dataset,
len(self.test_dataset) // self.args.actors_num)):
self.remote_actors_signal_queues[i].put({
"task":
"evaluate_test_dataset",
"test_dataset":
data
})
cnt += len(data)
perfect_moves_cnt, good_moves_cnt = 0, 0
# wait for all remote actors (a total of self.args.actors_num) to return the evaluating results
for _ in range(self.args.actors_num):
(perfect_moves,
good_moves) = self.remote_actors_return_queue.get(
)["evaluate_test_dataset"]
perfect_moves_cnt += perfect_moves
good_moves_cnt += good_moves
logger.info('perfect moves rate: {}, good moves rate: {}'.format(
perfect_moves_cnt / cnt, good_moves_cnt / cnt))
tensorboard.add_scalar('perfect_moves_rate',
perfect_moves_cnt / cnt, iteration)
tensorboard.add_scalar('good_moves_rate', good_moves_cnt / cnt,
iteration)
####################
logger.info(
'Step4: pitting against previous generation in parallel...')
# transfer weights of previous generation and current generation to the remote actors, and ask them to pit.
for signal_queue in self.remote_actors_signal_queues:
signal_queue.put({"task": "pitting"})
previous_wins, current_wins, draws = 0, 0, 0
for _ in range(self.args.actors_num):
(pwins_, cwins_,
draws_) = self.remote_actors_return_queue.get()["pitting"]
previous_wins += pwins_
current_wins += cwins_
draws += draws_
logger.info('NEW/PREV WINS : %d / %d ; DRAWS : %d' %
(current_wins, previous_wins, draws))
if previous_wins + current_wins == 0 or float(current_wins) / (
previous_wins + current_wins) < self.args.updateThreshold:
logger.info('REJECTING NEW MODEL')
self.current_agent.restore(
os.path.join(self.args.checkpoint, 'temp.pth.tar'))
else:
logger.info('ACCEPTING NEW MODEL')
self.current_agent.save(
os.path.join(self.args.checkpoint, 'best.pth.tar'))
self.current_agent.save(
os.path.join(self.args.checkpoint,
self.getCheckpointFile(iteration)))
def getCheckpointFile(self, iteration):
return 'checkpoint_' + str(iteration) + '.pth.tar'
def saveTrainExamples(self, iteration):
folder = self.args.checkpoint
if not os.path.exists(folder):
os.makedirs(folder)
filename = os.path.join(
folder,
self.getCheckpointFile(iteration) + ".examples")
with open(filename, "wb+") as f:
Pickler(f).dump(self.trainExamplesHistory)
f.closed
def loadModel(self):
self.current_agent.restore(
os.path.join(self.args.load_folder_file[0],
self.args.load_folder_file[1]))
def loadTrainExamples(self):
modelFile = os.path.join(self.args.load_folder_file[0],
self.args.load_folder_file[1])
examplesFile = modelFile + ".examples"
if not os.path.isfile(examplesFile):
logger.warning(
"File {} with trainExamples not found!".format(examplesFile))
r = input("Continue? [y|n]")
if r != "y":
sys.exit()
else:
logger.info("File with trainExamples found. Loading it...")
with open(examplesFile, "rb") as f:
self.trainExamplesHistory = Unpickler(f).load()
logger.info('Loading done!')
# Third party code
#
# The following code is copied or modified from:
# https://github.com/suragnair/alpha-zero-general
import math
import time
import numpy as np
EPS = 1e-8
class MCTS():
"""
This class handles the MCTS tree.
"""
def __init__(self, game, nn_agent, args, dirichlet_noise=False):
self.game = game
self.nn_agent = nn_agent
self.args = args
self.dirichlet_noise = dirichlet_noise
self.Qsa = {} # stores Q values for s,a (as defined in the paper)
self.Nsa = {} # stores #times edge s,a was visited
self.Ns = {} # stores #times board s was visited
self.Ps = {} # stores initial policy (returned by neural net)
self.Es = {} # stores game.getGameEnded ended for board s
self.Vs = {} # stores game.getValidMoves for board s
def getActionProb(self, canonicalBoard, temp=1):
"""
This function performs numMCTSSims simulations of MCTS starting from
canonicalBoard.
Returns:
probs: a policy vector where the probability of the ith action is
proportional to Nsa[(s,a)]**(1./temp)
"""
for i in range(self.args.numMCTSSims):
dir_noise = (i == 0 and self.dirichlet_noise)
self.search(canonicalBoard, dirichlet_noise=dir_noise)
s = self.game.stringRepresentation(canonicalBoard)
counts = [
self.Nsa[(s, a)] if (s, a) in self.Nsa else 0
for a in range(self.game.getActionSize())
]
if temp == 0:
bestAs = np.array(np.argwhere(counts == np.max(counts))).flatten()
bestA = np.random.choice(bestAs)
probs = [0] * len(counts)
probs[bestA] = 1
return probs
counts = [x**(1. / temp) for x in counts]
counts_sum = float(sum(counts))
probs = [x / counts_sum for x in counts]
return probs
def search(self, canonicalBoard, dirichlet_noise=False):
"""
This function performs one iteration of MCTS. It is recursively called
till a leaf node is found. The action chosen at each node is one that
has the maximum upper confidence bound as in the paper.
Once a leaf node is found, the neural network is called to return an
initial policy P and a value v for the state. This value is propagated
up the search path. In case the leaf node is a terminal state, the
outcome is propagated up the search path. The values of Ns, Nsa, Qsa are
updated.
NOTE: the return values are the negative of the value of the current
state. This is done since v is in [-1,1] and if v is the value of a
state for the current player, then its value is -v for the other player.
Returns:
v: the negative of the value of the current canonicalBoard
"""
s = self.game.stringRepresentation(canonicalBoard)
if s not in self.Es:
self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
if self.Es[s] != 0:
# terminal node
return -self.Es[s]
if s not in self.Ps:
# leaf node
self.Ps[s], v = self.nn_agent.predict(canonicalBoard)
valids = self.game.getValidMoves(canonicalBoard, 1)
self.Ps[s] = self.Ps[s] * valids # masking invalid moves
if dirichlet_noise:
self.applyDirNoise(s, valids)
sum_Ps_s = np.sum(self.Ps[s])
if sum_Ps_s > 0:
self.Ps[s] /= sum_Ps_s # renormalize
else:
# if all valid moves were masked make all valid moves equally probable
                # NB! All valid moves may be masked if either your NNet architecture is insufficient or you've got overfitting or something else.
# If you have got dozens or hundreds of these messages you should pay attention to your NNet and/or training process.
print("All valid moves were masked, doing a workaround.")
self.Ps[s] = self.Ps[s] + valids
self.Ps[s] /= np.sum(self.Ps[s])
self.Vs[s] = valids
self.Ns[s] = 0
return -v
valids = self.Vs[s]
if dirichlet_noise:
self.applyDirNoise(s, valids)
sum_Ps_s = np.sum(self.Ps[s])
self.Ps[s] /= sum_Ps_s # renormalize
cur_best = -float('inf')
best_act = -1
# pick the action with the highest upper confidence bound
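        # The upper confidence bound below is the PUCT rule used by AlphaZero:
        #   U(s, a) = Q(s, a) + cpuct * P(s, a) * sqrt(N(s)) / (1 + N(s, a))
        # where P is the network prior (Ps), N(s) is the state visit count (Ns)
        # and N(s, a) is the edge visit count (Nsa).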
for a in range(self.game.getActionSize()):
if valids[a]:
if (s, a) in self.Qsa:
u = self.Qsa[
(s, a)] + self.args.cpuct * self.Ps[s][a] * math.sqrt(
self.Ns[s]) / (1 + self.Nsa[(s, a)])
else:
u = self.args.cpuct * self.Ps[s][a] * math.sqrt(
self.Ns[s] + EPS) # Q = 0 ?
if u > cur_best:
cur_best = u
best_act = a
a = best_act
next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
next_s = self.game.getCanonicalForm(next_s, next_player)
v = self.search(next_s)
if (s, a) in self.Qsa:
self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[
(s, a)] + v) / (self.Nsa[(s, a)] + 1)
self.Nsa[(s, a)] += 1
else:
self.Qsa[(s, a)] = v
self.Nsa[(s, a)] = 1
self.Ns[s] += 1
return -v
def applyDirNoise(self, s, valids):
dir_values = np.random.dirichlet(
[self.args.dirichletAlpha] * np.count_nonzero(valids))
dir_idx = 0
for idx in range(len(self.Ps[s])):
if self.Ps[s][idx]:
self.Ps[s][idx] = (0.75 * self.Ps[s][idx]) + (
0.25 * dir_values[dir_idx])
dir_idx += 1
## AlphaZero baseline for Connect4 game (distributed version)
- In this example, we provide a fine-tuned AlphaZero baseline to solve the Connect4 game, based on the code of the [alpha-zero-general](https://github.com/suragnair/alpha-zero-general) repo.
- We leverage the parallelism capability of [PARL](https://github.com/PaddlePaddle/PARL) to run self-play and evaluation tasks in parallel.
- We also provide a script to pack your trained model into a submission file that can be submitted directly to the Kaggle [Connect X](https://www.kaggle.com/c/connectx/leaderboard) competition.
### Dependencies
- python3
- [parl==1.3](https://github.com/PaddlePaddle/PARL)
- torch
- tqdm
### Training
1. Download the [1k connect4 validation set](https://www.kaggle.com/petercnudde/1k-connect4-validation-set) to the current directory. (filename: `refmoves1k_kaggle`)
2. Start the xparl cluster
```bash
# You can change `cpu_num` below and `args.actors_num` in main.py
# based on the number of CPUs on your machine.
xparl start --port 8010 --cpu_num 25
```
```bash
# [OPTIONAL] You can also run the following command on other machines to add more CPU resources
# to the xparl cluster, so that you can increase the parallelism (args.actors_num).
xparl connect --address MASTER_IP:8010 --cpu_num [CPU_NUM]
```
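You can optionally check the cluster status before training (assuming your installed PARL version provides the `xparl status` command):
```bash
# [OPTIONAL] Show the status of the xparl cluster and the number of available CPUs.
xparl status
```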
3. Run the training script
```bash
python main.py
```
4. Visualize the training curves (good moves rate and perfect moves rate)
```bash
tensorboard --logdir .
```
### Submitting
To submit the trained model to Kaggle, you can use the provided script to generate `submission.py`, for example:
```bash
python gen_submission.py saved_model/best.pth.tar
```
### Performance
- The following are the `good moves rate` and `perfect moves rate` curves in tensorboard; please refer to this [link](https://www.kaggle.com/petercnudde/scoring-connect-x-agents) for their specific meaning.
<img src=".pic/good_moves.png" width = "300" alt="good moves rate"/> <img src=".pic/perfect_moves.png" width = "300" alt="perfect moves rate"/>
> It takes about 1 day to run 25 iterations on a machine with 25 CPUs.
- It reaches a score of about 1368 (rank 5 as of 2020/06/04) in the Kaggle [Connect X](https://www.kaggle.com/c/connectx/leaderboard) competition.
### Reference
- [suragnair/alpha-zero-general](https://github.com/suragnair/alpha-zero-general)
- [Scoring connect-x agents](https://www.kaggle.com/petercnudde/scoring-connect-x-agents)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import parl
import os
from alphazero_agent import create_agent
from MCTS import MCTS
from Arena import Arena
from utils import win_loss_draw
@parl.remote_class
class Actor(object):
def __init__(self, game, args):
os.environ['OMP_NUM_THREADS'] = "1"
self.game = game
self.args = args
# neural network of previous generation
self.previous_agent = create_agent(self.game, cuda=False)
# neural network of current generation
self.current_agent = create_agent(self.game, cuda=False)
# MCTS of previous generation
self.previous_mcts = MCTS(
self.game, self.previous_agent, self.args, dirichlet_noise=True)
# MCTS of current generation
self.current_mcts = MCTS(
self.game, self.current_agent, self.args, dirichlet_noise=True)
def self_play(self, current_weights, game_num):
"""Collecting training data by self-play.
Args:
current_weights (numpy.array): latest weights of neural network
            game_num (int): number of self-play games to run
Returns:
            train_examples (list): examples of the form (canonicalBoard, currPlayer, pi, v)
"""
# update weights of current neural network with latest weights
self.current_agent.set_weights(current_weights)
train_examples = []
for _ in range(game_num):
# reset node state of MCTS
self.current_mcts = MCTS(
self.game, self.current_agent, self.args, dirichlet_noise=True)
train_examples.extend(self._executeEpisode())
return train_examples
def pitting(self, previous_weights, current_weights, games_num):
"""Fighting between previous generation agent and current generation agent
Args:
previous_weights (numpy.array): weights of previous generation neural network
current_weights (numpy.array): weights of current generation neural network
            games_num (int): number of games to play
Returns:
            tuple of (number of games won by previous agent, number of games won by current agent, number of draws)
"""
# update weights of previous and current neural network
self.previous_agent.set_weights(previous_weights)
self.current_agent.set_weights(current_weights)
# reset node state of MCTS
self.previous_mcts = MCTS(self.game, self.previous_agent, self.args)
self.current_mcts = MCTS(self.game, self.current_agent, self.args)
arena = Arena(
lambda x: np.argmax(self.previous_mcts.getActionProb(x, temp=0)),
lambda x: np.argmax(self.current_mcts.getActionProb(x, temp=0)),
self.game)
previous_wins, current_wins, draws = arena.playGames(games_num)
return (previous_wins, current_wins, draws)
def evaluate_test_dataset(self, current_weights, test_dataset):
"""Evaluate performance of latest neural nerwork
Args:
current_weights (numpy.array): latest weights of neural network
            test_dataset (list): test examples, each a dict with 'board', 'player' and 'move_score'
Returns:
tuple of (number of perfect moves, number of good moves)
"""
# update weights of current neural network with latest weights
self.current_agent.set_weights(current_weights)
perfect_move_count, good_move_count = 0, 0
for data in test_dataset:
self.current_mcts = MCTS(self.game, self.current_agent, self.args)
x = self.game.getCanonicalForm(data['board'], data['player'])
agent_move = int(
np.argmax(self.current_mcts.getActionProb(x, temp=0)))
moves = data["move_score"]
perfect_score = max(moves)
perfect_moves = [i for i in range(7) if moves[i] == perfect_score]
if agent_move in perfect_moves:
perfect_move_count += 1
if win_loss_draw(
moves[agent_move]) == win_loss_draw(perfect_score):
good_move_count += 1
return (perfect_move_count, good_move_count)
def _executeEpisode(self):
"""
This function executes one episode of self-play, starting with player 1.
As the game goes on, each turn is added as a training example to
trainExamples. The game is played till the game ends. After the game
ends, the outcome of the game is used to assign values to each example
in trainExamples.
It uses a temp=1 if episodeStep < tempThresholdStep, and thereafter
uses temp=0.
Returns:
trainExamples: a list of examples of the form (canonicalBoard, currPlayer, pi,v)
pi is the MCTS informed policy vector, v is +1 if
the player eventually won the game, else -1.
"""
trainExamples = []
board = self.game.getInitBoard()
self.curPlayer = 1
episodeStep = 0
while True:
episodeStep += 1
canonicalBoard = self.game.getCanonicalForm(board, self.curPlayer)
temp = int(episodeStep < self.args.tempThresholdStep)
pi = self.current_mcts.getActionProb(canonicalBoard, temp=temp)
sym = self.game.getSymmetries(canonicalBoard, pi)
for b, p in sym: # board, pi
trainExamples.append([b, self.curPlayer, p, None])
action = np.random.choice(len(pi), p=pi)
board, self.curPlayer = self.game.getNextState(
board, self.curPlayer, action)
r = self.game.getGameEnded(board, self.curPlayer)
if r != 0:
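                # The game has ended: propagate the final result r to every stored
                # example, keeping its sign for examples recorded from the
                # perspective of the current player to move and flipping it for
                # the opponent's examples.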
return [(x[0], x[2], r * ((-1)**(x[1] != self.curPlayer)))
for x in trainExamples]
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import parl
import torch
import torch.optim as optim
from tqdm import tqdm
from utils import *
from connect4_model import Connect4Model
args = dotdict({
'lr': 0.001,
'dropout': 0.3,
'epochs': 5,
'batch_size': 64,
'num_channels': 64,
})
class AlphaZero(parl.Algorithm):
def __init__(self, model):
self.model = model
def learn(self, boards, target_pis, target_vs, optimizer):
self.model.train() # train mode
# compute model output
out_log_pi, out_v = self.model(boards)
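        # pi_loss is the cross-entropy between the MCTS policy targets and the
        # predicted log-probabilities; v_loss is the mean squared error between
        # the game outcomes and the value head output.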
pi_loss = -torch.sum(target_pis * out_log_pi) / target_pis.size()[0]
v_loss = torch.sum(
(target_vs - out_v.view(-1))**2) / target_vs.size()[0]
total_loss = pi_loss + v_loss
# compute gradient and do SGD step
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
return total_loss, pi_loss, v_loss
def predict(self, board):
self.model.eval() # eval mode
with torch.no_grad():
log_pi, v = self.model(board)
pi = torch.exp(log_pi)
return pi, v
class AlphaZeroAgent(parl.Agent):
def __init__(self, algorithm, game, cuda):
super(AlphaZeroAgent, self).__init__(algorithm)
self.cuda = cuda
self.board_x, self.board_y = game.getBoardSize()
self.action_size = game.getActionSize()
def learn(self, examples):
"""
Args:
examples: list of examples, each example is of form (board, pi, v)
"""
optimizer = optim.Adam(self.algorithm.model.parameters(), lr=args.lr)
for epoch in range(args.epochs):
print('EPOCH ::: ' + str(epoch + 1))
batch_count = int(len(examples) / args.batch_size)
pbar = tqdm(range(batch_count), desc='Training Net')
for _ in pbar:
sample_ids = np.random.randint(
len(examples), size=args.batch_size)
boards, pis, vs = list(zip(*[examples[i] for i in sample_ids]))
boards = torch.FloatTensor(np.array(boards).astype(np.float64))
target_pis = torch.FloatTensor(np.array(pis))
target_vs = torch.FloatTensor(np.array(vs).astype(np.float64))
if self.cuda:
boards, target_pis, target_vs = boards.contiguous().cuda(
), target_pis.contiguous().cuda(), target_vs.contiguous(
).cuda()
total_loss, pi_loss, v_loss = self.algorithm.learn(
boards, target_pis, target_vs, optimizer)
# record loss with tqdm
pbar.set_postfix(Loss_pi=pi_loss.item(), Loss_v=v_loss.item())
def predict(self, board):
"""
Args:
board (np.array): input board
Return:
pi (np.array): probability of actions
v (np.array): estimated value of input
"""
# preparing input
board = torch.FloatTensor(board.astype(np.float64))
if self.cuda:
board = board.contiguous().cuda()
board = board.view(1, self.board_x, self.board_y)
pi, v = self.algorithm.predict(board)
return pi.data.cpu().numpy()[0], v.data.cpu().numpy()[0]
def create_agent(game, cuda=True):
cuda = cuda and torch.cuda.is_available()
model = Connect4Model(game, args)
if cuda:
model.cuda()
algorithm = AlphaZero(model)
alphazero_agent = AlphaZeroAgent(algorithm, game, cuda)
return alphazero_agent
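# Usage sketch (illustrative addition, not part of the upstream file): creating an
# agent on CPU and querying it on the empty board. `Connect4Game` from
# connect4_game.py is assumed for this example.
if __name__ == "__main__":
    from connect4_game import Connect4Game

    demo_game = Connect4Game()
    demo_agent = create_agent(demo_game, cuda=False)
    pi, v = demo_agent.predict(demo_game.getInitBoard())
    print('policy:', pi, 'value:', v)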
# Third party code
#
# The following code is copied or modified from:
# https://github.com/suragnair/alpha-zero-general
import numpy as np
from collections import namedtuple
DEFAULT_HEIGHT = 6
DEFAULT_WIDTH = 7
DEFAULT_WIN_LENGTH = 4
WinState = namedtuple('WinState', 'is_ended winner')
class Board():
"""
Connect4 Board.
"""
def __init__(self,
height=None,
width=None,
win_length=None,
np_pieces=None):
"Set up initial board configuration."
self.height = height or DEFAULT_HEIGHT
self.width = width or DEFAULT_WIDTH
self.win_length = win_length or DEFAULT_WIN_LENGTH
if np_pieces is None:
            self.np_pieces = np.zeros([self.height, self.width], dtype=int)
else:
self.np_pieces = np_pieces
assert self.np_pieces.shape == (self.height, self.width)
def add_stone(self, column, player):
"Create copy of board containing new stone."
available_idx, = np.where(self.np_pieces[:, column] == 0)
if len(available_idx) == 0:
raise ValueError(
"Can't play column %s on board %s" % (column, self))
self.np_pieces[available_idx[-1]][column] = player
def get_valid_moves(self):
"Any zero value in top row in a valid move"
return self.np_pieces[0] == 0
def get_win_state(self):
for player in [-1, 1]:
player_pieces = self.np_pieces == -player
# Check rows & columns for win
if (self._is_straight_winner(player_pieces)
or self._is_straight_winner(player_pieces.transpose())
or self._is_diagonal_winner(player_pieces)):
return WinState(True, -player)
        # No valid moves left: the game is a draw.
if not self.get_valid_moves().any():
return WinState(True, None)
# Game is not ended yet.
return WinState(False, None)
def with_np_pieces(self, np_pieces):
"""Create copy of board with specified pieces."""
if np_pieces is None:
np_pieces = self.np_pieces
return Board(self.height, self.width, self.win_length, np_pieces)
def _is_diagonal_winner(self, player_pieces):
"""Checks if player_pieces contains a diagonal win."""
win_length = self.win_length
for i in range(len(player_pieces) - win_length + 1):
for j in range(len(player_pieces[0]) - win_length + 1):
if all(player_pieces[i + x][j + x] for x in range(win_length)):
return True
for j in range(win_length - 1, len(player_pieces[0])):
if all(player_pieces[i + x][j - x] for x in range(win_length)):
return True
return False
def _is_straight_winner(self, player_pieces):
"""Checks if player_pieces contains a vertical or horizontal win."""
run_lengths = [
player_pieces[:, i:i + self.win_length].sum(axis=1)
for i in range(len(player_pieces) - self.win_length + 2)
]
return max([x.max() for x in run_lengths]) >= self.win_length
def __str__(self):
return str(self.np_pieces)
class Connect4Game(object):
"""
Connect4 Game class implementing the alpha-zero-general Game interface.
Use 1 for player1 and -1 for player2.
"""
def __init__(self,
height=None,
width=None,
win_length=None,
np_pieces=None):
self._base_board = Board(height, width, win_length, np_pieces)
def getInitBoard(self):
"""
Returns:
startBoard: a representation of the board (ideally this is the form
that will be the input to your neural network)
"""
return self._base_board.np_pieces
def getBoardSize(self):
"""
Returns:
(x,y): a tuple of board dimensions
"""
return (self._base_board.height, self._base_board.width)
def getActionSize(self):
"""
Returns:
actionSize: number of all possible actions
"""
return self._base_board.width
def getNextState(self, board, player, action):
"""Returns a copy of the board with updated move, original board is unmodified.
Input:
board: current board
player: current player (1 or -1)
action: action taken by current player
Returns:
nextBoard: board after applying action
nextPlayer: player who plays in the next turn (should be -player)
"""
b = self._base_board.with_np_pieces(np_pieces=np.copy(board))
b.add_stone(action, player)
return b.np_pieces, -player
def getValidMoves(self, board, player):
"""Any zero value in top row in a valid move.
Input:
board: current board
player: current player
Returns:
validMoves: a binary vector of length self.getActionSize(), 1 for
moves that are valid from the current board and player,
0 for invalid moves
"""
return self._base_board.with_np_pieces(
np_pieces=board).get_valid_moves()
def getGameEnded(self, board, player):
"""
Input:
board: current board
player: current player (1 or -1)
Returns:
r: 0 if game has not ended. 1 if player won, -1 if player lost,
small non-zero value for draw.
"""
b = self._base_board.with_np_pieces(np_pieces=board)
winstate = b.get_win_state()
if winstate.is_ended:
if winstate.winner is None:
# draw has very little value.
return 1e-4
elif winstate.winner == player:
return +1
elif winstate.winner == -player:
return -1
else:
raise ValueError('Unexpected winstate found: ', winstate)
else:
# 0 used to represent unfinished game.
return 0
def getCanonicalForm(self, board, player):
"""
Input:
board: current board
player: current player (1 or -1)
Returns:
canonicalBoard: returns canonical form of board. The canonical form
                            should be independent of the player. For example, in chess,
                            the canonical form can be chosen to be from the point of view
of white. When the player is white, we can return
board as is. When the player is black, we can invert
the colors and return the board.
"""
return board * player
def getSymmetries(self, board, pi):
"""Board is left/right board symmetric
Input:
board: current board
pi: policy vector of size self.getActionSize()
Returns:
symmForms: a list of [(board,pi)] where each tuple is a symmetrical
form of the board and the corresponding pi vector. This
is used when training the neural network from examples.
"""
return [(board, pi),
(np.array(board[:, ::-1], copy=True),
np.array(pi[::-1], copy=True))]
def stringRepresentation(self, board):
"""
Input:
board: current board
Returns:
boardString: a quick conversion of board to a string format.
Required by MCTS for hashing.
"""
return board.tostring()
@staticmethod
def display(board):
print(" -----------------------")
print(' '.join(map(str, range(len(board[0])))))
print(board)
print(" -----------------------")
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import parl
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
class Connect4Model(parl.Model):
def __init__(self, game, args):
# game params
self.board_x, self.board_y = game.getBoardSize()
self.action_size = game.getActionSize()
self.args = args
super(Connect4Model, self).__init__()
self.conv1 = nn.Conv2d(1, args.num_channels, 3, stride=1, padding=1)
self.conv2 = nn.Conv2d(
args.num_channels, args.num_channels, 3, stride=1, padding=1)
self.conv3 = nn.Conv2d(
args.num_channels, args.num_channels, 3, stride=1)
self.conv4 = nn.Conv2d(
args.num_channels, args.num_channels, 3, stride=1)
self.bn1 = nn.BatchNorm2d(args.num_channels)
self.bn2 = nn.BatchNorm2d(args.num_channels)
self.bn3 = nn.BatchNorm2d(args.num_channels)
self.bn4 = nn.BatchNorm2d(args.num_channels)
self.fc1 = nn.Linear(
args.num_channels * (self.board_x - 4) * (self.board_y - 4), 128)
self.fc_bn1 = nn.BatchNorm1d(128)
self.fc2 = nn.Linear(128, 64)
self.fc_bn2 = nn.BatchNorm1d(64)
self.fc3 = nn.Linear(64, self.action_size)
self.fc4 = nn.Linear(64, 1)
def forward(self, s):
"""
Args:
s(torch.Tensor): batch_size x board_x x board_y
"""
# batch_size x 1 x board_x x board_y
s = s.view(-1, 1, self.board_x, self.board_y)
# batch_size x num_channels x board_x x board_y
s = F.relu(self.bn1(self.conv1(s)))
# batch_size x num_channels x board_x x board_y
s = F.relu(self.bn2(self.conv2(s)))
# batch_size x num_channels x (board_x-2) x (board_y-2)
s = F.relu(self.bn3(self.conv3(s)))
# batch_size x num_channels x (board_x-4) x (board_y-4)
s = F.relu(self.bn4(self.conv4(s)))
s = s.view(
-1,
self.args.num_channels * (self.board_x - 4) * (self.board_y - 4))
s = F.dropout(
F.relu(self.fc_bn1(self.fc1(s))),
p=self.args.dropout,
training=self.training) # batch_size x 128
s = F.dropout(
F.relu(self.fc_bn2(self.fc2(s))),
p=self.args.dropout,
training=self.training) # batch_size x 64
pi = self.fc3(s) # batch_size x action_size
v = self.fc4(s) # batch_size x 1
return F.log_softmax(pi, dim=1), torch.tanh(v)
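# Shape sketch (illustrative addition, not part of the upstream file): for the
# default 6x7 Connect4 board, the two unpadded convolutions reduce the spatial
# size to 2x3, so fc1 sees num_channels * 2 * 3 features. `Connect4Game` and
# `dotdict` are assumed imports for this example.
if __name__ == "__main__":
    from connect4_game import Connect4Game
    from utils import dotdict

    demo_args = dotdict({'dropout': 0.3, 'num_channels': 64})
    demo_model = Connect4Model(Connect4Game(), demo_args)
    demo_model.eval()
    dummy = torch.zeros(2, 6, 7)   # a batch of two empty boards
    log_pi, v = demo_model(dummy)
    print(log_pi.shape, v.shape)   # torch.Size([2, 7]) torch.Size([2, 1])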
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import base64
import inspect
import os
assert len(sys.argv) == 2, "please specify model path."
model_path = sys.argv[1]
with open(model_path, 'rb') as f:
raw_bytes = f.read()
encoded_weights = base64.encodebytes(raw_bytes)
# encode weights of model to byte string
submission_file = """
import base64
decoded = base64.b64decode({})
""".format(encoded_weights)
# insert code snippet of loading weights
with open('submission_template.py', 'r') as f:
submission_file += ''.join(f.readlines())
# generate final submission file
with open('submission.py', 'w') as f:
f.write(submission_file)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from Coach import Coach
from connect4_game import Connect4Game
from utils import *
from parl.utils import logger
args = dotdict({
# master address of xparl cluster
'master_address': 'localhost:8010',
# number of remote actors (execute tasks [self-play/pitting/evaluate_test_dataset] in parallel).
'actors_num': 25,
    # total number of iterations
'numIters': 200,
# Number of complete self-play games to simulate during a new iteration.
'numEps': 500,
# Number of games to play during arena (pitting) play to determine if new neural network will be accepted.
'arenaCompare': 50,
    # Number of MCTS simulations per move.
'numMCTSSims': 800,
    # temp=1 (temperature, τ) if episodeStep < tempThresholdStep, and temp=0 thereafter.
'tempThresholdStep': 15,
    # During arena playoff, the new neural net is accepted only if it wins at least this fraction of games.
'updateThreshold': 0.6,
# CPUCT parameter
'cpuct': 4,
    # alpha parameter of the Dirichlet noise added to the policy (pi)
'dirichletAlpha': 1.0,
# history of examples from numItersForTrainExamplesHistory latest iterations (training data)
'numItersForTrainExamplesHistory': 20,
# folder to save model and training examples
'checkpoint': './saved_model/',
# whether to load saved model and training examples
'load_model': False,
'load_folder_file': ('./saved_model', 'checkpoint_1.pth.tar'),
})
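# NOTE: for a quick local run you can scale these settings down (e.g. smaller
# actors_num, numEps, arenaCompare and numMCTSSims), as long as the divisibility
# checks below still hold.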
# Plays arenaCompare games in which player1 starts arenaCompare/2 games and player2 starts arenaCompare/2 games.
assert args.arenaCompare % 2 == 0
# make sure the tasks can be split evenly among different remote actors
assert args.numEps % args.actors_num == 0
assert (args.arenaCompare // 2) % args.actors_num == 0
assert 1000 % args.actors_num == 0  # there are 1000 board states in the test dataset
def main():
game = Connect4Game()
c = Coach(game, args)
if args.load_model:
logger.info('Loading checkpoint {}...'.format(args.load_folder_file))
c.loadModel()
logger.info("Loading 'trainExamples' from file {}...".format(
args.load_folder_file))
c.loadTrainExamples()
c.learn()
if __name__ == "__main__":
main()
# Third party code
#
# The following code is copied or modified from:
# https://github.com/suragnair/alpha-zero-general
import os
os.environ['OMP_NUM_THREADS'] = "1"
# ===== utils.py =====
class dotdict(dict):
def __getattr__(self, name):
return self[name]
# ===== MCTS.py ======
import math
import time
import numpy as np
EPS = 1e-8
class MCTS():
"""
This class handles the MCTS tree.
"""
def __init__(self, game, nn_agent, args, dirichlet_noise=False):
self.game = game
self.nn_agent = nn_agent
self.args = args
self.dirichlet_noise = dirichlet_noise
self.Qsa = {} # stores Q values for s,a (as defined in the paper)
self.Nsa = {} # stores #times edge s,a was visited
self.Ns = {} # stores #times board s was visited
self.Ps = {} # stores initial policy (returned by neural net)
self.Es = {} # stores game.getGameEnded ended for board s
self.Vs = {} # stores game.getValidMoves for board s
def getActionProb(self, canonicalBoard, temp=1, timelimit=4.9):
"""
        This function runs MCTS simulations starting from canonicalBoard until
        the given time limit (in seconds) is reached.
Returns:
probs: a policy vector where the probability of the ith action is
proportional to Nsa[(s,a)]**(1./temp)
"""
dir_noise = self.dirichlet_noise
start_time = time.time()
while time.time() - start_time < timelimit:
self.search(canonicalBoard, dirichlet_noise=dir_noise)
s = self.game.stringRepresentation(canonicalBoard)
counts = [
self.Nsa[(s, a)] if (s, a) in self.Nsa else 0
for a in range(self.game.getActionSize())
]
if temp == 0:
bestAs = np.array(np.argwhere(counts == np.max(counts))).flatten()
bestA = np.random.choice(bestAs)
probs = [0] * len(counts)
probs[bestA] = 1
return probs
counts = [x**(1. / temp) for x in counts]
counts_sum = float(sum(counts))
probs = [x / counts_sum for x in counts]
return probs
def search(self, canonicalBoard, dirichlet_noise=False):
"""
This function performs one iteration of MCTS. It is recursively called
till a leaf node is found. The action chosen at each node is one that
has the maximum upper confidence bound as in the paper.
Once a leaf node is found, the neural network is called to return an
initial policy P and a value v for the state. This value is propagated
up the search path. In case the leaf node is a terminal state, the
outcome is propagated up the search path. The values of Ns, Nsa, Qsa are
updated.
NOTE: the return values are the negative of the value of the current
state. This is done since v is in [-1,1] and if v is the value of a
state for the current player, then its value is -v for the other player.
Returns:
v: the negative of the value of the current canonicalBoard
"""
s = self.game.stringRepresentation(canonicalBoard)
if s not in self.Es:
self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
if self.Es[s] != 0:
# terminal node
return -self.Es[s]
if s not in self.Ps:
# leaf node
self.Ps[s], v = self.nn_agent.predict(canonicalBoard)
valids = self.game.getValidMoves(canonicalBoard, 1)
self.Ps[s] = self.Ps[s] * valids # masking invalid moves
if dirichlet_noise:
self.applyDirNoise(s, valids)
sum_Ps_s = np.sum(self.Ps[s])
if sum_Ps_s > 0:
self.Ps[s] /= sum_Ps_s # renormalize
else:
# if all valid moves were masked make all valid moves equally probable
                # NB! All valid moves may be masked if either your NNet architecture is insufficient or you've got overfitting or something else.
# If you have got dozens or hundreds of these messages you should pay attention to your NNet and/or training process.
print("All valid moves were masked, doing a workaround.")
self.Ps[s] = self.Ps[s] + valids
self.Ps[s] /= np.sum(self.Ps[s])
self.Vs[s] = valids
self.Ns[s] = 0
return -v
valids = self.Vs[s]
if dirichlet_noise:
self.applyDirNoise(s, valids)
sum_Ps_s = np.sum(self.Ps[s])
self.Ps[s] /= sum_Ps_s # renormalize
cur_best = -float('inf')
best_act = -1
# pick the action with the highest upper confidence bound
for a in range(self.game.getActionSize()):
if valids[a]:
if (s, a) in self.Qsa:
u = self.Qsa[
(s, a)] + self.args.cpuct * self.Ps[s][a] * math.sqrt(
self.Ns[s]) / (1 + self.Nsa[(s, a)])
else:
u = self.args.cpuct * self.Ps[s][a] * math.sqrt(
self.Ns[s] + EPS) # Q = 0 ?
if u > cur_best:
cur_best = u
best_act = a
a = best_act
next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
next_s = self.game.getCanonicalForm(next_s, next_player)
v = self.search(next_s)
if (s, a) in self.Qsa:
self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[
(s, a)] + v) / (self.Nsa[(s, a)] + 1)
self.Nsa[(s, a)] += 1
else:
self.Qsa[(s, a)] = v
self.Nsa[(s, a)] = 1
self.Ns[s] += 1
return -v
def applyDirNoise(self, s, valids):
dir_values = np.random.dirichlet(
[self.args.dirichletAlpha] * np.count_nonzero(valids))
dir_idx = 0
for idx in range(len(self.Ps[s])):
if self.Ps[s][idx]:
self.Ps[s][idx] = (0.75 * self.Ps[s][idx]) + (
0.25 * dir_values[dir_idx])
dir_idx += 1
# ===== connect4_game.py ======
import numpy as np
from collections import namedtuple
DEFAULT_HEIGHT = 6
DEFAULT_WIDTH = 7
DEFAULT_WIN_LENGTH = 4
WinState = namedtuple('WinState', 'is_ended winner')
class Board():
"""
Connect4 Board.
"""
def __init__(self,
height=None,
width=None,
win_length=None,
np_pieces=None):
"Set up initial board configuration."
self.height = height or DEFAULT_HEIGHT
self.width = width or DEFAULT_WIDTH
self.win_length = win_length or DEFAULT_WIN_LENGTH
if np_pieces is None:
            self.np_pieces = np.zeros([self.height, self.width], dtype=int)
else:
self.np_pieces = np_pieces
assert self.np_pieces.shape == (self.height, self.width)
def add_stone(self, column, player):
"Create copy of board containing new stone."
available_idx, = np.where(self.np_pieces[:, column] == 0)
if len(available_idx) == 0:
raise ValueError(
"Can't play column %s on board %s" % (column, self))
self.np_pieces[available_idx[-1]][column] = player
def get_valid_moves(self):
"Any zero value in top row in a valid move"
return self.np_pieces[0] == 0
def get_win_state(self):
for player in [-1, 1]:
player_pieces = self.np_pieces == -player
# Check rows & columns for win
if (self._is_straight_winner(player_pieces)
or self._is_straight_winner(player_pieces.transpose())
or self._is_diagonal_winner(player_pieces)):
return WinState(True, -player)
        # No valid moves left: the game is a draw.
if not self.get_valid_moves().any():
return WinState(True, None)
# Game is not ended yet.
return WinState(False, None)
def with_np_pieces(self, np_pieces):
"""Create copy of board with specified pieces."""
if np_pieces is None:
np_pieces = self.np_pieces
return Board(self.height, self.width, self.win_length, np_pieces)
def _is_diagonal_winner(self, player_pieces):
"""Checks if player_pieces contains a diagonal win."""
win_length = self.win_length
for i in range(len(player_pieces) - win_length + 1):
for j in range(len(player_pieces[0]) - win_length + 1):
if all(player_pieces[i + x][j + x] for x in range(win_length)):
return True
for j in range(win_length - 1, len(player_pieces[0])):
if all(player_pieces[i + x][j - x] for x in range(win_length)):
return True
return False
def _is_straight_winner(self, player_pieces):
"""Checks if player_pieces contains a vertical or horizontal win."""
run_lengths = [
player_pieces[:, i:i + self.win_length].sum(axis=1)
for i in range(len(player_pieces) - self.win_length + 2)
]
return max([x.max() for x in run_lengths]) >= self.win_length
def __str__(self):
return str(self.np_pieces)
class Connect4Game(object):
"""
Connect4 Game class implementing the alpha-zero-general Game interface.
Use 1 for player1 and -1 for player2.
"""
def __init__(self,
height=None,
width=None,
win_length=None,
np_pieces=None):
self._base_board = Board(height, width, win_length, np_pieces)
def getInitBoard(self):
"""
Returns:
startBoard: a representation of the board (ideally this is the form
that will be the input to your neural network)
"""
return self._base_board.np_pieces
def getBoardSize(self):
"""
Returns:
(x,y): a tuple of board dimensions
"""
return (self._base_board.height, self._base_board.width)
def getActionSize(self):
"""
Returns:
actionSize: number of all possible actions
"""
return self._base_board.width
def getNextState(self, board, player, action):
"""Returns a copy of the board with updated move, original board is unmodified.
Input:
board: current board
player: current player (1 or -1)
action: action taken by current player
Returns:
nextBoard: board after applying action
nextPlayer: player who plays in the next turn (should be -player)
"""
b = self._base_board.with_np_pieces(np_pieces=np.copy(board))
b.add_stone(action, player)
return b.np_pieces, -player
def getValidMoves(self, board, player):
"""Any zero value in top row in a valid move.
Input:
board: current board
player: current player
Returns:
validMoves: a binary vector of length self.getActionSize(), 1 for
moves that are valid from the current board and player,
0 for invalid moves
"""
return self._base_board.with_np_pieces(
np_pieces=board).get_valid_moves()
def getGameEnded(self, board, player):
"""
Input:
board: current board
player: current player (1 or -1)
Returns:
r: 0 if game has not ended. 1 if player won, -1 if player lost,
small non-zero value for draw.
"""
b = self._base_board.with_np_pieces(np_pieces=board)
winstate = b.get_win_state()
if winstate.is_ended:
if winstate.winner is None:
# draw has very little value.
return 1e-4
elif winstate.winner == player:
return +1
elif winstate.winner == -player:
return -1
else:
raise ValueError('Unexpected winstate found: ', winstate)
else:
# 0 used to represent unfinished game.
return 0
def getCanonicalForm(self, board, player):
"""
Input:
board: current board
player: current player (1 or -1)
Returns:
canonicalBoard: returns canonical form of board. The canonical form
                            should be independent of the player. For example, in chess,
                            the canonical form can be chosen to be from the point of view
of white. When the player is white, we can return
board as is. When the player is black, we can invert
the colors and return the board.
"""
return board * player
def getSymmetries(self, board, pi):
"""Board is left/right board symmetric
Input:
board: current board
pi: policy vector of size self.getActionSize()
Returns:
symmForms: a list of [(board,pi)] where each tuple is a symmetrical
form of the board and the corresponding pi vector. This
is used when training the neural network from examples.
"""
return [(board, pi),
(np.array(board[:, ::-1], copy=True),
np.array(pi[::-1], copy=True))]
def stringRepresentation(self, board):
"""
Input:
board: current board
Returns:
boardString: a quick conversion of board to a string format.
Required by MCTS for hashing.
"""
return board.tostring()
@staticmethod
def display(board):
print(" -----------------------")
print(' '.join(map(str, range(len(board[0])))))
print(board)
print(" -----------------------")
# ===== connect4_model ======
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
#class Connect4Model(parl.Model): # Kaggle doesn't support parl package
class Connect4Model(nn.Module):
def __init__(self, game, args):
# game params
self.board_x, self.board_y = game.getBoardSize()
self.action_size = game.getActionSize()
self.args = args
super(Connect4Model, self).__init__()
self.conv1 = nn.Conv2d(1, args.num_channels, 3, stride=1, padding=1)
self.conv2 = nn.Conv2d(
args.num_channels, args.num_channels, 3, stride=1, padding=1)
self.conv3 = nn.Conv2d(
args.num_channels, args.num_channels, 3, stride=1)
self.conv4 = nn.Conv2d(
args.num_channels, args.num_channels, 3, stride=1)
self.bn1 = nn.BatchNorm2d(args.num_channels)
self.bn2 = nn.BatchNorm2d(args.num_channels)
self.bn3 = nn.BatchNorm2d(args.num_channels)
self.bn4 = nn.BatchNorm2d(args.num_channels)
self.fc1 = nn.Linear(
args.num_channels * (self.board_x - 4) * (self.board_y - 4), 128)
self.fc_bn1 = nn.BatchNorm1d(128)
self.fc2 = nn.Linear(128, 64)
self.fc_bn2 = nn.BatchNorm1d(64)
self.fc3 = nn.Linear(64, self.action_size)
self.fc4 = nn.Linear(64, 1)
def forward(self, s):
# s: batch_size x board_x x board_y
s = s.view(-1, 1, self.board_x,
self.board_y) # batch_size x 1 x board_x x board_y
s = F.relu(self.bn1(
self.conv1(s))) # batch_size x num_channels x board_x x board_y
s = F.relu(self.bn2(
self.conv2(s))) # batch_size x num_channels x board_x x board_y
s = F.relu(self.bn3(self.conv3(
s))) # batch_size x num_channels x (board_x-2) x (board_y-2)
s = F.relu(self.bn4(self.conv4(
s))) # batch_size x num_channels x (board_x-4) x (board_y-4)
s = s.view(
-1,
self.args.num_channels * (self.board_x - 4) * (self.board_y - 4))
s = F.dropout(
F.relu(self.fc_bn1(self.fc1(s))),
p=self.args.dropout,
training=self.training) # batch_size x 128
s = F.dropout(
F.relu(self.fc_bn2(self.fc2(s))),
p=self.args.dropout,
training=self.training) # batch_size x 64
pi = self.fc3(s) # batch_size x action_size
v = self.fc4(s) # batch_size x 1
return F.log_softmax(pi, dim=1), torch.tanh(v)
# ===== simple agent ======
args = dotdict({
'dropout': 0.3,
'num_channels': 64,
})
class SimpleAgent():
def __init__(self, game, cuda=True):
self.cuda = cuda and torch.cuda.is_available()
self.model = Connect4Model(game, args)
if self.cuda:
self.model.cuda()
self.board_x, self.board_y = game.getBoardSize()
self.action_size = game.getActionSize()
def predict(self, board):
"""
Args:
board (np.array): input board
Return:
pi (np.array): probability of actions
v (np.array): estimated value of input
"""
# preparing input
board = torch.FloatTensor(board.astype(np.float64))
if self.cuda:
board = board.contiguous().cuda()
board = board.view(1, self.board_x, self.board_y)
self.model.eval() # eval mode
with torch.no_grad():
log_pi, v = self.model(board)
pi = torch.exp(log_pi)
return pi.data.cpu().numpy()[0], v.data.cpu().numpy()[0]
def load_checkpoint(self, buffer):
map_location = None if self.cuda else 'cpu'
checkpoint = torch.load(buffer, map_location=map_location)
self.model.load_state_dict(checkpoint)
# ===== predict function ======
import base64
import io
game = Connect4Game()
# AlphaZero players
agent = SimpleAgent(game)
buffer = io.BytesIO(decoded)
agent.load_checkpoint(buffer)
mcts_args = dotdict({'numMCTSSims': 800, 'cpuct': 1.0})
mcts = MCTS(game, agent, mcts_args)
def alphazero_agent(obs, config):
board = np.reshape(obs.board.copy(), game.getBoardSize()).astype(int)
board[np.where(board == 2)] = -1
player = 1
if obs.mark == 2:
player = -1
x = game.getCanonicalForm(board, player)
action = np.argmax(
mcts.getActionProb(x, temp=0, timelimit=config.timeout - 0.1))
return int(action)
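# Local test sketch (illustrative addition, not part of the upstream template): the
# generated submission.py can be checked locally with the kaggle_environments
# package before uploading, assuming it is installed (pip install kaggle-environments):
#
#   from kaggle_environments import make
#   env = make("connectx", debug=True)
#   env.run([alphazero_agent, "random"])
#   print(env.render(mode="ansi"))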
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class dotdict(dict):
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)
def win_loss_draw(score):
if score > 0:
return 'win'
if score < 0:
return 'loss'
return 'draw'
"""
split one list to multiple lists
"""
split_group = lambda the_list, group_size: zip(*(iter(the_list), ) * group_size)
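# Example: split_group([1, 2, 3, 4, 5, 6], 2) yields (1, 2), (3, 4), (5, 6);
# any trailing remainder that does not fill a complete group is dropped.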
import numpy as np
import json
from connect4_game import Connect4Game
def get_test_dataset():
game = Connect4Game()
test_dataset = []
with open("refmoves1k_kaggle") as f:
for line in f:
data = json.loads(line)
board = data["board"]
board = np.reshape(board, game.getBoardSize()).astype(int)
board[np.where(board == 2)] = -1
# find out how many moves are played to set the correct mark.
ply = len([x for x in data["board"] if x > 0])
if ply & 1:
player = -1
else:
player = 1
test_dataset.append({
'board': board,
'player': player,
'move_score': data['move score'],
})
return test_dataset