Commit f54729cb, authored by qjing666
......@@ -18,4 +18,3 @@ from . import common
from . import core
from . import dataset
from . import reader
......@@ -11,5 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
......@@ -181,7 +181,7 @@ class FLRunTimeJob(FLJobBase):
self._server_main_program = None
self._feed_names = None
self._target_names = None
self._scheduler_ep = None
def _load_strategy(self, input_file):
import pickle
......@@ -206,7 +206,7 @@ class FLRunTimeJob(FLJobBase):
main_fn = "%s/trainer.main.program" % folder_name
self._trainer_main_program = self._load_program(main_fn)
try:
send_fn = "%s/trainer.send.program" % folder_name
self._trainer_send_program = self._load_program(send_fn)
......@@ -233,7 +233,7 @@ class FLRunTimeJob(FLJobBase):
Load server job given training folder and server_id
Currently, a server_id is assigned to a server node, and
corresponding FL Job will be sent to the server node.
Args:
folder(str): FL Job folder name
server_id(int): server index for current job
......
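The docstring above covers FLRunTimeJob.load_server_job. A minimal sketch of how these loaders are typically called, assuming the job was generated into a folder named fl_job_config (the folder name and import path follow the PaddleFL example scripts, not this commit):

    # Assumed usage, mirroring the PaddleFL examples; "fl_job_config" is a
    # placeholder for the folder produced by JobGenerator.
    from paddle_fl.core.master.fl_job import FLRunTimeJob

    job = FLRunTimeJob()
    job.load_server_job(folder="fl_job_config", server_id=0)      # on a server node
    # job.load_trainer_job(folder="fl_job_config", trainer_id=0)  # on a worker node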
......@@ -115,7 +115,7 @@ class JobGenerator(object):
server_endpoints=server_endpoints,
worker_num=1,
output=output)
"""
local_job = FLCompileTimeJob()
assert len(self._losses) > 0
......@@ -146,4 +146,3 @@ class JobGenerator(object):
local_job.set_target_names(self._target_names)
local_job.set_strategy(fl_strategy)
local_job.save(output)
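For context on the hunk above, a compile-time sketch of how generate_fl_job is wired up, assembled from the example code later in this diff; it assumes a JobGenerator instance job_generator already configured with losses, optimizer and startup program, and the endpoints, worker count and output folder are placeholder values:

    # Placeholder endpoints/paths; the strategy setup mirrors the example
    # scripts included further down in this commit.
    build_strategy = FLStrategyFactory()
    build_strategy.fed_avg = True
    build_strategy.inner_step = 10
    strategy = build_strategy.create_fl_strategy()

    server_endpoints = ["127.0.0.1:8181"]
    output = "fl_job_config"
    job_generator.generate_fl_job(
        strategy, server_endpoints=server_endpoints, worker_num=2, output=output)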
......@@ -50,8 +50,8 @@ class FLWorkerAgent(object):
key, value = recv_and_parse_kv(self.socket)
if key == "WAIT":
time.sleep(3)
return True
return False
def can_join_training(self):
self.socket.send("JOIN\t{}".format(self.current_ep))
......@@ -104,10 +104,10 @@ class FLScheduler(object):
def start_fl_training(self):
# loop until training is done
loop = 0
while True:
if loop <= 1:
print(loop)
random.shuffle(self.fl_workers)
worker_dict = {}
for worker in self.fl_workers[:self.sample_worker_num]:
......@@ -125,8 +125,8 @@ class FLScheduler(object):
self.socket.send("ACCEPT\t0")
continue
else:
if value not in ready_workers:
ready_workers.append(value)
self.socket.send("REJECT\t0")
if len(ready_workers) == len(self.fl_workers):
all_ready_to_train = True
......@@ -135,7 +135,7 @@ class FLScheduler(object):
finish_training_dict = {}
while not all_finish_training:
key, value = recv_and_parse_kv(self.socket)
if key == "FINISH":
finish_training_dict[value] = 1
self.socket.send("WAIT\t0")
else:
......@@ -143,4 +143,4 @@ class FLScheduler(object):
if len(finish_training_dict) == len(worker_dict):
all_finish_training = True
time.sleep(5)
loop += 1
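The scheduler loop above exchanges tab-separated key/value messages (JOIN, ACCEPT, REJECT, WAIT, FINISH) with the workers. An illustrative reconstruction of the recv_and_parse_kv helper it relies on; the real helper lives in agent_master.py, and this sketch only shows the assumed message format:

    # Illustrative only: messages are "KEY\tVALUE" strings exchanged over a
    # ZeroMQ REQ/REP socket pair.
    def recv_and_parse_kv(socket):
        message = socket.recv()
        if isinstance(message, bytes):
            message = message.decode("utf-8")
        key, _, value = message.partition("\t")
        return key, value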
......@@ -19,8 +19,8 @@ class FLServer(object):
def __init__(self):
self._startup_program = None
self._main_program = None
self._scheduler_ep = None
self._current_ep = None
def set_server_job(self, job):
# need to parse startup and main program in job
......@@ -28,12 +28,12 @@ class FLServer(object):
# need to parse master endpoint
self._startup_program = job._server_startup_program
self._main_program = job._server_main_program
self._scheduler_ep = job._scheduler_ep
self._current_ep = None
def start(self):
self.agent = FLServerAgent(self._scheduler_ep, self._current_ep)
self.agent.connect_scheduler()
exe = fluid.Executor(fluid.CPUPlace())
exe.run(self._startup_program)
exe.run(self._main_program)
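A minimal server launch sketch, following the PaddleFL example scripts; the job folder and endpoints below are placeholders, and only set_server_job and start appear in the hunk above:

    # Placeholder endpoints and folder; _scheduler_ep / _current_ep are the
    # fields this hunk reads when the server connects to the scheduler.
    from paddle_fl.core.server.fl_server import FLServer
    from paddle_fl.core.master.fl_job import FLRunTimeJob

    server = FLServer()
    job = FLRunTimeJob()
    job.load_server_job("fl_job_config", 0)
    job._scheduler_ep = "127.0.0.1:9091"
    server.set_server_job(job)
    server._current_ep = "127.0.0.1:8181"
    server.start()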
......@@ -60,7 +60,7 @@ class FLDistributeTranspiler(object):
Convert the fluid program to distributed data-parallelism programs.
In pserver mode, the trainers' main program does forward, backward and optimization.
pserver's main_program will sum and scale.
......
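The "sum and scale" mentioned above is the server-side half of FedAvg: the pserver program accumulates the parameters uploaded by the sampled workers and divides by their number. A toy illustration of that average, not the transpiled fluid program itself:

    # Toy illustration of the server-side averaging; the real work is done by
    # the transpiled pserver main_program, not by Python code like this.
    import numpy as np

    def fed_avg(worker_params):
        # worker_params: one numpy array per worker for the same parameter
        return sum(worker_params) / float(len(worker_params))

    # fed_avg([np.array([1.0, 2.0]), np.array([3.0, 4.0])])  -> array([2., 3.])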
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .fl_distribute_transpiler import FLDistributeTranspiler
from paddle.fluid.optimizer import SGD
import paddle.fluid as fluid
......@@ -258,8 +258,8 @@ class FedAvgStrategy(FLStrategyBase):
class SecAggStrategy(FedAvgStrategy):
"""
SecAggStrategy: this is model averaging with secure aggregation, proposed in
Aaron Segal, Antonio Marcedone, Benjamin Kreuter, et al.
Practical Secure Aggregation for Privacy-Preserving Machine Learning,
The 24th ACM Conference on Computer and Communications Security (CCS 2017).
"""
def __init__(self):
......@@ -273,5 +273,3 @@ class SecAggStrategy(FedAvgStrategy):
@param_name_list.setter
def param_name_list(self, s):
self._param_name_list = s
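The Segal/Bonawitz et al. scheme cited above hides each worker's update behind pairwise masks derived from Diffie-Hellman shared secrets; the masks cancel when the server sums all uploads, so only the aggregate is revealed. A toy sketch of the masking step, illustrative only and not the PaddleFL implementation:

    # Toy sketch: for each pair (i, j) both workers seed a PRNG with their
    # shared secret; the lower id adds the mask, the higher id subtracts it,
    # so all masks cancel in the server-side sum.
    import numpy as np

    def masked_update(update, my_id, worker_ids, shared_seeds):
        masked = update.copy()
        for other in worker_ids:
            if other == my_id:
                continue
            seed = shared_seeds[tuple(sorted((my_id, other)))]
            mask = np.random.RandomState(seed).uniform(size=update.shape)
            masked += mask if my_id < other else -mask
        return masked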
# coding=utf-8
#
#
# (c) Chris von Csefalvay, 2015.
"""
......
......@@ -59,7 +59,7 @@ class DiffieHellman:
self.key_length = max(200, key_length)
self.generator = PRIMES[group]["generator"]
self.prime = PRIMES[group]["prime"]
def load_private_key(self, priv_key_filepath="priv_key.txt"):
f = open(priv_key_filepath, "r")
self.private_key = int(f.read())
......
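The DiffieHellman class above (vendored from Chris von Csefalvay's library) keeps a generator/prime pair for the chosen MODP group and can reload a private key from disk; the exchange it implements is ordinary modular exponentiation. A self-contained toy version with tiny numbers, for illustration only; real keys use multi-thousand-bit primes:

    # Toy Diffie-Hellman with tiny parameters, for illustration only.
    p, g = 23, 5              # public prime and generator
    a, b = 6, 15              # private keys of worker A and worker B
    A = pow(g, a, p)          # A's public key
    B = pow(g, b, p)          # B's public key
    assert pow(B, a, p) == pow(A, b, p)   # both sides derive the same secret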
# coding=utf-8
#
#
# (c) Chris von Csefalvay, 2015.
"""
......@@ -23,4 +23,4 @@ class RNGError(BaseException):
"""
def __str__(self):
return "RNG could not be obtained. This module currently only works with Python 3."
\ No newline at end of file
return "RNG could not be obtained. This module currently only works with Python 3."
......@@ -16,7 +16,7 @@ import logging
from paddle_fl.core.scheduler.agent_master import FLWorkerAgent
import numpy
import hmac
from .diffiehellman.diffiehellman import DiffieHellman
#from .diffiehellman.diffiehellman import DiffieHellman
class FLTrainerFactory(object):
def __init__(self):
......@@ -52,8 +52,8 @@ class FLTrainer(object):
self._feed_names = job._feed_names
self._target_names = job._target_names
self._scheduler_ep = job._scheduler_ep
self._current_ep = None
self.cur_step = 0
def start(self):
#current_ep = "to be added"
......@@ -68,7 +68,7 @@ class FLTrainer(object):
feed=feed,
fetch_list=fetch)
self._logger.debug("end to run current batch")
self.cur_step += 1
def save_inference_program(self, output_folder):
target_vars = []
......@@ -87,15 +87,15 @@ class FLTrainer(object):
# ask for termination with master endpoint
# currently not open sourced, will release the code later
# TODO(guru4elephant): add connection with master
if self.cur_step != 0:
while not self.agent.finish_training():
print('wait others finish')
continue
while not self.agent.can_join_training():
print("wait permit")
continue
print("ready to train")
return False
print("wait permit")
continue
print("ready to train")
return False
class FedAvgTrainer(FLTrainer):
......@@ -104,9 +104,9 @@ class FedAvgTrainer(FLTrainer):
pass
def start(self):
#current_ep = "to be added"
self.agent = FLWorkerAgent(self._scheduler_ep, self._current_ep)
self.agent.connect_scheduler()
self.exe = fluid.Executor(fluid.CPUPlace())
self.exe.run(self._startup_program)
......@@ -119,18 +119,18 @@ class FedAvgTrainer(FLTrainer):
self.cur_step = 0
def run_with_epoch(self,reader,feeder,fetch,num_epoch):
self._logger.debug("begin to run recv program")
self._logger.debug("begin to run recv program")
self.exe.run(self._recv_program)
epoch = 0
for i in range(num_epoch):
print(epoch)
for data in reader():
self.exe.run(self._main_program,
feed=feeder.feed(data),
fetch_list=fetch)
self.cur_step += 1
epoch += 1
self._logger.debug("begin to run send program")
self.exe.run(self._send_program)
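run_with_epoch, added in this commit, pulls the current global parameters (recv program), trains locally for num_epoch passes over the reader, and then pushes the update (send program). A hedged call sketch; the reader/feeder setup is borrowed from the example scripts later in this diff, and img, label, the trainer instance and the fetch target name are assumptions:

    # Sketch of calling the new API; img/label and trainer are assumed to be
    # set up as in the example scripts elsewhere in this commit.
    import paddle
    import paddle.fluid as fluid

    reader = paddle.batch(
        paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
        batch_size=64)
    feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())
    trainer.run_with_epoch(reader, feeder, fetch=["accuracy_0.tmp_0"], num_epoch=1)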
def run(self, feed, fetch):
self._logger.debug("begin to run FedAvgTrainer, cur_step=%d, inner_step=%d" %
......@@ -139,7 +139,7 @@ class FedAvgTrainer(FLTrainer):
self._logger.debug("begin to run recv program")
self.exe.run(self._recv_program)
self._logger.debug("begin to run current step")
loss = self.exe.run(self._main_program,
feed=feed,
fetch_list=fetch)
if self.cur_step % self._step == 0:
......@@ -149,6 +149,9 @@ class FedAvgTrainer(FLTrainer):
return loss
class SecAggTrainer(FLTrainer):
def __init__(self):
super(SecAggTrainer, self).__init__()
......@@ -209,7 +212,7 @@ class SecAggTrainer(FLTrainer):
self.exe.run(self._recv_program)
scope = fluid.global_scope()
self._logger.debug("begin to run current step")
loss = self.exe.run(self._main_program,
feed=feed,
fetch_list=fetch)
if self.cur_step % self._step == 0:
......@@ -244,4 +247,3 @@ class SecAggTrainer(FLTrainer):
def stop(self):
return False
......@@ -38,7 +38,7 @@ job_generator.set_infer_feed_and_target_names(
[x.name for x in inputs], [model.predict.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
......
......@@ -30,10 +30,10 @@ while not trainer.stop():
print("batch %d start train" % (step_i))
train_step = 0
for data in reader():
trainer.run(feed=data, fetch=[])
train_step += 1
if train_step == trainer._step:
break
step_i += 1
if step_i % 100 == 0:
trainer.save_inference_program(output_folder)
......
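The loop above checkpoints an inference program every 100 steps via save_inference_program. A hedged sketch of loading such a folder back for evaluation or serving with the standard fluid API; output_folder is the same variable used above, and whether the saved folder loads directly this way is an assumption:

    # load_inference_model returns the pruned program plus the feed names and
    # fetch targets saved by the trainer.
    import paddle.fluid as fluid

    exe = fluid.Executor(fluid.CPUPlace())
    program, feed_names, fetch_targets = fluid.io.load_inference_model(
        output_folder, exe)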
......@@ -2,7 +2,7 @@ import paddle.fluid as fluid
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
import math
class Model(object):
def __init__(self):
......
......@@ -31,15 +31,15 @@ label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())
def train_test(train_test_program, train_test_feed, train_test_reader):
acc_set = []
for test_data in train_test_reader():
acc_np = trainer.exe.run(
program=train_test_program,
feed=train_test_feed.feed(test_data),
fetch_list=["accuracy_0.tmp_0"])
acc_set.append(float(acc_np[0]))
acc_val_mean = numpy.array(acc_set).mean()
return acc_val_mean
def compute_privacy_budget(sample_ratio, epsilon, step, delta):
E = 2 * epsilon * math.sqrt(step * sample_ratio)
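compute_privacy_budget uses the simple bound E = 2 * epsilon * sqrt(step * sample_ratio). A worked example with the sample_ratio and epsilon passed later in this script; the step count is illustrative:

    # Worked example of the bound (step value chosen for illustration):
    import math
    sample_ratio, epsilon, step = 0.001, 0.1, 1000
    E = 2 * epsilon * math.sqrt(step * sample_ratio)   # = 0.2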
......@@ -57,7 +57,7 @@ while not trainer.stop():
acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
step += 1
# print("acc:%.3f" % (acc[0]))
acc_val = train_test(
train_test_program=test_program,
train_test_reader=test_reader,
......@@ -65,8 +65,6 @@ while not trainer.stop():
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
compute_privacy_budget(sample_ratio=0.001, epsilon=0.1, step=step, delta=0.00001)
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder)
......@@ -33,8 +33,8 @@ while not trainer.stop():
ret_avg_cost = trainer.run(feed=data,
fetch=["mean_0.tmp_0"])
train_step += 1
if train_step == trainer._step:
break
avg_ppl = np.exp(ret_avg_cost[0])
newest_ppl = np.mean(avg_ppl)
print("ppl:%.3f" % (newest_ppl))
......
......@@ -9,10 +9,10 @@ class Model(object):
def linear_regression(self, inputs, label):
param_attrs = fluid.ParamAttr(
name="fc_0.b_0",
name="fc_0.b_0",
initializer=fluid.initializer.ConstantInitializer(0.0))
param_attrs = fluid.ParamAttr(
name="fc_0.w_0",
name="fc_0.w_0",
initializer=fluid.initializer.ConstantInitializer(0.0))
self.predict = fluid.layers.fc(input=inputs, size=10, act='softmax', param_attr=param_attrs)
self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
......@@ -40,7 +40,7 @@ build_strategy = FLStrategyFactory()
build_strategy.sec_agg = True
param_name_list = []
param_name_list.append("fc_0.w_0.opti.trainer_") # need trainer_id when running
param_name_list.append("fc_0.b_0.opti.trainer_")
param_name_list.append("fc_0.b_0.opti.trainer_")
build_strategy.param_name_list = param_name_list
build_strategy.inner_step = 10
......
......@@ -20,7 +20,7 @@ train_reader = paddle.batch(
paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
batch_size=BATCH_SIZE)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=BATCH_SIZE)
trainer_num = 2
trainer_id = int(sys.argv[1]) # trainer id for each guest
......@@ -68,10 +68,10 @@ while not trainer.stop():
for data in train_reader():
step_i += 1
trainer.step_id = step_i
accuracy, = trainer.run(feed=feeder.feed(data),
fetch=["accuracy_0.tmp_0"])
if step_i % 100 == 0:
print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
avg_loss_val, acc_val = train_test(train_test_program=test_program,
train_test_reader=test_reader,
......
......@@ -71,4 +71,3 @@ setup(
],
license='Apache 2.0',
keywords=('paddle_fl paddlepaddle multi-task transfer distributed-training'))