Commit a6caa651 authored by: Q qjing666

fix code style

Parent d4e75537
...@@ -12,14 +12,14 @@ import math ...@@ -12,14 +12,14 @@ import math
import msgpack import msgpack
def data_generater(samples,r): def data_generater(samples, r):
# data generater # data generater
def train_data(): def train_data():
for item in samples: for item in samples:
sample = msgpack.loads(r.get(str(item))) sample = msgpack.loads(r.get(str(item)))
conv = sample[0] conv = sample[0]
label = sample[1] label = sample[1]
yield conv,label yield conv, label
return train_data return train_data
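A quick consumption sketch (hypothetical, not part of the commit): the factory returns a plain Python generator, which is what the batching utilities below wrap.

    # hypothetical direct use of the factory above
    train_data = data_generater(r.keys(), r)
    for conv, label in train_data():  # conv: nested feature list, label: int
        pass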
@@ -67,7 +67,7 @@ class ResNet():
                size=class_dim,
                param_attr=fluid.param_attr.ParamAttr(
                    initializer=fluid.initializer.Uniform(-stdv, stdv)),
                act="softmax")
        else:
            for block in range(len(depth)):
                for i in range(depth[block]):
@@ -87,7 +87,7 @@ class ResNet():
                size=class_dim,
                param_attr=fluid.param_attr.ParamAttr(
                    initializer=fluid.initializer.Uniform(-stdv, stdv)),
                act="softmax")
        return out
    def conv_bn_layer(self,
@@ -123,8 +123,6 @@ class ResNet():
            moving_mean_name=bn_name + '_mean',
            moving_variance_name=bn_name + '_variance', )

    def shortcut(self, input, ch_out, stride, is_first, name):
        ch_in = input.shape[1]
        if ch_in != ch_out or stride != 1 or is_first == True:
@@ -181,31 +179,33 @@ class ResNet():
            input, num_filters, stride, is_first, name=name + "_branch1")
        return fluid.layers.elementwise_add(x=short, y=conv1, act='relu')
# local redis config
redis_host = "127.0.0.1"
redis_port = 6379
redis_password = ""
r = redis.StrictRedis(
    host=redis_host, port=redis_port, password=redis_password)

# reader generation
reader = fluid.layers.py_reader(
    capacity=64, shapes=[(-1, 64, 8, 8), (-1, 1)],
    dtypes=['float32', 'int64'])
samples = r.keys()
train_data = data_generater(samples, r)
reader.decorate_paddle_reader(
    paddle.batch(
        paddle.reader.shuffle(
            train_data, buf_size=5000), batch_size=64))
conv1, label = fluid.layers.read_file(reader)

# train program
place = fluid.CUDAPlace(0)
model = ResNet(layers=50)
predicts = model.net(conv1, 10)
cost = fluid.layers.cross_entropy(input=predicts, label=label)
accuracy = fluid.layers.accuracy(input=predicts, label=label)
loss = fluid.layers.mean(cost)
@@ -226,10 +226,12 @@ for pass_id in range(EPOCH_NUM):
    try:
        while True:
            start_time = time.time()
            loss_value, acc_value = exe.run(
                fetch_list=[loss.name, accuracy.name])
            step += 1
            if step % 10 == 0:
                print("epoch: " + str(pass_id) + "step: " + str(step) +
                      "loss: " + str(loss_value) + "acc: " + str(acc_value))
            end_time = time.time()
            total_time += (end_time - start_time)
    except fluid.core.EOFException:
...
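The handler body is collapsed in this diff; with `py_reader` the loop conventionally resets the reader on EOF. A sketch of the usual pattern (an assumption, not shown in this commit):

    except fluid.core.EOFException:
        reader.reset()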
@@ -9,6 +9,8 @@ import time
from paddle.fluid import layers
from paddle.fluid.param_attr import ParamAttr
import msgpack


def conv_bn_layer(input,
                  num_filters,
                  filter_size,
@@ -51,6 +53,7 @@ def load_conf(conf_file, local_dict):
            local_dict[group[0]] = group[1]
    return local_dict


# redis DB configuration
redis_host = "127.0.0.1"
redis_port = 6379
@@ -58,27 +61,40 @@ redis_password = ""
start_time = time.time()
# start a redis client and empty the DB
r = redis.StrictRedis(
    host=redis_host, port=redis_port, password=redis_password)
r.flushall()
# encoding program
images = fluid.layers.data(name='images', shape=[3, 32, 32], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
place = fluid.CPUPlace()
conv1 = conv_bn_layer(
    input=images,
    num_filters=64,
    filter_size=7,
    stride=2,
    act='relu',
    name="conv1")
pool = fluid.layers.pool2d(
    input=conv1, pool_size=3, pool_stride=2, pool_padding=1, pool_type='max')
feeder = fluid.DataFeeder(place=place, feed_list=[images, label])
pretrained_model = 'ResNet50_pretrained'
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())


# load pretrained model and prepare data
def if_exist(var):
    return os.path.exists(os.path.join(pretrained_model, var.name))


fluid.io.load_vars(
    exe,
    pretrained_model,
    main_program=fluid.default_main_program(),
    predicate=if_exist)

train_data = paddle.dataset.cifar.train10()
step = 0
@@ -86,11 +102,13 @@ step = 0
for data in train_data():
    pre_data = []
    pre_data.append(data)
    res = exe.run(program=fluid.default_main_program(),
                  feed=feeder.feed(pre_data),
                  fetch_list=[pool.name])
    sample = [res[0][0].tolist(), data[1]]
    step += 1
    file = msgpack.dumps(sample)
    r.set(step, file)
    if step % 100 == 0:
        print(numpy.array(sample[0]).shape)
        print("%dstart" % step)
@@ -99,6 +117,4 @@ files = r.keys()
print("upload file numbers: %d" % len(files))
end_time = time.time()
total_time = end_time - start_time
print("total time: %d" % total_time)
@@ -2,7 +2,7 @@ import zmq
import socket
import msgpack
import os
mission_dict = {"mission": "image classification", "image_size": [3, 32, 32]}
#send request
context = zmq.Context()
zmq_socket = context.socket(zmq.REQ)
...
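The rest of the client is collapsed; a minimal REQ round trip for this socket might look like the following sketch (the endpoint and reply handling are assumptions, not taken from this commit):

    zmq_socket.connect("tcp://127.0.0.1:60001")   # endpoint assumed
    zmq_socket.send(msgpack.dumps(mission_dict))  # announce the mission
    reply = zmq_socket.recv()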
@@ -4,16 +4,22 @@ from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
import math


class Model(object):
    def __init__(self):
        pass

    def lr_network(self):
        self.inputs = fluid.layers.data(
            name='img', shape=[1, 28, 28], dtype="float32")
        self.label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        self.predict = fluid.layers.fc(input=self.inputs,
                                       size=10,
                                       act='softmax')
        self.sum_cost = fluid.layers.cross_entropy(
            input=self.predict, label=self.label)
        self.accuracy = fluid.layers.accuracy(
            input=self.predict, label=self.label)
        self.loss = fluid.layers.mean(self.sum_cost)
        self.startup_program = fluid.default_startup_program()
@@ -23,7 +29,7 @@ model.lr_network()
STEP_EPSILON = 0.1
DELTA = 0.00001
SIGMA = math.sqrt(2.0 * math.log(1.25 / DELTA)) / STEP_EPSILON
CLIP = 4.0
batch_size = 64
@@ -33,7 +39,8 @@ job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
    [model.inputs.name, model.label.name],
    [model.loss.name, model.accuracy.name])
build_strategy = FLStrategyFactory()
build_strategy.dpsgd = True
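SIGMA is the noise multiplier of the Gaussian mechanism: adding noise with standard deviation sigma = sqrt(2 * ln(1.25 / delta)) / epsilon to a sensitivity-bounded (CLIP-normalized) update makes each step (epsilon, delta)-differentially private. A quick check of the constants above:

    # sqrt(2 * ln(1.25 / 1e-5)) / 0.1 ≈ 4.845 / 0.1 ≈ 48.4
    assert abs(SIGMA - 48.4) < 0.1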
...
@@ -3,7 +3,7 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 4
server_num = 1
# Define number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(4)
scheduler.init_env()
print("init env done.")
...
@@ -7,7 +7,12 @@ import paddle.fluid as fluid
import logging
import math

logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)

trainer_id = int(sys.argv[1])  # trainer id for each guest
job_path = "fl_job_config"
@@ -15,36 +20,38 @@ job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091"  # Inform scheduler IP address to trainer
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
trainer.start()
test_program = trainer._main_program.clone(for_test=True)
train_reader = paddle.batch(
    paddle.reader.shuffle(
        paddle.dataset.mnist.train(), buf_size=500),
    batch_size=64)
test_reader = paddle.batch(paddle.dataset.mnist.test(), batch_size=64)

img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())


def train_test(train_test_program, train_test_feed, train_test_reader):
    acc_set = []
    for test_data in train_test_reader():
        acc_np = trainer.exe.run(program=train_test_program,
                                 feed=train_test_feed.feed(test_data),
                                 fetch_list=["accuracy_0.tmp_0"])
        acc_set.append(float(acc_np[0]))
    acc_val_mean = numpy.array(acc_set).mean()
    return acc_val_mean


def compute_privacy_budget(sample_ratio, epsilon, step, delta):
    E = 2 * epsilon * math.sqrt(step * sample_ratio)
    print("({0}, {1})-DP".format(E, delta))
output_folder = "model_node%d" % trainer_id output_folder = "model_node%d" % trainer_id
epoch_id = 0 epoch_id = 0
step = 0 step = 0
...@@ -64,7 +71,8 @@ while not trainer.stop(): ...@@ -64,7 +71,8 @@ while not trainer.stop():
train_test_feed=feeder) train_test_feed=feeder)
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val)) print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
compute_privacy_budget(sample_ratio=0.001, epsilon=0.1, step=step, delta=0.00001) compute_privacy_budget(
sample_ratio=0.001, epsilon=0.1, step=step, delta=0.00001)
save_dir = (output_folder + "/epoch_%d") % epoch_id save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder) trainer.save_inference_program(output_folder)
@@ -9,14 +9,31 @@ class Model(object):
        pass

    def cnn(self):
        self.inputs = fluid.layers.data(
            name='img', shape=[1, 28, 28], dtype="float32")
        self.label = fluid.layers.data(name='label', shape=[1], dtype='int64')
        self.conv_pool_1 = fluid.nets.simple_img_conv_pool(
            input=self.inputs,
            num_filters=20,
            filter_size=5,
            pool_size=2,
            pool_stride=2,
            act='relu')
        self.conv_pool_2 = fluid.nets.simple_img_conv_pool(
            input=self.conv_pool_1,
            num_filters=50,
            filter_size=5,
            pool_size=2,
            pool_stride=2,
            act='relu')
        self.predict = fluid.layers.fc(input=self.conv_pool_2,
                                       size=62,
                                       act='softmax')
        self.cost = fluid.layers.cross_entropy(
            input=self.predict, label=self.label)
        self.accuracy = fluid.layers.accuracy(
            input=self.predict, label=self.label)
        self.loss = fluid.layers.mean(self.cost)
        self.startup_program = fluid.default_startup_program()
@@ -30,8 +47,8 @@ job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
    [model.inputs.name, model.label.name],
    [model.loss.name, model.accuracy.name])

build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
...
@@ -3,7 +3,7 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 4
server_num = 1
# Define the number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(4)
scheduler.init_env()
print("init env done.")
...
@@ -8,7 +8,12 @@ import paddle.fluid as fluid
import logging
import math

logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)

trainer_id = int(sys.argv[1])  # trainer id for each guest
job_path = "fl_job_config"
@@ -17,7 +22,7 @@ job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091"  # Inform the scheduler IP to trainer
print(job._target_names)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
trainer.start()
print(trainer._step)
test_program = trainer._main_program.clone(for_test=True)
@@ -26,17 +31,18 @@ img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())


def train_test(train_test_program, train_test_feed, train_test_reader):
    acc_set = []
    for test_data in train_test_reader():
        acc_np = trainer.exe.run(program=train_test_program,
                                 feed=train_test_feed.feed(test_data),
                                 fetch_list=["accuracy_0.tmp_0"])
        acc_set.append(float(acc_np[0]))
    acc_val_mean = numpy.array(acc_set).mean()
    return acc_val_mean


epoch_id = 0
step = 0
epoch = 3000
@@ -46,7 +52,6 @@ if count_by_step:
else:
    output_folder = "model_node%d_epoch" % trainer_id
while not trainer.stop():
    count = 0
    epoch_id += 1
@@ -55,11 +60,22 @@ while not trainer.stop():
    print("epoch %d start train" % (epoch_id))
    #train_data,test_data= data_generater(trainer_id,inner_step=trainer._step,batch_size=64,count_by_step=count_by_step)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            paddle_fl.dataset.femnist.train(
                trainer_id,
                inner_step=trainer._step,
                batch_size=64,
                count_by_step=count_by_step),
            buf_size=500),
        batch_size=64)
    test_reader = paddle.batch(
        paddle_fl.dataset.femnist.test(
            trainer_id,
            inner_step=trainer._step,
            batch_size=64,
            count_by_step=count_by_step),
        batch_size=64)
    if count_by_step:
        for step_id, data in enumerate(train_reader()):
@@ -71,8 +87,8 @@ while not trainer.stop():
                break
        # print("acc:%.3f" % (acc[0]))
    else:
        trainer.run_with_epoch(
            train_reader, feeder, fetch=["accuracy_0.tmp_0"], num_epoch=1)

    acc_val = train_test(
        train_test_program=test_program,
...
@@ -3,6 +3,7 @@ import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory


class Model(object):
    def __init__(self):
        pass
@@ -34,7 +35,8 @@ class Model(object):
            size=hid_size * 3,
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.Uniform(
                    low=init_low_bound,
                    high=init_high_bound),
                learning_rate=gru_lr_x))
        gru_h0 = fluid.layers.dynamic_gru(
            input=fc0,
@@ -49,7 +51,8 @@ class Model(object):
            act='softmax',
            param_attr=fluid.ParamAttr(
                initializer=fluid.initializer.Uniform(
                    low=init_low_bound,
                    high=init_high_bound),
                learning_rate=fc_lr_x))
        cost = fluid.layers.cross_entropy(
            input=self.fc, label=self.dst_wordseq)
@@ -59,7 +62,6 @@ class Model(object):
        self.startup_program = fluid.default_startup_program()

model = Model()
model.gru4rec_network()
@@ -69,7 +71,8 @@ job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
    [model.src_wordseq.name, model.dst_wordseq.name],
    [model.loss.name, model.acc.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
...
@@ -3,7 +3,7 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 4
server_num = 1
# Define the number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(4)
scheduler.init_env()
print("init env done.")
...
@@ -6,7 +6,12 @@ import numpy as np
import sys
import os
import logging

logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)

trainer_id = int(sys.argv[1])  # trainer id for each guest
place = fluid.CPUPlace()
@@ -16,11 +21,11 @@ job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091"  # Inform the scheduler IP to trainer
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
trainer.start()

r = Gru4rec_Reader()
train_reader = r.reader(train_file_dir, place, batch_size=125)
output_folder = "model_node4"
step_i = 0
@@ -30,8 +35,7 @@ while not trainer.stop():
    train_step = 0
    for data in train_reader():
        #print(np.array(data['src_wordseq']))
        ret_avg_cost = trainer.run(feed=data, fetch=["mean_0.tmp_0"])
        train_step += 1
        if train_step == trainer._step:
            break
...
@@ -5,6 +5,7 @@ import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory


def parse_args():
    parser = argparse.ArgumentParser(description="master")
    parser.add_argument(
@@ -25,7 +26,8 @@ class Model(object):
        self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
        self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
        self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
        self.sum_cost = fluid.layers.cross_entropy(
            input=self.predict, label=label)
        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
        self.loss = fluid.layers.reduce_mean(self.sum_cost)
        self.startup_program = fluid.default_startup_program()
@@ -47,8 +49,8 @@ optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names([x.name for x in inputs],
                                              [model.predict.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
@@ -57,7 +59,8 @@ strategy = build_strategy.create_fl_strategy()
# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
server_service_ip = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + os.environ[
    'FL_SERVER_SERVICE_PORT_FL_SERVER']
service_endpoints = [server_service_ip]
pod_endpoints = ["0.0.0.0:8181"]
output = "fl_job_config"
@@ -68,4 +71,8 @@ num_trainer = args.trainer_num
# fl_job_config will be dispatched to workers
job_generator.generate_fl_job_for_k8s(
    strategy,
    server_pod_endpoints=pod_endpoints,
    server_service_endpoints=service_endpoints,
    worker_num=2,
    output=output)
import argparse
from paddle_fl.core.scheduler.agent_master import FLScheduler


def parse_args():
    parser = argparse.ArgumentParser(description="scheduler")
    parser.add_argument(
@@ -11,12 +12,13 @@ def parse_args():
    return parser.parse_args()


args = parse_args()
num_trainer = args.trainer_num
worker_num = num_trainer
server_num = 1
# Define the number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(worker_num)
scheduler.init_env()
print("init env done.")
...
@@ -23,10 +23,12 @@ server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ[
    'FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']  # IP address for scheduler
#job._endpoints = os.environ['POD_IP'] + ":" + os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER'] # IP address for server
server.set_server_job(job)
server._current_ep = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + os.environ[
    'FL_SERVER_SERVICE_PORT_FL_SERVER']  # IP address for server
print(job._scheduler_ep, server._current_ep)
server.start()
print("connect")
@@ -5,7 +5,12 @@ import sys
import os
import logging
import time

logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)


def reader():
@@ -16,15 +21,18 @@ def reader():
        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
        yield data_dict


trainer_id = int(sys.argv[1])  # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
#job._scheduler_ep = "127.0.0.1:9091" # Inform the scheduler IP to trainer
job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ[
    'FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
trainer = FLTrainerFactory().create_fl_trainer(job)
#trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
trainer._current_ep = os.environ['TRAINER0_SERVICE_HOST'] + ":" + os.environ[
    'TRAINER0_SERVICE_PORT_TRAINER0']
trainer.start()
print(trainer._scheduler_ep, trainer._current_ep)
output_folder = "fl_model"
@@ -40,4 +48,3 @@ while not trainer.stop():
    epoch_id += 1
    if epoch_id % 5 == 0:
        trainer.save_inference_program(output_folder)
@@ -5,7 +5,12 @@ import sys
import os
import logging
import time

logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)


def reader():
@@ -16,15 +21,18 @@ def reader():
        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
        yield data_dict


trainer_id = int(sys.argv[1])  # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
#job._scheduler_ep = "127.0.0.1:9091" # Inform the scheduler IP to trainer
job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ[
    'FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
trainer = FLTrainerFactory().create_fl_trainer(job)
#trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
trainer._current_ep = os.environ['TRAINER1_SERVICE_HOST'] + ":" + os.environ[
    'TRAINER1_SERVICE_PORT_TRAINER1']
trainer.start()
print(trainer._scheduler_ep, trainer._current_ep)
output_folder = "fl_model"
@@ -40,4 +48,3 @@ while not trainer.stop():
    epoch_id += 1
    if epoch_id % 5 == 0:
        trainer.save_inference_program(output_folder)
@@ -3,6 +3,7 @@ import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory


class Model(object):
    def __init__(self):
        pass
@@ -14,12 +15,17 @@ class Model(object):
        param_attrs = fluid.ParamAttr(
            name="fc_0.w_0",
            initializer=fluid.initializer.ConstantInitializer(0.0))
        self.predict = fluid.layers.fc(input=inputs,
                                       size=10,
                                       act='softmax',
                                       param_attr=param_attrs)
        self.sum_cost = fluid.layers.cross_entropy(
            input=self.predict, label=label)
        self.loss = fluid.layers.mean(self.sum_cost)
        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
        self.startup_program = fluid.default_startup_program()


inputs = fluid.layers.data(name='x', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
@@ -31,15 +37,16 @@ optimizer = fluid.optimizer.SGD(learning_rate=0.01)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names([inputs.name, label.name],
                                              [model.loss.name])

build_strategy = FLStrategyFactory()
#build_strategy.fed_avg = True
build_strategy.sec_agg = True
param_name_list = []
param_name_list.append(
    "fc_0.w_0.opti.trainer_")  # need trainer_id when running
param_name_list.append("fc_0.b_0.opti.trainer_")
build_strategy.param_name_list = param_name_list
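The `.opti.trainer_` prefixes name the optimizer-updated parameter copies to be securely aggregated; as the inline comment notes, the trainer id is appended at runtime (presumably yielding names like `fc_0.w_0.opti.trainer_0` for trainer 0).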
...
@@ -3,7 +3,7 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 2
server_num = 1
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(worker_num)
scheduler.init_env()
print("init env done.")
...
@@ -21,4 +21,3 @@ server=yq01-hpc-lvliang01-smart-master.dmop.baidu.com
python_tar=./python.tar.gz
wheel=./paddlepaddle-0.0.0-cp27-cp27mu-linux_x86_64.whl
import paddle.fluid as fluid


class Model(object):
    def __init__(self):
        pass
@@ -9,8 +10,8 @@ class Model(object):
        self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
        self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
        self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
        self.sum_cost = fluid.layers.cross_entropy(
            input=self.predict, label=label)
        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
        self.loss = fluid.layers.reduce_mean(self.sum_cost)
        self.startup_program = fluid.default_startup_program()
@@ -49,6 +49,7 @@ default_dict = {
    "wheel": "./paddlepaddle-0.0.0-cp27-cp27mu-linux_x86_64-0.whl"
}


def load_conf(conf_file, local_dict):
    with open(conf_file) as fin:
        for line in fin:
@@ -58,6 +59,7 @@ def load_conf(conf_file, local_dict):
            local_dict[group[0]] = group[1]
    return local_dict


client = HPCClient()

default_dict = load_conf(sys.argv[1], default_dict)
@@ -94,9 +96,11 @@ all_ips_ready = False
ip_list = []
scheduler = FLScheduler(
    int(default_dict["worker_nodes"]),
    int(default_dict["server_nodes"]),
    port=random_port,
    socket=zmq_socket)
scheduler.set_sample_worker_num(int(default_dict["worker_nodes"]))
@@ -124,9 +128,11 @@ for i in range(len(ip_list)):
    if i < int(default_dict["server_nodes"]):
        ip_role[ip_list[i]] = 'server%d' % i
    else:
        ip_role[ip_list[i]] = 'trainer%d' % (
            i - int(default_dict["server_nodes"]))
print(ip_role)


def job_generate():
    #generate a fl job which is the same as fl_master
    inputs = [fluid.layers.data( \
@@ -146,8 +152,8 @@ def job_generate():
    job_generator.set_optimizer(optimizer)
    job_generator.set_losses([model.loss])
    job_generator.set_startup_program(model.startup_program)
    job_generator.set_infer_feed_and_target_names([x.name for x in inputs],
                                                  [model.predict.name])
    build_strategy = FLStrategyFactory()
    build_strategy.fed_avg = True
@@ -160,17 +166,21 @@ def job_generate():
    output = "job_config"
    job_generator.generate_fl_job(
        strategy,
        server_endpoints=server_ip,
        worker_num=int(default_dict["worker_nodes"]),
        output=output)
    file_list = os.listdir(output)
    for file in file_list:
        tar = tarfile.open('{}/{}.tar.gz'.format(output, file), 'w:gz')
        for root, dir, files in os.walk("{}/{}".format(output, file)):
            for f in files:
                fullpath = os.path.join(root, f)
                tar.add(fullpath)
        tar.close()


job_generate()

#send the allocated roles to the remote endpoints
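Each role's job_config subdirectory is packed into its own tarball here; these are the archives the scheduler_client below fetches with `wget {download_url}/job_config/{message}.tar.gz` once it receives its role assignment.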
...
@@ -13,7 +13,6 @@ import sys
import logging
import time
random_port = 60001
scheduler_conf = {}
@@ -31,8 +30,7 @@ download_url = "{}:8080".format(scheduler_ip[0])
print(download_url)
context = zmq.Context()
zmq_socket = context.socket(zmq.REQ)
zmq_socket.connect("tcp://{}".format(scheduler_conf["ENDPOINT"]))
zmq_socket.send("ENDPOINT\t{}".format(endpoint))
message = zmq_socket.recv()
print(message)
@@ -47,7 +45,7 @@ while True:
    if group[0] == "WAIT":
        continue
    else:
        os.system("wget {}/job_config/{}.tar.gz".format(download_url, message))
        print(message)
        break
@@ -71,6 +69,7 @@ if 'server' in message:
    server._current_ep = endpoint
    server.start()
else:

    def reader():
        for i in range(1000):
            data_dict = {}
...