提交 c6fe7d7e 编写于 作者: G guru4elephant

update demo with trainer.run

上级 85d1096b
*.pyc
*/*.pyc
*/*/*.pyc
*/*/*/*.pyc
\ No newline at end of file
......@@ -30,7 +30,8 @@ class FLJobBase(object):
def _load_str_list(self, input_file):
res = []
with open(input_file, "r") as fin:
res.append(fin.readline().strip())
for line in fin:
res.append(line.strip())
return res
def _save_strategy(self, strategy, output_file):
......
......@@ -29,6 +29,8 @@ class JobGenerator(object):
# inner optimizer
self._optimizer = \
fluid.optimizer.SGD(learning_rate=0.001)
self._feed_names = []
self._target_names = []
def set_optimizer(self, optimizer):
......@@ -54,6 +56,18 @@ class JobGenerator(object):
self._startup_prog = startup
def set_infer_feed_and_target_names(self, feed_names, target_names):
    """Record the variable names used when saving the inference program.

    Args:
        feed_names (list): names of the input variables fed at inference time.
        target_names (list): names of the output variables to fetch.

    Raises:
        ValueError: if either argument is not a list.
    """
    if not isinstance(feed_names, list) or not isinstance(target_names, list):
        raise ValueError("input should be list in set_infer_feed_and_target_names")
    # The commented-out per-item string type checks were dead code and have
    # been removed; only the container type is validated, as before.
    self._feed_names = feed_names
    self._target_names = target_names
......
......@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import logging
class FLTrainerFactory(object):
def __init__(self):
......@@ -32,6 +33,7 @@ class FLTrainerFactory(object):
class FLTrainer(object):
# Base class for federated-learning trainers; subclasses (e.g. FedAvgTrainer)
# override start()/run() with strategy-specific recv/send behaviour.
def __init__(self):
# Named logger used by run()/train_inner_loop() for per-batch tracing;
# the importing script configures handlers via logging.basicConfig.
self._logger = logging.getLogger("FLTrainer")
pass
def set_trainer_job(self, job):
......@@ -47,15 +49,12 @@ class FLTrainer(object):
self.exe = fluid.Executor(fluid.CPUPlace())
self.exe.run(self._startup_program)
def train_inner_loop(self, reader):
# Run at most self._step batches pulled from reader() through the main
# program; nothing is fetched back (fetch_list is empty).
now_step = 0
for data in reader():
now_step += 1
# Stop after self._step batches even if the reader has more data.
if now_step > self._step:
break
self.exe.run(self._main_program,
feed=data,
fetch_list=[])
def run(self, feed, fetch):
    """Execute one batch of the main program.

    Args:
        feed: feed data for this batch, passed straight to the executor.
        fetch: list of variables to fetch from the executor run.
    """
    logger = self._logger
    logger.debug("begin to run")
    self.exe.run(self._main_program,
                 feed=feed,
                 fetch_list=fetch)
    logger.debug("end to run current batch")
def save_inference_program(self, output_folder):
target_vars = []
......@@ -84,7 +83,7 @@ class FedAvgTrainer(FLTrainer):
def start(self):
# Create a CPU executor and run the startup program once before training.
self.exe = fluid.Executor(fluid.CPUPlace())
self.exe.run(self._startup_program)
# NOTE(review): both counters are initialised here; self.step looks
# vestigial after the rename to cur_step (run() only reads cur_step) --
# confirm no caller still depends on self.step.
self.step = 0
self.cur_step = 0
def set_trainer_job(self, job):
super(FedAvgTrainer, self).set_trainer_job(job)
......@@ -95,27 +94,20 @@ class FedAvgTrainer(FLTrainer):
self.cur_step = 0
def run(self, feed, fetch):
# One FedAvg training step. Every self._step batches (when cur_step is a
# multiple of _step) this pulls the aggregated global parameters before
# training and pushes the local update after -- both on the same step,
# since cur_step is incremented only at the end.
self._logger.debug("begin to run FedAvgTrainer, cur_step=%d, inner_step=%d" %
(self.cur_step, self._step))
if self.cur_step % self._step == 0:
self._logger.debug("begin to run recv program")
self.exe.run(self._recv_program)
self._logger.debug("begin to run current step")
self.exe.run(self._main_program,
feed=feed,
fetch_list=fetch)
if self.cur_step % self._step == 0:
self._logger.debug("begin to run send program")
self.exe.run(self._send_program)
self.cur_step += 1
def train_inner_loop(self, reader):
# Legacy epoch-style loop (superseded by run()): pull global parameters
# once, train up to self._step batches, then push the local update once.
self.exe.run(self._recv_program)
now_step = 0
for data in reader():
now_step += 1
# Cap the inner loop at self._step batches per recv/send round.
if now_step > self._step:
break
self.exe.run(self._main_program,
feed=data,
fetch_list=[])
self.exe.run(self._send_program)
def stop(self):
    """Return whether training should terminate; this trainer never does,
    so the driving script's ``while not trainer.stop():`` loops forever."""
    should_stop = False
    return should_stop
......@@ -39,7 +39,7 @@ job_generator.set_infer_feed_and_target_names(
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 1
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
# endpoints will be collected through the cluster
......
......@@ -2,6 +2,9 @@ from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np
import sys
import logging
logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%M-%Y %H:%M:%S", level=logging.DEBUG)
def reader():
for i in range(1000):
......@@ -23,5 +26,7 @@ step_i = 0
while not trainer.stop():
step_i += 1
print("batch %d start train" % (step_i))
trainer.train_inner_loop(reader)
trainer.save_inference_program(output_folder)
for data in reader():
trainer.run(feed=data, fetch=[])
if step_i % 100 == 0:
trainer.save_inference_program(output_folder)
......@@ -22,4 +22,7 @@ step_i = 0
while not trainer.stop():
step_i += 1
print("batch %d start train" % (step_i))
trainer.train_inner_loop(reader)
for data in reader():
trainer.run(feed=data, fetch=[])
if step_i % 100 == 0:
trainer.save_inference_program(output_folder)
......@@ -17,13 +17,13 @@ class Model(object):
gru_lr_x = 1.0
fc_lr_x = 1.0
# Input data
src_wordseq = fluid.layers.data(
self.src_wordseq = fluid.layers.data(
name="src_wordseq", shape=[1], dtype="int64", lod_level=1)
dst_wordseq = fluid.layers.data(
self.dst_wordseq = fluid.layers.data(
name="dst_wordseq", shape=[1], dtype="int64", lod_level=1)
emb = fluid.layers.embedding(
input=src_wordseq,
input=self.src_wordseq,
size=[vocab_size, hid_size],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
......@@ -45,15 +45,17 @@ class Model(object):
low=init_low_bound, high=init_high_bound),
learning_rate=gru_lr_x))
fc = fluid.layers.fc(input=gru_h0,
self.fc = fluid.layers.fc(input=gru_h0,
size=vocab_size,
act='softmax',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=fc_lr_x))
cost = fluid.layers.cross_entropy(input=fc, label=dst_wordseq)
acc = fluid.layers.accuracy(input=fc, label=dst_wordseq, k=20)
cost = fluid.layers.cross_entropy(
input=self.fc, label=self.dst_wordseq)
acc = fluid.layers.accuracy(
input=self.fc, label=self.dst_wordseq, k=20)
self.loss = fluid.layers.mean(x=cost)
self.startup_program = fluid.default_startup_program()
......@@ -67,6 +69,8 @@ optimizer = fluid.optimizer.SGD(learning_rate=2.0)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[model.src_wordseq.name, model.dst_wordseq.name], [model.fc.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
......
......@@ -5,7 +5,8 @@ import paddle.fluid as fluid
import numpy as np
import sys
import os
import logging
logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%M-%Y %H:%M:%S", level=logging.DEBUG)
trainer_id = int(sys.argv[1]) # trainer id for each guest
place = fluid.CPUPlace()
......@@ -23,4 +24,7 @@ step_i = 0
while not trainer.stop():
step_i += 1
print("batch %d start train" % (step_i))
trainer.train_inner_loop(train_reader)
for data in train_reader():
print(data)
trainer.run(feed=data,
fetch=[])
unset http_proxy
unset https_proxy
python fl_master.py
sleep 2
python -u fl_server.py >server0.log &
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册