未验证 提交 37203246 编写于 作者: M mapingshuo 提交者: GitHub

Merge pull request #77 from barrierye/update-model

update code to make test acc=0.116
...@@ -12,16 +12,23 @@ FL-mobile是一个集移动端算法模拟调研、训练和部署为一体的 ...@@ -12,16 +12,23 @@ FL-mobile是一个集移动端算法模拟调研、训练和部署为一体的
## 准备工作 ## 准备工作
- 安装mpirun - 安装mpirun
- python安装grpc - python安装grpc
```shell ```shell
pip install grpcio==1.28.1 pip install grpcio==1.28.1
``` ```
- 安装Paddle - 安装Paddle
```shell
pip install paddlepaddle==1.8.0
```
## 快速开始 ## 快速开始
我们以Leaf数据集中的[reddit数据](https://github.com/TalwalkarLab/leaf/tree/master/data/reddit)为例,用LSTM建模,在simulator 我们以Leaf数据集中的[reddit数据](https://github.com/TalwalkarLab/leaf/tree/master/data/reddit)为例,参考[这篇论文](https://arxiv.org/pdf/1812.01097.pdf),用LSTM建模,在simulator中给出一个单机训练的例子。通过这个例子,您能了解simulator的基础用法。
中给出一个单机训练的例子,通过这个例子,您能了解simulator的基础用法。
### 准备数据 ### 准备数据
...@@ -52,6 +59,8 @@ cd .. ...@@ -52,6 +59,8 @@ cd ..
### 开始训练 ### 开始训练
在训练中,我们每轮用均匀采样(Uniform Sample)方式选取`10`个Client进行训练,每个Client在本地用该Client对应的全部数据(未经shuffle)训练`1`个epoch,总共训练`100`轮。在本实验中使用的Client学习率为`1.0`,FedAvg学习率为`1.85 `
```shell ```shell
export PYTHONPATH=$PWD:$PYTHONPATH export PYTHONPATH=$PWD:$PYTHONPATH
mpirun -np 2 python application.py lm_data mpirun -np 2 python application.py lm_data
...@@ -59,11 +68,13 @@ mpirun -np 2 python application.py lm_data ...@@ -59,11 +68,13 @@ mpirun -np 2 python application.py lm_data
### 训练结果 ### 训练结果
在测试集上,测试Top1为 `11.6% `
```shell ```shell
framework.py : INFO infer results: 0.085723 framework.py : INFO infer results: 0.116334
``` ```
即:在测试集上的,测试Top1为 8.6% 相同参数的非联邦训练测试Top1为`11.1%`
## 添加自己的数据集和Trainer ## 添加自己的数据集和Trainer
...@@ -89,7 +100,7 @@ framework.py : INFO infer results: 0.085723 ...@@ -89,7 +100,7 @@ framework.py : INFO infer results: 0.085723
- Step1 模型初始化 - Step1 模型初始化
1. 全局参数初始化:由编号为0的simulator来做模型初始化工作,初始化之后,它会通过UpdateGlobalParams()接口将参数传递给Scheduler; 1. 全局参数初始化:由编号为0的simulator来做模型初始化工作,初始化之后,它会通过UpdateGlobalParams()接口将参数传递给Scheduler;
2. 个性化参数初始化 2. 个性化参数初始化
- Step2 模型分发 - Step2 模型分发
......
...@@ -36,17 +36,22 @@ simulator = SimulationFramework(role_maker) ...@@ -36,17 +36,22 @@ simulator = SimulationFramework(role_maker)
language_model_trainer = LanguageModelTrainer() language_model_trainer = LanguageModelTrainer()
language_model_trainer.set_trainer_configs({ language_model_trainer.set_trainer_configs({
"epoch": 3, "epoch": 1,
"max_steps_in_epoch": -1, "max_steps_in_epoch": -1,
"lr": 0.1, "lr": 1.0,
"batch_size": 5, "batch_size": 5,
"max_grad_norm": 5,
"n_hidden": 256,
"num_layers": 2,
"init_scale": 0.1,
"dropout_prob": 0.0,
}) })
sampler = UniformSampler() sampler = UniformSampler()
sampler.set_sample_num(30) sampler.set_sample_num(10)
sampler.set_min_ins_num(1) sampler.set_min_ins_num(1)
test_sampler = Test1percentSampler() test_sampler = Test1percentSampler()
fed_avg_optimizer = FedAvgOptimizer(learning_rate=2.0) fed_avg_optimizer = FedAvgOptimizer(learning_rate=1.85)
simulator.set_trainer(language_model_trainer) simulator.set_trainer(language_model_trainer)
simulator.set_sampler(sampler) simulator.set_sampler(sampler)
...@@ -68,5 +73,8 @@ elif simulator.is_simulator(): ...@@ -68,5 +73,8 @@ elif simulator.is_simulator():
print("dates: {}".format(dates)) print("dates: {}".format(dates))
time.sleep(10) time.sleep(10)
simulator.run_simulation( simulator.run_simulation(base_path,
base_path, dates, sim_num_everyday=100, do_test=True, test_skip_day=1) dates,
sim_num_everyday=100,
do_test=True,
test_skip_day=1)
...@@ -108,7 +108,7 @@ def train_reader(lines): ...@@ -108,7 +108,7 @@ def train_reader(lines):
input_data, input_length = process_x(data_x, VOCAB) input_data, input_length = process_x(data_x, VOCAB)
target_data = process_y(data_y, VOCAB) target_data = process_y(data_y, VOCAB)
yield [input_data] + [target_data] yield [input_data] + [target_data] + [input_length] + [data_mask]
return local_iter return local_iter
......
...@@ -34,10 +34,10 @@ def train_one_user(arg_dict, trainer_config): ...@@ -34,10 +34,10 @@ def train_one_user(arg_dict, trainer_config):
max_training_steps = trainer_config["max_training_steps"] max_training_steps = trainer_config["max_training_steps"]
batch_size = trainer_config["batch_size"] batch_size = trainer_config["batch_size"]
# logging.info("training one user...") # logging.info("training one user...")
main_program = fluid.Program.parse_from_string(trainer_config[ main_program = fluid.Program.parse_from_string(
"main_program_desc"]) trainer_config["main_program_desc"])
startup_program = fluid.Program.parse_from_string(trainer_config[ startup_program = fluid.Program.parse_from_string(
"startup_program_desc"]) trainer_config["startup_program_desc"])
place = fluid.CPUPlace() place = fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
scope = fluid.global_scope() scope = fluid.global_scope()
...@@ -46,10 +46,9 @@ def train_one_user(arg_dict, trainer_config): ...@@ -46,10 +46,9 @@ def train_one_user(arg_dict, trainer_config):
exit() exit()
exe.run(startup_program) exe.run(startup_program)
feeder = fluid.DataFeeder( feeder = fluid.DataFeeder(feed_list=trainer_config["input_names"],
feed_list=trainer_config["input_names"], place=place,
place=place, program=main_program)
program=main_program)
data_server_endpoints = arg_dict["data_endpoints"] data_server_endpoints = arg_dict["data_endpoints"]
# create data clients # create data clients
data_client = DataClient() data_client = DataClient()
...@@ -76,36 +75,43 @@ def train_one_user(arg_dict, trainer_config): ...@@ -76,36 +75,43 @@ def train_one_user(arg_dict, trainer_config):
epoch = trainer_config["epoch"] epoch = trainer_config["epoch"]
max_steps_in_epoch = trainer_config.get("max_steps_in_epoch", -1) max_steps_in_epoch = trainer_config.get("max_steps_in_epoch", -1)
metrics = trainer_config["metrics"] metrics = trainer_config["metrics"]
metric_keys = metrics.keys() fetch_list = []
fetch_list = [main_program.global_block().var(trainer_config["loss_name"])] for var in trainer_config["target_names"]:
for key in metric_keys: fetch_list.append(var)
fetch_list.append(main_program.global_block().var(metrics[key]))
seq_len = 10
for ei in range(epoch): for ei in range(epoch):
fetch_res_list = []
trained_sample_num = 0 trained_sample_num = 0
step = 0 step = 0
fetch_res_list = [] num_layers = trainer_config["num_layers"]
total_loss = 0.0 hidden_size = trainer_config["n_hidden"]
total_correct = 0 tot_loss, tot_correct = 0, 0
tot_samples = 0
init_hidden, init_cell = generate_init_data(batch_size, num_layers,
hidden_size)
for data in train_reader(): for data in train_reader():
feed_data, input_lengths = prepare_input(batch_size, data,
init_hidden, init_cell)
fetch_res = exe.run(main_program, fetch_res = exe.run(main_program,
feed=feeder.feed(data), feed=feeder.feed(feed_data),
fetch_list=fetch_list) fetch_list=fetch_list)
loss, last_hidden, last_cell, correct = fetch_res
init_hidden = np.array(last_hidden)
init_cell = np.array(last_cell)
tot_loss += np.array(loss)
tot_correct += np.array(correct)
tot_samples += np.sum(input_lengths)
step += 1 step += 1
trained_sample_num += len(data) trained_sample_num += len(data)
fetch_res_list.append([x[0] for x in fetch_res]) fetch_res_list.append([np.array(loss), np.array(correct)])
if max_steps_in_epoch != -1 and step >= max_steps_in_epoch: if max_steps_in_epoch != -1 and step >= max_steps_in_epoch:
break break
if show_metric and trained_sample_num > 0: if show_metric and trained_sample_num > 0:
loss = sum([x[0] for x in fetch_res_list]) / trained_sample_num loss = tot_loss / step
print("loss: {}, ppl: {}".format(loss, np.exp(loss))) acc = float(tot_correct) / tot_samples
for i, key in enumerate(metric_keys): print("loss: {}, acc: {}".format(loss, acc))
if key == "correct":
value = float(sum([x[i + 1] for x in fetch_res_list
])) / trained_sample_num
print("correct: {}".format(value / seq_len))
local_updated_param_dict = {} local_updated_param_dict = {}
# update user param # update user param
...@@ -142,10 +148,10 @@ def infer_one_user(arg_dict, trainer_config): ...@@ -142,10 +148,10 @@ def infer_one_user(arg_dict, trainer_config):
# run startup program, set params # run startup program, set params
uid = arg_dict["uid"] uid = arg_dict["uid"]
batch_size = trainer_config["batch_size"] batch_size = trainer_config["batch_size"]
startup_program = fluid.Program.parse_from_string(trainer_config[ startup_program = fluid.Program.parse_from_string(
"startup_program_desc"]) trainer_config["startup_program_desc"])
infer_program = fluid.Program.parse_from_string(trainer_config[ infer_program = fluid.Program.parse_from_string(
"infer_program_desc"]) trainer_config["infer_program_desc"])
place = fluid.CPUPlace() place = fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
scope = fluid.global_scope() scope = fluid.global_scope()
...@@ -169,7 +175,6 @@ def infer_one_user(arg_dict, trainer_config): ...@@ -169,7 +175,6 @@ def infer_one_user(arg_dict, trainer_config):
arg_dict["global_params"], scope) arg_dict["global_params"], scope)
# reader # reader
date = arg_dict["date"] date = arg_dict["date"]
global_param_dict = arg_dict["global_params"] global_param_dict = arg_dict["global_params"]
user_data = data_client.get_data_by_uid(uid, date) user_data = data_client.get_data_by_uid(uid, date)
...@@ -179,36 +184,60 @@ def infer_one_user(arg_dict, trainer_config): ...@@ -179,36 +184,60 @@ def infer_one_user(arg_dict, trainer_config):
# run infer program # run infer program
os.mkdir(arg_dict["infer_result_dir"]) os.mkdir(arg_dict["infer_result_dir"])
#pred_file = open(arg_dict["infer_result_dir"] + '/' + "pred_file", "w") #pred_file = open(arg_dict["infer_result_dir"] + '/' + "pred_file", "w")
feeder = fluid.DataFeeder( feeder = fluid.DataFeeder(feed_list=trainer_config["input_names"],
feed_list=trainer_config["input_names"], place=place,
place=place, program=infer_program)
program=infer_program)
fetch_list = trainer_config["target_names"] fetch_list = trainer_config["target_names"]
#logging.info("fetch_list: {}".format(fetch_list)) #logging.info("fetch_list: {}".format(fetch_list))
fetch_res = [] fetch_res = []
sample_count = 0 sample_count = 0
total_loss = 0.0 num_layers = trainer_config["num_layers"]
total_correct = 0 hidden_size = trainer_config["n_hidden"]
iters = 0 tot_correct, tot_loss = 0, 0
steps = 0 tot_samples, tot_batches = 0, 0
seq_len = 10 init_hidden, init_cell = generate_init_data(batch_size, num_layers,
hidden_size)
for data in infer_reader(): for data in infer_reader():
# feed_data = [x["features"] + [x["label"]] for x in data] feed_data, input_lengths = prepare_input(batch_size, data, init_hidden,
# prediction, acc_val= exe.run(infer_program, init_cell)
pred, correct_count, loss = exe.run(infer_program, fetch_res = exe.run(infer_program,
feed=feeder.feed(data), feed=feeder.feed(feed_data),
fetch_list=fetch_list) fetch_list=fetch_list)
total_loss += loss loss, last_hidden, last_cell, correct = fetch_res
total_correct += correct_count
steps += 1 cost_eval = np.array(loss)
sample_count += len(data) init_hidden = np.array(last_hidden)
init_cell = np.array(last_cell)
correct = float(total_correct) / (seq_len * sample_count) correct_val = np.array(correct)
# logging.info("correct: {}".format(correct)) tot_loss += cost_eval
tot_correct += correct_val
tot_samples += np.sum(input_lengths)
tot_batches += 1
loss = tot_loss / tot_batches
acc = float(tot_correct) / tot_samples
logging.info("infer acc: {}".format(acc))
with open(arg_dict["infer_result_dir"] + "/res", "w") as f: with open(arg_dict["infer_result_dir"] + "/res", "w") as f:
f.write("%d\t%f\n" % (1, correct)) f.write("%d\t%f\n" % (1, acc))
def prepare_input(batch_size, data, init_hidden, init_cell):
init_hidden = np.split(init_hidden, batch_size)
init_cell = np.split(init_cell, batch_size)
data = [[features] + [labels] + [seq_len_ph] + [seq_mask_ph] + [init_hidden[i]] + [init_cell[i] ] \
for i, (features, labels, seq_len_ph, seq_mask_ph) in enumerate(data)]
input_lengths = [x[2] for x in data]
return data, input_lengths
def generate_init_data(batch_size, num_layers, hidden_size):
init_hidden = np.zeros((batch_size, num_layers, hidden_size),
dtype='float32')
init_cell = np.zeros((batch_size, num_layers, hidden_size),
dtype='float32')
return init_hidden, init_cell
def save_and_upload(arg_dict, trainer_config, dfs_upload_path): def save_and_upload(arg_dict, trainer_config, dfs_upload_path):
...@@ -219,7 +248,6 @@ def save_and_upload(arg_dict, trainer_config, dfs_upload_path): ...@@ -219,7 +248,6 @@ def save_and_upload(arg_dict, trainer_config, dfs_upload_path):
def evaluate_a_group(group): def evaluate_a_group(group):
group_list = [] group_list = []
for label, pred, _ in group: for label, pred, _ in group:
# print("%s\t%s\n" % (label, pred))
group_list.append((int(label), float(pred))) group_list.append((int(label), float(pred)))
random.shuffle(group_list) random.shuffle(group_list)
labels = [x[0] for x in group_list] labels = [x[0] for x in group_list]
...@@ -236,7 +264,6 @@ class LanguageModelTrainer(TrainerBase): ...@@ -236,7 +264,6 @@ class LanguageModelTrainer(TrainerBase):
""" """
LanguageModelTrainer only support training with PaddlePaddle LanguageModelTrainer only support training with PaddlePaddle
""" """
def __init__(self): def __init__(self):
super(LanguageModelTrainer, self).__init__() super(LanguageModelTrainer, self).__init__()
self.main_program_ = fluid.Program() self.main_program_ = fluid.Program()
...@@ -270,10 +297,13 @@ class LanguageModelTrainer(TrainerBase): ...@@ -270,10 +297,13 @@ class LanguageModelTrainer(TrainerBase):
""" """
with fluid.program_guard(self.main_program_, self.startup_program_): with fluid.program_guard(self.main_program_, self.startup_program_):
self.input_model_ = LanguageModel() self.input_model_ = LanguageModel()
model_configs = {} model_configs = self.trainer_config
self.input_model_.build_model(model_configs) self.input_model_.build_model(model_configs)
optimizer = fluid.optimizer.SGD( optimizer = fluid.optimizer.SGD(
learning_rate=self.trainer_config["lr"]) learning_rate=self.trainer_config["lr"],
grad_clip=fluid.clip.GradientClipByGlobalNorm(
clip_norm=self.trainer_config["max_grad_norm"]))
optimizer.minimize(self.input_model_.get_model_loss()) optimizer.minimize(self.input_model_.get_model_loss())
self.main_program_desc_ = self.main_program_.desc.serialize_to_string() self.main_program_desc_ = self.main_program_.desc.serialize_to_string()
...@@ -283,13 +313,16 @@ class LanguageModelTrainer(TrainerBase): ...@@ -283,13 +313,16 @@ class LanguageModelTrainer(TrainerBase):
self.input_model_.get_model_loss_name()) self.input_model_.get_model_loss_name())
self.update_trainer_configs( self.update_trainer_configs(
"input_names", "input_names",
self.input_model_.get_model_input_names(), ) self.input_model_.get_model_input_names(),
)
self.update_trainer_configs( self.update_trainer_configs(
"target_names", "target_names",
self.input_model_.get_target_names(), ) self.input_model_.get_target_names(),
)
self.update_trainer_configs( self.update_trainer_configs(
"metrics", "metrics",
self.input_model_.get_model_metrics(), ) self.input_model_.get_model_metrics(),
)
self.update_trainer_configs("show_metric", True) self.update_trainer_configs("show_metric", True)
self.update_trainer_configs("max_training_steps", "inf") self.update_trainer_configs("max_training_steps", "inf")
self.update_trainer_configs("shuffle", False) self.update_trainer_configs("shuffle", False)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册