Unverified commit 12e7f920, authored by Dong Daxiang, committed by GitHub

Merge pull request #22 from qjing666/master

create dataset api and training demo for femnist
## Training process
<img src='../../../images/split_flow.png' width = "1000" height = "440" align="middle"/>
- The user sends a request to the server that includes the task type, image size, etc. (see the sketch below).
- The server responds with a path from which the user can download the encoder.
- The user encodes the images and, after serialization, sends the processed data to the DB.
- The server fetches the uploaded data and starts training the model.
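
The request itself is a msgpack-serialized dictionary carried over a ZeroMQ REQ socket, and the server replies with the file name of the encoder. A minimal sketch of that handshake is shown here; the endpoint, port, and field names simply mirror the demo code further down this page:
```
import zmq
import msgpack

# Task description sent by the user; the fields mirror the demo's submitter.py.
request = {"mission": "image classification", "image_size": [3, 32, 32]}

context = zmq.Context()
sock = context.socket(zmq.REQ)
sock.connect("tcp://127.0.0.1:60001")  # the receiver's endpoint in this demo
sock.send(msgpack.dumps(request))

encoder_file = sock.recv()  # e.g. "user.py", then downloaded over HTTP on port 8080
print(encoder_file)
```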
## Start the service on the server side
```
#python
python server/receiver.py
```
## Send the request on the user side
```
#python
python submitter.py
```
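# Server-side receiver (started above via `python server/receiver.py`): accepts the task request, serves the encoder for download over HTTP, and kicks off training once the user reports completion.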
import zmq
import socket
import msgpack
import os
random_port = 60001
current_ip = socket.gethostbyname(socket.gethostname())
print(current_ip)
os.system("python -m SimpleHTTPServer 8080 &")
#listening for client
context = zmq.Context()
zmq_socket = context.socket(zmq.REP)
zmq_socket.bind("tcp://{}:{}".format(current_ip, random_port))
print("binding tcp://{}:{}".format(current_ip, random_port))
#get mission and return the path of encoder
message = msgpack.loads(zmq_socket.recv())
print(message["mission"])
zmq_socket.send("user.py")
#wait client finish encoding
while True:
    message = zmq_socket.recv()
    if message == 'complete':
        break
#start training
os.system("python -u server.py > server.log &")
from __future__ import print_function
import os
import paddle
import paddle.fluid as fluid
import numpy
import sys
from paddle.fluid import layers
import redis
import time
from paddle.fluid.param_attr import ParamAttr
import math
import msgpack
def data_generator(samples, r):
    # data generator: pull serialized (feature, label) pairs back from Redis
    def train_data():
        for item in samples:
            sample = msgpack.loads(r.get(str(item)))
            conv = sample[0]
            label = sample[1]
            yield conv, label
    return train_data
class ResNet():
    def __init__(self, layers=50):
        self.layers = layers

    def net(self, input, class_dim=10):
        layers = self.layers
        supported_layers = [18, 34, 50, 101, 152]
        assert layers in supported_layers, \
            "supported layers are {} but input layer is {}".format(supported_layers, layers)
        if layers == 18:
            depth = [2, 2, 2, 2]
        elif layers == 34 or layers == 50:
            depth = [3, 4, 6, 3]
        elif layers == 101:
            depth = [3, 4, 23, 3]
        elif layers == 152:
            depth = [3, 8, 36, 3]
        num_filters = [64, 128, 256, 512]
        conv = input
        if layers >= 50:
            for block in range(len(depth)):
                for i in range(depth[block]):
                    if layers in [101, 152] and block == 2:
                        if i == 0:
                            conv_name = "res" + str(block + 2) + "a"
                        else:
                            conv_name = "res" + str(block + 2) + "b" + str(i)
                    else:
                        conv_name = "res" + str(block + 2) + chr(97 + i)
                    conv = self.bottleneck_block(
                        input=conv,
                        num_filters=num_filters[block],
                        stride=2 if i == 0 and block != 0 else 1,
                        name=conv_name)
            pool = fluid.layers.pool2d(
                input=conv, pool_type='avg', global_pooling=True)
            stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
            out = fluid.layers.fc(
                input=pool,
                size=class_dim,
                param_attr=fluid.param_attr.ParamAttr(
                    initializer=fluid.initializer.Uniform(-stdv, stdv)),
                act="softmax")
        else:
            for block in range(len(depth)):
                for i in range(depth[block]):
                    conv_name = "res" + str(block + 2) + chr(97 + i)
                    conv = self.basic_block(
                        input=conv,
                        num_filters=num_filters[block],
                        stride=2 if i == 0 and block != 0 else 1,
                        is_first=block == i == 0,
                        name=conv_name)
            pool = fluid.layers.pool2d(
                input=conv, pool_type='avg', global_pooling=True)
            stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
            out = fluid.layers.fc(
                input=pool,
                size=class_dim,
                param_attr=fluid.param_attr.ParamAttr(
                    initializer=fluid.initializer.Uniform(-stdv, stdv)),
                act="softmax")
        return out
    def conv_bn_layer(self,
                      input,
                      num_filters,
                      filter_size,
                      stride=1,
                      groups=1,
                      act=None,
                      name=None):
        conv = fluid.layers.conv2d(
            input=input,
            num_filters=num_filters,
            filter_size=filter_size,
            stride=stride,
            padding=(filter_size - 1) // 2,
            groups=groups,
            act=None,
            param_attr=ParamAttr(name=name + "_weights"),
            bias_attr=False,
            name=name + '.conv2d.output.1')
        if name == "conv1":
            bn_name = "bn_" + name
        else:
            bn_name = "bn" + name[3:]
        return fluid.layers.batch_norm(
            input=conv,
            act=act,
            name=bn_name + '.output.1',
            param_attr=ParamAttr(name=bn_name + '_scale'),
            bias_attr=ParamAttr(bn_name + '_offset'),
            moving_mean_name=bn_name + '_mean',
            moving_variance_name=bn_name + '_variance', )
    def shortcut(self, input, ch_out, stride, is_first, name):
        ch_in = input.shape[1]
        if ch_in != ch_out or stride != 1 or is_first == True:
            return self.conv_bn_layer(input, ch_out, 1, stride, name=name)
        else:
            return input

    def bottleneck_block(self, input, num_filters, stride, name):
        conv0 = self.conv_bn_layer(
            input=input,
            num_filters=num_filters,
            filter_size=1,
            act='relu',
            name=name + "_branch2a")
        conv1 = self.conv_bn_layer(
            input=conv0,
            num_filters=num_filters,
            filter_size=3,
            stride=stride,
            act='relu',
            name=name + "_branch2b")
        conv2 = self.conv_bn_layer(
            input=conv1,
            num_filters=num_filters * 4,
            filter_size=1,
            act=None,
            name=name + "_branch2c")
        short = self.shortcut(
            input,
            num_filters * 4,
            stride,
            is_first=False,
            name=name + "_branch1")
        return fluid.layers.elementwise_add(
            x=short, y=conv2, act='relu', name=name + ".add.output.5")

    def basic_block(self, input, num_filters, stride, is_first, name):
        conv0 = self.conv_bn_layer(
            input=input,
            num_filters=num_filters,
            filter_size=3,
            act='relu',
            stride=stride,
            name=name + "_branch2a")
        conv1 = self.conv_bn_layer(
            input=conv0,
            num_filters=num_filters,
            filter_size=3,
            act=None,
            name=name + "_branch2b")
        short = self.shortcut(
            input, num_filters, stride, is_first, name=name + "_branch1")
        return fluid.layers.elementwise_add(x=short, y=conv1, act='relu')
# local redis config
redis_host = "127.0.0.1"
redis_port = 6379
redis_password = ""
r = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password)
# reader generation
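# each sample stored in Redis is a 64x8x8 feature map produced by the encoder, plus its label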
reader = fluid.layers.py_reader(capacity=64,
                                shapes=[(-1, 64, 8, 8), (-1, 1)],
                                dtypes=['float32', 'int64'])
samples = r.keys()
train_data = data_generator(samples, r)
reader.decorate_paddle_reader(paddle.batch(
    paddle.reader.shuffle(
        train_data, buf_size=5000),
    batch_size=64))
conv1, label = fluid.layers.read_file(reader)
# train program
place = fluid.CUDAPlace(0)
model = ResNet(layers=50)
predicts = model.net(conv1,10)
cost = fluid.layers.cross_entropy(input=predicts, label=label)
accuracy = fluid.layers.accuracy(input=predicts, label=label)
loss = fluid.layers.mean(cost)
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer.minimize(loss)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
total_time = 0
EPOCH_NUM = 1
step = 0
train_start = time.time()
# start training
for pass_id in range(EPOCH_NUM):
    reader.start()
    try:
        while True:
            start_time = time.time()
            loss_value, acc_value = exe.run(fetch_list=[loss.name, accuracy.name])
            step += 1
            if step % 10 == 0:
                print("epoch: " + str(pass_id) + " step: " + str(step) +
                      " loss: " + str(loss_value) + " acc: " + str(acc_value))
            end_time = time.time()
            total_time += (end_time - start_time)
    except fluid.core.EOFException:
        reader.reset()
train_end = time.time()
print("total time: %d" % (train_end - train_start))
print("computation time: %d" % total_time)
from __future__ import print_function
import os
import paddle
import paddle.fluid as fluid
import numpy
import sys
import redis
import time
from paddle.fluid import layers
from paddle.fluid.param_attr import ParamAttr
import msgpack
def conv_bn_layer(input,
                  num_filters,
                  filter_size,
                  stride=1,
                  groups=1,
                  act=None,
                  name=None):
    conv = fluid.layers.conv2d(
        input=input,
        num_filters=num_filters,
        filter_size=filter_size,
        stride=stride,
        padding=(filter_size - 1) // 2,
        groups=groups,
        act=None,
        param_attr=ParamAttr(name=name + "_weights"),
        bias_attr=False,
        name=name + '.conv2d.output.1')
    if name == "conv1":
        bn_name = "bn_" + name
    else:
        bn_name = "bn" + name[3:]
    return fluid.layers.batch_norm(
        input=conv,
        act=act,
        name=bn_name + '.output.1',
        param_attr=ParamAttr(name=bn_name + '_scale'),
        bias_attr=ParamAttr(bn_name + '_offset'),
        moving_mean_name=bn_name + '_mean',
        moving_variance_name=bn_name + '_variance', )
def load_conf(conf_file, local_dict):
    with open(conf_file) as fin:
        for line in fin:
            group = line.strip().split("=")
            if len(group) != 2:
                continue
            local_dict[group[0]] = group[1]
    return local_dict
# redis DB configuration
redis_host = "127.0.0.1"
redis_port = 6379
redis_password = ""
start_time = time.time()
# start a redis client and empty the DB
r = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password)
r.flushall()
# encoding program
images = fluid.layers.data(name='images', shape=[3,32,32], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
place = fluid.CPUPlace()
conv1 = conv_bn_layer(input=images,num_filters=64,filter_size=7,stride=2,act='relu',name="conv1")
pool = fluid.layers.pool2d(input=conv1,pool_size=3,pool_stride=2,pool_padding=1,pool_type='max')
feeder = fluid.DataFeeder(place=place, feed_list=[images,label])
pretrained_model = 'ResNet50_pretrained'
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
# load the pretrained model and prepare the data
def if_exist(var):
    return os.path.exists(os.path.join(pretrained_model, var.name))

fluid.io.load_vars(exe, pretrained_model, main_program=fluid.default_main_program(),
                   predicate=if_exist)
train_data = paddle.dataset.cifar.train10()
step = 0
# start encoding and uploading
for data in train_data():
    pre_data = []
    pre_data.append(data)
    res = exe.run(program=fluid.default_main_program(), feed=feeder.feed(pre_data), fetch_list=[pool.name])
    sample = [res[0][0].tolist(), data[1]]
    step += 1
    file = msgpack.dumps(sample)
    r.set(step, file)
    if step % 100 == 0:
        print(numpy.array(sample[0]).shape)
        print("%d samples uploaded" % step)
files = r.keys()
print("upload file numbers: %d" % len(files))
end_time = time.time()
total_time = end_time - start_time
print("total time: %d"% total_time)
import zmq
import socket
import msgpack
import os
mission_dict = {"mission": "image classification", "image_size": [3,32,32]}
#send request
context = zmq.Context()
zmq_socket = context.socket(zmq.REQ)
zmq_socket.connect("tcp://127.0.0.1:60001")
zmq_socket.send(msgpack.dumps(mission_dict))
#get and download encoder
file = zmq_socket.recv()
os.system("wget 127.0.0.1:8080/{}".format(file))
#data encoding
os.system("python -u user.py > user.log")
zmq_socket.send("complete")
@@ -20,3 +20,6 @@ from .strategy.fl_strategy_base import FedAvgStrategy
from .scheduler.agent_master import FLServerAgent
from .scheduler.agent_master import FLWorkerAgent
from .scheduler.agent_master import FLScheduler
from .submitter.client_base import HPCClient
from .submitter.client_base import CloudClient
@@ -113,6 +113,14 @@ class FLCompileTimeJob(FLJobBase):
self._save_readable_program(
server_main,
"%s/server.main.program.txt" % server_folder)
self._save_str_list(self._feed_names,
"%s/feed_names" % server_folder)
self._save_str_list(self._target_names,
"%s/target_names" % server_folder)
self._save_endpoints(self._server_endpoints,
"%s/endpoints" % server_folder)
self._save_strategy(self._strategy,
"%s/strategy.pkl" % server_folder)
for i in range(trainer_num):
trainer_folder = "%s/trainer%d" % (folder, i)
@@ -131,6 +139,14 @@ class FLCompileTimeJob(FLJobBase):
self._save_readable_program(
trainer_main,
"%s/trainer.main.program.txt" % trainer_folder)
self._save_str_list(self._feed_names,
"%s/feed_names" % trainer_folder)
self._save_str_list(self._target_names,
"%s/target_names" % trainer_folder)
self._save_endpoints(self._server_endpoints,
"%s/endpoints" % trainer_folder)
self._save_strategy(self._strategy,
"%s/strategy.pkl" % trainer_folder)
for i in range(send_prog_num):
trainer_folder = "%s/trainer%d" % (folder, i)
@@ -149,17 +165,6 @@ class FLCompileTimeJob(FLJobBase):
trainer_recv,
"%s/trainer.recv.program.txt" % trainer_folder)
self._save_str_list(self._feed_names,
"%s/feed_names" % folder)
self._save_str_list(self._target_names,
"%s/target_names" % folder)
self._save_endpoints(self._server_endpoints,
"%s/endpoints" % folder)
self._save_strategy(self._strategy,
"%s/strategy.pkl" % folder)
class FLRunTimeJob(FLJobBase):
"""
@@ -211,16 +216,16 @@ class FLRunTimeJob(FLJobBase):
except:
pass
endpoints_fn = "%s/endpoints" % folder
endpoints_fn = "%s/endpoints" % folder_name
self._endpoints = self._load_endpoints(endpoints_fn)
strategy_fn = "%s/strategy.pkl" % folder
strategy_fn = "%s/strategy.pkl" % folder_name
self._strategy = self._load_strategy(strategy_fn)
feed_names_fn = "%s/feed_names" % folder
feed_names_fn = "%s/feed_names" % folder_name
self._feed_names = self._load_str_list(feed_names_fn)
target_names_fn = "%s/target_names" % folder
target_names_fn = "%s/target_names" % folder_name
self._target_names = self._load_str_list(target_names_fn)
def load_server_job(self, folder=None, server_id=0):
@@ -243,9 +248,9 @@ class FLRunTimeJob(FLJobBase):
main_fn = "%s/server.main.program" % folder_name
self._server_main_program = self._load_program(main_fn)
endpoints_fn = "%s/endpoints" % folder
endpoints_fn = "%s/endpoints" % folder_name
self._endpoints = self._load_endpoints(endpoints_fn)
import pickle
strategy_fn = "%s/strategy.pkl" % folder
strategy_fn = "%s/strategy.pkl" % folder_name
self._strategy = self._load_strategy(strategy_fn)
@@ -18,25 +18,32 @@ class FLServerAgent(object):
self.scheduler_ep = scheduler_ep
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect("tcp://127.0.0.1:9091")
self.socket.connect("tcp://{}".format(scheduler_ep))
self.current_ep = current_ep
def connect_scheduler(self):
self.socket.send("SERVER_EP\t{}".format(self.current_ep))
self.socket.recv()
while True:
self.socket.send("SERVER_EP\t{}".format(self.current_ep))
message = self.socket.recv()
group = message.split("\t")
if group[0] == 'INIT':
break
class FLWorkerAgent(object):
def __init__(self, scheduler_ep, current_ep):
self.scheduler_ep = scheduler_ep
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect("tcp://127.0.0.1:9091")
self.socket.connect("tcp://{}".format(scheduler_ep))
self.current_ep = current_ep
def connect_scheduler(self):
self.socket.send("WORKER_EP\t{}".format(self.current_ep))
self.socket.recv()
while True:
self.socket.send("WORKER_EP\t{}".format(self.current_ep))
message = self.socket.recv()
group = message.split("\t")
if group[0] == 'INIT':
break
def finish_training(self):
self.socket.send("FINISH\t{}".format(self.current_ep))
@@ -59,10 +66,13 @@ class FLWorkerAgent(object):
class FLScheduler(object):
def __init__(self, worker_num, server_num, port=9091):
def __init__(self, worker_num, server_num, port=9091, socket=None):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.socket.bind("tcp://*:{}".format(port))
if socket == None:
self.socket = self.context.socket(zmq.REP)
self.socket.bind("tcp://*:{}".format(port))
else:
self.socket = socket
self.worker_num = worker_num
self.server_num = server_num
self.sample_worker_num = 0
......
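# New submitter clients (paddle_fl.core.submitter.client_base): CloudClient is a placeholder, HPCClient wraps MPI/qsub job submission.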
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import sys
import os
class CloudClient(object):
    def __init__(self):
        pass

    def generate_submit_sh(self, job_dir):
        # placeholder: not implemented yet
        pass

    def generate_job_sh(self, job_dir):
        # placeholder: not implemented yet
        pass

    def submit(self, **kwargs):
        pass
class HPCClient(object):
    def __init__(self):
        self.conf_dict = {}

    def print_args(self):
        print("task_name: {}".format(self.task_name))
        print("hdfs_path: {}".format(self.hdfs_path))
        print("ugi: {}".format(self.ugi))
        print("hdfs_output: {}".format(self.hdfs_output))
        print("worker_nodes: {}".format(self.worker_nodes))
        print("server_nodes: {}".format(self.server_nodes))
        print("hadoop_home: {}".format(self.hadoop_home))
        print("hpc_home: {}".format(self.hpc_home))
        print("train_cmd: {}".format(self.train_cmd))
        print("package_path: {}".format(self.package_path))
        print("priority: {}".format(self.priority))
        print("queue: {}".format(self.queue))
        print("server: {}".format(self.server))
        print("mpi_node_mem: {}".format(self.mpi_node_mem))
        print("pcpu: {}".format(self.pcpu))
        print("python_tar: {}".format(self.python_tar))
        print("wheel: {}".format(self.wheel))
    def check_args(self):
        assert self.task_name != ""
        assert self.hdfs_path != ""
        assert self.ugi != ""
        assert self.hdfs_output != ""
        assert self.worker_nodes != ""
        assert self.server_nodes != ""
        assert self.hadoop_home != ""
        assert self.hpc_home != ""
        assert self.train_cmd != ""
        assert self.package_path != ""
        assert self.priority != ""
        assert self.queue != ""
        assert self.server != ""
        assert self.mpi_node_mem != ""
        assert self.pcpu != ""
        assert self.python_tar != ""
        assert self.wheel != ""
    def generate_qsub_conf(self, job_dir):
        with open("{}/qsub.conf".format(job_dir), "w") as fout:
            fout.write("SERVER={}\n".format(self.server))
            fout.write("QUEUE={}\n".format(self.queue))
            fout.write("PRIORITY={}\n".format(self.priority))
            fout.write("USE_FLAGS_ADVRES=yes\n")
    def generate_submit_sh(self, job_dir):
        with open("{}/submit.sh".format(job_dir), "w") as fout:
            fout.write("#!/bin/bash\n")
            fout.write("unset http_proxy\n")
            fout.write("unset https_proxy\n")
            fout.write("export HADOOP_HOME={}\n".format(
                self.hadoop_home))
            fout.write("$HADOOP_HOME/bin/hadoop fs -Dhadoop.job.ugi={}"
                       " -Dfs.default.name={} -rmr {}\n".format(
                           self.ugi,
                           self.hdfs_path,
                           self.hdfs_output))
            fout.write("MPI_NODE_MEM={}\n".format(self.mpi_node_mem))
            fout.write("{}/bin/qsub_f -N {} --conf qsub.conf "
                       "--hdfs {} --ugi {} --hout {} --files ./package "
                       "-l nodes={},walltime=1000:00:00,pmem-hard={},"
                       "pcpu-soft={},pnetin-soft=1000,"
                       "pnetout-soft=1000 job.sh\n".format(
                           self.hpc_home,
                           self.task_name,
                           self.hdfs_path,
                           self.ugi,
                           self.hdfs_output,
                           int(self.worker_nodes) + int(self.server_nodes),
                           self.mpi_node_mem,
                           self.pcpu))
    def generate_job_sh(self, job_dir):
        with open("{}/job.sh".format(job_dir), "w") as fout:
            fout.write("#!/bin/bash\n")
            fout.write("WORKDIR=`pwd`\n")
            fout.write("mpirun -npernode 1 mv package/* ./\n")
            fout.write("echo 'current dir: '$WORKDIR\n")
            fout.write("mpirun -npernode 1 tar -zxvf python.tar.gz > /dev/null\n")
            fout.write("export LIBRARY_PATH=$WORKDIR/python/lib:$LIBRARY_PATH\n")
            fout.write("mpirun -npernode 1 python/bin/python -m pip install "
                       "{} --index-url=http://pip.baidu.com/pypi/simple "
                       "--trusted-host pip.baidu.com > /dev/null\n".format(
                           self.wheel))
            fout.write("export PATH=python/bin:$PATH\n")
            if self.monitor_cmd != "":
                fout.write("mpirun -npernode 1 -timestamp-output -tag-output -machinefile "
                           "${{PBS_NODEFILE}} python/bin/{} > monitor.log 2> monitor.elog &\n".format(self.monitor_cmd))
            fout.write("mpirun -npernode 1 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python/bin/python train_program.py\n")
            fout.write("if [[ $? -ne 0 ]]; then\n")
            fout.write(" echo 'Failed to run mpi!' 1>&2\n")
            fout.write(" exit 1\n")
            fout.write("fi\n")
    def submit(self, **kwargs):
        # task_name, output_path
        self.task_name = kwargs.get("task_name", "test_submit_job")
        self.hdfs_path = kwargs.get("hdfs_path", "")
        self.ugi = kwargs.get("ugi", "")
        self.hdfs_output = kwargs.get("hdfs_output", "")
        self.worker_nodes = str(kwargs.get("worker_nodes", 2))
        self.server_nodes = str(kwargs.get("server_nodes", 2))
        self.hadoop_home = kwargs.get("hadoop_home", "")
        self.hpc_home = kwargs.get("hpc_home", "")
        self.train_cmd = kwargs.get("train_cmd", "")
        self.monitor_cmd = kwargs.get("monitor_cmd", "")
        self.package_path = kwargs.get("package_path", "")
        self.priority = kwargs.get("priority", "")
        self.queue = kwargs.get("queue", "")
        self.server = kwargs.get("server", "")
        self.mpi_node_mem = str(kwargs.get("mpi_node_mem", 11000))
        self.pcpu = str(kwargs.get("pcpu", 180))
        self.python_tar = kwargs.get("python_tar", "")
        self.wheel = kwargs.get("wheel", "")

        self.print_args()
        self.check_args()
        # make the submit dir and copy the package into it
        jobdir = "{}_jobdir".format(self.task_name)
        os.system("mkdir -p {}_jobdir".format(self.task_name))
        os.system("rm -rf {}/package".format(jobdir))
        os.system("cp -r {} {}/package".format(self.package_path, jobdir))
        os.system("cp {} {}/package/".format(self.python_tar, jobdir))
        os.system("cp {} {}/package/".format(self.wheel, jobdir))
        # generate submit.sh
        self.generate_submit_sh(jobdir)
        # generate job.sh
        self.generate_job_sh(jobdir)
        # generate qsub.conf
        self.generate_qsub_conf(jobdir)
        # run submit
        os.system("cd {};sh submit.sh > submit.log 2> submit.elog &".format(jobdir))
@@ -118,6 +118,20 @@ class FedAvgTrainer(FLTrainer):
def reset(self):
self.cur_step = 0
def run_with_epoch(self,reader,feeder,fetch,num_epoch):
self._logger.debug("begin to run recv program")
self.exe.run(self._recv_program)
epoch = 0
for i in range(num_epoch):
print(epoch)
for data in reader():
self.exe.run(self._main_program,
feed=feeder.feed(data),
fetch_list=fetch)
self.cur_step += 1
epoch += 1
self._logger.debug("begin to run send program")
self.exe.run(self._send_program)
def run(self, feed, fetch):
self._logger.debug("begin to run FedAvgTrainer, cur_step=%d, inner_step=%d" %
(self.cur_step, self._step))
@@ -133,9 +147,9 @@ class FedAvgTrainer(FLTrainer):
self.exe.run(self._send_program)
self.cur_step += 1
return loss
def stop(self):
return False
class SecAggTrainer(FLTrainer):
......
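# New FEMNIST dataset API (paddle_fl.dataset.femnist): downloads the LEAF FEMNIST archive and exposes per-trainer train/test readers.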
import requests
import os
import json
import tarfile
import random
url = "https://paddlefl.bj.bcebos.com/leaf/"
target_path = "femnist_data"
tar_path = target_path+".tar.gz"
print(tar_path)
def download(url):
    r = requests.get(url)
    with open(tar_path, 'wb') as f:
        f.write(r.content)

def extract(tar_path):
    tar = tarfile.open(tar_path, "r:gz")
    file_names = tar.getnames()
    for file_name in file_names:
        tar.extract(file_name)
    tar.close()

def train(trainer_id, inner_step, batch_size, count_by_step):
    if not os.path.exists(target_path):
        print("Preparing data...")
        if not os.path.exists(tar_path):
            download(url + tar_path)
        extract(tar_path)

    def train_data():
        train_file = open("./femnist_data/train/all_data_%d_niid_0_keep_0_train_9.json" % trainer_id, 'r')
        json_train = json.load(train_file)
        users = json_train["users"]
        rand = random.randrange(0, len(users))  # randomly choose a user from each trainer
        cur_user = users[rand]
        print('training using ' + cur_user)
        train_images = json_train["user_data"][cur_user]['x']
        train_labels = json_train["user_data"][cur_user]['y']
        if count_by_step:
            for i in xrange(inner_step * batch_size):
                yield train_images[i % (len(train_images))], train_labels[i % (len(train_images))]
        else:
            for i in xrange(len(train_images)):
                yield train_images[i], train_labels[i]
        train_file.close()
    return train_data

def test(trainer_id, inner_step, batch_size, count_by_step):
    if not os.path.exists(target_path):
        print("Preparing data...")
        if not os.path.exists(tar_path):
            download(url + tar_path)
        extract(tar_path)

    def test_data():
        test_file = open("./femnist_data/test/all_data_%d_niid_0_keep_0_test_9.json" % trainer_id, 'r')
        json_test = json.load(test_file)
        users = json_test["users"]
        for user in users:
            test_images = json_test['user_data'][user]['x']
            test_labels = json_test['user_data'][user]['y']
            for i in xrange(len(test_images)):
                yield test_images[i], test_labels[i]
        test_file.close()
    return test_data
@@ -47,5 +47,5 @@ strategy = build_strategy.create_fl_strategy()
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=5, output=output)
strategy, server_endpoints=endpoints, worker_num=2, output=output)
# fl_job_config will be dispatched to workers
from paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 5
worker_num = 2
server_num = 1
scheduler = FLScheduler(worker_num,server_num)
scheduler.set_sample_worker_num(5)
scheduler.set_sample_worker_num(worker_num)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
@@ -23,7 +23,6 @@ job._scheduler_ep = "127.0.0.1:9091"
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
trainer.start()
print(trainer._scheduler_ep, trainer._current_ep)
output_folder = "fl_model"
step_i = 0
@@ -38,3 +37,4 @@ while not trainer.stop():
step_i += 1
if step_i % 100 == 0:
trainer.save_inference_program(output_folder)
@@ -6,7 +6,7 @@ python -u fl_scheduler.py > scheduler.log &
sleep 5
python -u fl_server.py >server0.log &
sleep 2
for ((i=0;i<5;i++))
for ((i=0;i<2;i++))
do
python -u fl_trainer.py $i >trainer$i.log &
sleep 2
......
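# FEMNIST demo master (fl_master.py): defines a CNN with 62 output classes and generates the FL job config.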
import paddle.fluid as fluid
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
class Model(object):
    def __init__(self):
        pass

    def cnn(self):
        self.inputs = fluid.layers.data(name='img', shape=[1, 28, 28], dtype="float32")
        self.label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        self.conv_pool_1 = fluid.nets.simple_img_conv_pool(input=self.inputs, num_filters=20, filter_size=5, pool_size=2, pool_stride=2, act='relu')
        self.conv_pool_2 = fluid.nets.simple_img_conv_pool(input=self.conv_pool_1, num_filters=50, filter_size=5, pool_size=2, pool_stride=2, act='relu')
        self.predict = fluid.layers.fc(input=self.conv_pool_2, size=62, act='softmax')
        self.cost = fluid.layers.cross_entropy(input=self.predict, label=self.label)
        self.accuracy = fluid.layers.accuracy(input=self.predict, label=self.label)
        self.loss = fluid.layers.mean(self.cost)
        self.startup_program = fluid.default_startup_program()
model = Model()
model.cnn()
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[model.inputs.name, model.label.name], [model.loss.name, model.accuracy.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 1
strategy = build_strategy.create_fl_strategy()
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=4, output=output)
from paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 4
server_num = 1
scheduler = FLScheduler(worker_num,server_num)
scheduler.set_sample_worker_num(4)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
import paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.core.server.fl_server import FLServer
from paddle_fl.core.master.fl_job import FLRunTimeJob
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = "127.0.0.1:9091"
server.set_server_job(job)
server._current_ep = "127.0.0.1:8181"
server.start()
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
import paddle_fl.dataset.femnist
import numpy
import sys
import paddle
import paddle.fluid as fluid
import logging
import math
logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%M-%Y %H:%M:%S", level=logging.DEBUG)
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091"
print(job._target_names)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
trainer.start()
print(trainer._step)
test_program = trainer._main_program.clone(for_test=True)
img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())
def train_test(train_test_program, train_test_feed, train_test_reader):
    acc_set = []
    for test_data in train_test_reader():
        acc_np = trainer.exe.run(
            program=train_test_program,
            feed=train_test_feed.feed(test_data),
            fetch_list=["accuracy_0.tmp_0"])
        acc_set.append(float(acc_np[0]))
    acc_val_mean = numpy.array(acc_set).mean()
    return acc_val_mean
epoch_id = 0
step = 0
epoch = 3000
count_by_step = True
if count_by_step:
    output_folder = "model_node%d" % trainer_id
else:
    output_folder = "model_node%d_epoch" % trainer_id
while not trainer.stop():
    count = 0
    epoch_id += 1
    if epoch_id > epoch:
        break
    print("epoch %d start train" % (epoch_id))
    #train_data,test_data= data_generater(trainer_id,inner_step=trainer._step,batch_size=64,count_by_step=count_by_step)
    train_reader = paddle.batch(
        paddle.reader.shuffle(paddle_fl.dataset.femnist.train(trainer_id, inner_step=trainer._step, batch_size=64, count_by_step=count_by_step), buf_size=500),
        batch_size=64)
    test_reader = paddle.batch(
        paddle_fl.dataset.femnist.test(trainer_id, inner_step=trainer._step, batch_size=64, count_by_step=count_by_step), batch_size=64)
    if count_by_step:
        for step_id, data in enumerate(train_reader()):
            acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
            step += 1
            count += 1
            print(count)
            if count % trainer._step == 0:
                break
            # print("acc:%.3f" % (acc[0]))
    else:
        trainer.run_with_epoch(train_reader, feeder, fetch=["accuracy_0.tmp_0"], num_epoch=1)

    acc_val = train_test(
        train_test_program=test_program,
        train_test_reader=test_reader,
        train_test_feed=feeder)
    print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
    if trainer_id == 0:
        save_dir = (output_folder + "/epoch_%d") % epoch_id
        trainer.save_inference_program(output_folder)
#killall python
python fl_master.py
sleep 2
python -u fl_scheduler.py >scheduler.log &
sleep 2
python -u fl_server.py >server0.log &
sleep 2
for ((i=0;i<4;i++))
do
python -u fl_trainer.py $i >trainer$i.log &
sleep 2
done
#!/bin/bash
echo "Stop service!"
ps -ef | grep -E "fl" | grep -v grep | awk '{print $2}' | xargs kill -9
# commonly configured
task_name=test_fl_job_submit_jingqinghe
hdfs_output=/user/feed/mlarch/sequence_generator/dongdaxiang/job_44
train_cmd=python dist_trainer.py
monitor_cmd=python system_monitor_app.py 10 100
#train_cmd=python test_hadoop.py
hdfs_path=afs://xingtian.afs.baidu.com:9902
ugi=mlarch,Fv1M87
hdfs_output=/user/feed/mlarch/sequence_generator/dongdaxiang/job_44
worker_nodes=2
server_nodes=1
hadoop_home=/home/jingqinghe/hadoop-xingtian/hadoop
hpc_home=/home/jingqinghe/mpi_feed4/smart_client
package_path=./package
priority=high
#queue name
queue=paddle-dev-amd
server=yq01-hpc-lvliang01-smart-master.dmop.baidu.com
python_tar=./python.tar.gz
wheel=./paddlepaddle-0.0.0-cp27-cp27mu-linux_x86_64.whl
/home/jingqinghe/mpi_feed4/smart_client/bin/qdel $1".yq01-hpc-lvliang01-smart-master.dmop.baidu.com"
import paddle.fluid as fluid
class Model(object):
    def __init__(self):
        pass

    def mlp(self, inputs, label, hidden_size=128):
        self.concat = fluid.layers.concat(inputs, axis=1)
        self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
        self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
        self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
        self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
        self.loss = fluid.layers.reduce_mean(self.sum_cost)
        self.startup_program = fluid.default_startup_program()
tar -xf python.tar.gz
python/bin/python scheduler_client.py conf.txt
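# scheduler_client.py, run on the submitting machine: submits the MPI job through HPCClient, collects the endpoints reported by the allocated nodes, generates and serves the FL job configs, and then drives training as the scheduler.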
import os
import socket
import random
import zmq
import time
import sys
from paddle_fl.core.submitter.client_base import HPCClient
from paddle_fl.core.scheduler.agent_master import FLScheduler
import paddle.fluid as fluid
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
from model import Model
import tarfile
#random_port = random.randint(60001, 64001)
random_port = 60001
print(random_port)
current_ip = socket.gethostbyname(socket.gethostname())
endpoints = "{}:{}".format(current_ip, random_port)
#start a web server for remote endpoints to download their config
os.system("python -m SimpleHTTPServer 8080 &")
if os.path.exists("job_config"):
os.system("rm -rf job_config")
if os.path.exists("package"):
os.system("rm -rf package")
os.system("mkdir package")
os.system("cp train_program.py package")
with open("package/scheduler.conf", "w") as fout:
fout.write("ENDPOINT\t{}\n".format(endpoints))
# submit a job with current endpoint
default_dict = {
"task_name": "test_submit_job",
"hdfs_path": "afs://xingtian.afs.baidu.com:9902",
"ugi": "",
"worker_nodes": 5,
"server_nodes": 5,
"hadoop_home": "/home/jingqinghe/hadoop-xingtian/hadoop",
"hpc_home": "/home/jingqinghe/mpi_feed4/smart_client",
"package_path": "./package",
"priority": "high",
"queue": "paddle-dev-amd",
"server": "yq01-hpc-lvliang01-smart-master.dmop.baidu.com",
"mpi_node_mem": 11000,
"pcpu": 180,
"python_tar": "./python.tar.gz",
"wheel": "./paddlepaddle-0.0.0-cp27-cp27mu-linux_x86_64-0.whl"
}
def load_conf(conf_file, local_dict):
    with open(conf_file) as fin:
        for line in fin:
            group = line.strip().split("=")
            if len(group) != 2:
                continue
            local_dict[group[0]] = group[1]
    return local_dict
client = HPCClient()
default_dict = load_conf(sys.argv[1], default_dict)
client.submit(
task_name=default_dict["task_name"],
hdfs_path=default_dict["hdfs_path"],
ugi=default_dict["ugi"],
hdfs_output=default_dict["hdfs_output"],
worker_nodes=default_dict["worker_nodes"],
server_nodes=default_dict["server_nodes"],
hadoop_home=default_dict["hadoop_home"],
hpc_home=default_dict["hpc_home"],
train_cmd=default_dict["train_cmd"],
monitor_cmd=default_dict["monitor_cmd"],
package_path=default_dict["package_path"],
priority=default_dict["priority"],
queue=default_dict["queue"],
server=default_dict["server"],
mpi_node_mem=default_dict["mpi_node_mem"],
pcpu=default_dict["pcpu"],
python_tar=default_dict["python_tar"],
wheel=default_dict["wheel"])
print("submit mpi job done.")
# start scheduler and receive the ip of allocated endpoints
context = zmq.Context()
zmq_socket = context.socket(zmq.REP)
zmq_socket.bind("tcp://{}:{}".format(current_ip, random_port))
print("binding tcp://{}:{}".format(current_ip, random_port))
all_ips_ready = False
ip_list = []
scheduler = FLScheduler(int(default_dict["worker_nodes"]),
int(default_dict["server_nodes"]),
port=random_port, socket=zmq_socket)
scheduler.set_sample_worker_num(int(default_dict["worker_nodes"]))
print("going to wait all ips ready")
while not all_ips_ready:
    message = zmq_socket.recv()
    group = message.split("\t")
    if group[0] == "ENDPOINT":
        ip_list.append(group[1])
        zmq_socket.send("ACCEPT\t{}".format(group[1]))
    else:
        zmq_socket.send("WAIT\t0")
    if len(ip_list) == \
            int(default_dict["worker_nodes"]) + \
            int(default_dict["server_nodes"]):
        all_ips_ready = True
print("all worker ips are collected")
print(ip_list)
#allocate the role of each endpoint and their ids
ip_role = {}
for i in range(len(ip_list)):
    if i < int(default_dict["server_nodes"]):
        ip_role[ip_list[i]] = 'server%d' % i
    else:
        ip_role[ip_list[i]] = 'trainer%d' % (i - int(default_dict["server_nodes"]))
print(ip_role)
def job_generate():
    # generate a fl job which is the same as fl_master
    inputs = [fluid.layers.data( \
        name=str(slot_id), shape=[5],
        dtype="float32")
        for slot_id in range(3)]
    label = fluid.layers.data( \
        name="label",
        shape=[1],
        dtype='int64')
    model = Model()
    model.mlp(inputs, label)
    job_generator = JobGenerator()
    optimizer = fluid.optimizer.SGD(learning_rate=0.1)
    job_generator.set_optimizer(optimizer)
    job_generator.set_losses([model.loss])
    job_generator.set_startup_program(model.startup_program)
    job_generator.set_infer_feed_and_target_names(
        [x.name for x in inputs], [model.predict.name])
    build_strategy = FLStrategyFactory()
    build_strategy.fed_avg = True
    build_strategy.inner_step = 10
    strategy = build_strategy.create_fl_strategy()
    # endpoints will be collected through the cluster
    # in this example, we suppose endpoints have been collected
    server_ip = ["{}".format(ip_list[0])]
    output = "job_config"
    job_generator.generate_fl_job(
        strategy, server_endpoints=server_ip, worker_num=int(default_dict["worker_nodes"]), output=output)
    file_list = os.listdir(output)
    for file in file_list:
        tar = tarfile.open('{}/{}.tar.gz'.format(output, file), 'w:gz')
        for root, dir, files in os.walk("{}/{}".format(output, file)):
            for f in files:
                fullpath = os.path.join(root, f)
                tar.add(fullpath)
        tar.close()
job_generate()
#send the allocated roles to the remote endpoints
all_job_sent = False
download_job = []
while not all_job_sent:
    message = zmq_socket.recv()
    group = message.split("\t")
    if group[0] == "GET_FL_JOB":
        download_job.append(group[1])
        zmq_socket.send(ip_role[group[1]])
    else:
        zmq_socket.send("WAIT\t0")
    if len(download_job) == len(ip_list):
        all_job_sent = True
#start training
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
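# train_program.py, run on every allocated MPI node: registers its endpoint with the scheduler, downloads its job config, and starts as either an FL server or an FL trainer depending on the assigned role.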
import socket
import random
import zmq
import os
import tarfile
import paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.core.server.fl_server import FLServer
from paddle_fl.core.master.fl_job import FLRunTimeJob
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
import numpy as np
import sys
import logging
import time
random_port = 60001
scheduler_conf = {}
#connect to scheduler and get the role and id of the endpoint
with open("scheduler.conf") as fin:
for line in fin:
line = line.strip()
group = line.split("\t")
scheduler_conf[group[0]] = group[1]
current_ip = socket.gethostbyname(socket.gethostname())
endpoint = "{}:{}".format(current_ip, random_port)
scheduler_ip = scheduler_conf["ENDPOINT"].split(":")
download_url = "{}:8080".format(scheduler_ip[0])
print(download_url)
context = zmq.Context()
zmq_socket = context.socket(zmq.REQ)
zmq_socket.connect(
"tcp://{}".format(scheduler_conf["ENDPOINT"]))
zmq_socket.send("ENDPOINT\t{}".format(endpoint))
message = zmq_socket.recv()
print(message)
message = ""
#download the config file from scheduler
while True:
    zmq_socket.send("GET_FL_JOB\t{}".format(endpoint))
    message = zmq_socket.recv()
    group = message.split("\t")
    if group[0] == "WAIT":
        continue
    else:
        os.system("wget {}/job_config/{}.tar.gz".format(download_url, message))
        print(message)
        break
os.system("ls")
os.system("gzip -d {}.tar.gz".format(message))
print("gzip finish")
os.system("tar -xf {}.tar".format(message))
os.system("ls")
zmq_socket.close()
print("close socket")
#program start
if 'server' in message:
    server = FLServer()
    server_id = 0
    job_path = "job_config"
    job = FLRunTimeJob()
    job.load_server_job(job_path, server_id)
    job._scheduler_ep = scheduler_conf["ENDPOINT"]
    server.set_server_job(job)
    server._current_ep = endpoint
    server.start()
else:
    def reader():
        for i in range(1000):
            data_dict = {}
            for i in range(3):
                data_dict[str(i)] = np.random.rand(1, 5).astype('float32')
            data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
            yield data_dict

    trainer_id = message.split("trainer")[1]
    job_path = "job_config"
    job = FLRunTimeJob()
    job.load_trainer_job(job_path, int(trainer_id))
    job._scheduler_ep = scheduler_conf["ENDPOINT"]
    trainer = FLTrainerFactory().create_fl_trainer(job)
    trainer._current_ep = endpoint
    trainer.start()
    print(trainer._scheduler_ep, trainer._current_ep)
    output_folder = "fl_model"
    step_i = 0
    while not trainer.stop():
        print("batch %d start train" % (step_i))
        train_step = 0
        for data in reader():
            trainer.run(feed=data, fetch=[])
            train_step += 1
            if train_step == trainer._step:
                break
        step_i += 1
        if step_i % 100 == 0:
            trainer.save_inference_program(output_folder)
@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" PaddleFL version string """
fl_version = "0.1.1"
module_proto_version = "0.1.1"
fl_version = "0.1.2"
module_proto_version = "0.1.2"
@@ -29,7 +29,7 @@ def python_version():
max_version, mid_version, min_version = python_version()
REQUIRED_PACKAGES = [
'six >= 1.10.0', 'protobuf >= 3.1.0','paddlepaddle >= 1.6'
'six >= 1.10.0', 'protobuf >= 3.1.0','paddlepaddle >= 1.6'
]
if max_version < 3:
......