diff --git a/paddle_fl/__init__.py b/paddle_fl/__init__.py
index 1fd2f81d3034b3fbc3271cad25e4f18de8af3a3d..c2585de444569fa3a12f6866bc25b372f78fb1da 100644
--- a/paddle_fl/__init__.py
+++ b/paddle_fl/__init__.py
@@ -18,5 +18,3 @@ from . import common
 from . import core
 from . import dataset
 from . import reader
-
-
diff --git a/paddle_fl/core/master/__init__.py b/paddle_fl/core/master/__init__.py
index 30dd0b60c6adcef5b7f88396753961e846818602..33ed0ecf10ec4cad807ebb6df1590de65eeeab1e 100644
--- a/paddle_fl/core/master/__init__.py
+++ b/paddle_fl/core/master/__init__.py
@@ -11,5 +11,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
diff --git a/paddle_fl/core/master/fl_job.py b/paddle_fl/core/master/fl_job.py
index 6fcd8fc9bb45480e3fdf35258ad1cf6f45bb6732..6f0faf12d29389cfa43ed61a966540a61ff2ed5f 100644
--- a/paddle_fl/core/master/fl_job.py
+++ b/paddle_fl/core/master/fl_job.py
@@ -176,7 +176,7 @@ class FLRunTimeJob(FLJobBase):
         self._server_main_program = None
         self._feed_names = None
         self._target_names = None
-        self._scheduler_ep = None 
+        self._scheduler_ep = None

     def _load_strategy(self, input_file):
         import pickle
@@ -201,7 +201,7 @@ class FLRunTimeJob(FLJobBase):
         main_fn = "%s/trainer.main.program" % folder_name
         self._trainer_main_program = self._load_program(main_fn)
-        
+
         try:
             send_fn = "%s/trainer.send.program" % folder_name
             self._trainer_send_program = self._load_program(send_fn)
@@ -228,7 +228,7 @@ class FLRunTimeJob(FLJobBase):
         Load server job given training folder and server_id
         Currently, a server_id is assigned to a server node, and corresponding
         FL Job will be sent to the server node.
-        
+
         Args:
             folder(str): FL Job folder name
             server_id(int): server index for current job
diff --git a/paddle_fl/core/master/job_generator.py b/paddle_fl/core/master/job_generator.py
index 1ddd0d87257126b0fc98cf4d103f77379c197a74..e4291241a864c354c1a549ef07acb7652ec377dc 100644
--- a/paddle_fl/core/master/job_generator.py
+++ b/paddle_fl/core/master/job_generator.py
@@ -115,7 +115,7 @@ class JobGenerator(object):
                 server_endpoints=server_endpoints,
                 worker_num=1,
                 output=output)
-        
+
         """
         local_job = FLCompileTimeJob()
         assert len(self._losses) > 0
@@ -146,4 +146,3 @@ class JobGenerator(object):
         local_job.set_target_names(self._target_names)
         local_job.set_strategy(fl_strategy)
         local_job.save(output)
-
diff --git a/paddle_fl/core/scheduler/agent_master.py b/paddle_fl/core/scheduler/agent_master.py
index d012f7c34308167f18da61ebfd424d57854b6e73..c8868a88d15b87a227d785855ee0b570135ca4ce 100644
--- a/paddle_fl/core/scheduler/agent_master.py
+++ b/paddle_fl/core/scheduler/agent_master.py
@@ -43,8 +43,8 @@ class FLWorkerAgent(object):
         key, value = recv_and_parse_kv(self.socket)
         if key == "WAIT":
             time.sleep(3)
-            return True 
-        return False 
+            return True
+        return False

     def can_join_training(self):
         self.socket.send("JOIN\t{}".format(self.current_ep))
@@ -94,10 +94,10 @@ class FLScheduler(object):
     def start_fl_training(self):
         # loop until training is done
-        loop = 0 
+        loop = 0
         while True:
-            if loop <= 1: 
-                print(loop) 
+            if loop <= 1:
+                print(loop)
             random.shuffle(self.fl_workers)
             worker_dict = {}
             for worker in self.fl_workers[:self.sample_worker_num]:
@@ -115,8 +115,8 @@ class FLScheduler(object):
                         self.socket.send("ACCEPT\t0")
                         continue
                 else:
-                    if value not in ready_workers: 
-                        ready_workers.append(value) 
+                    if value not in ready_workers:
+                        ready_workers.append(value)
self.socket.send("REJECT\t0") if len(ready_workers) == len(self.fl_workers): all_ready_to_train = True @@ -125,7 +125,7 @@ class FLScheduler(object): finish_training_dict = {} while not all_finish_training: key, value = recv_and_parse_kv(self.socket) - if key == "FINISH": + if key == "FINISH": finish_training_dict[value] = 1 self.socket.send("WAIT\t0") else: @@ -133,4 +133,4 @@ class FLScheduler(object): if len(finish_training_dict) == len(worker_dict): all_finish_training = True time.sleep(5) - loop += 1 + loop += 1 diff --git a/paddle_fl/core/server/fl_server.py b/paddle_fl/core/server/fl_server.py index c97e51d92660b293196f4f87893a5e021f73d2bc..f8fc1922da8b9ebe2a5abe68782a6f32cf272989 100644 --- a/paddle_fl/core/server/fl_server.py +++ b/paddle_fl/core/server/fl_server.py @@ -19,8 +19,8 @@ class FLServer(object): def __init__(self): self._startup_program = None self._main_program = None - self._scheduler_ep = None - self._current_ep = None + self._scheduler_ep = None + self._current_ep = None def set_server_job(self, job): # need to parse startup and main program in job @@ -28,12 +28,12 @@ class FLServer(object): # need to parse master endpoint self._startup_program = job._server_startup_program self._main_program = job._server_main_program - self._scheduler_ep = job._scheduler_ep - self._current_ep = None + self._scheduler_ep = job._scheduler_ep + self._current_ep = None def start(self): - self.agent = FLServerAgent(self._scheduler_ep, self._current_ep) - self.agent.connect_scheduler() + self.agent = FLServerAgent(self._scheduler_ep, self._current_ep) + self.agent.connect_scheduler() exe = fluid.Executor(fluid.CPUPlace()) exe.run(self._startup_program) exe.run(self._main_program) diff --git a/paddle_fl/core/strategy/fl_distribute_transpiler.py b/paddle_fl/core/strategy/fl_distribute_transpiler.py index 46f59987e04057e164cb1c509227e6c9bd682c59..6c5667e404193061f4c1e79d0ee79539974944b7 100644 --- a/paddle_fl/core/strategy/fl_distribute_transpiler.py +++ b/paddle_fl/core/strategy/fl_distribute_transpiler.py @@ -60,7 +60,7 @@ class FLDistributeTranspiler(object): Convert the fluid program to distributed data-parallelism programs. - In pserver mode, the trainers' main program do forward, backward and optimizaiton. + In pserver mode, the trainers' main program do forward, backward and optimizaiton. pserver's main_program will sum and scale. diff --git a/paddle_fl/core/strategy/fl_strategy_base.py b/paddle_fl/core/strategy/fl_strategy_base.py index 42425701a2e993ea631939b51c369ac9099cfd4e..14e97ce346f4c67dbd7eb5ee34e6a12f2bdd866d 100644 --- a/paddle_fl/core/strategy/fl_strategy_base.py +++ b/paddle_fl/core/strategy/fl_strategy_base.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .fl_distribute_transpiler import FLDistributeTranspiler +from .fl_distribute_transpiler import FLDistributeTranspiler from paddle.fluid.optimizer import SGD import paddle.fluid as fluid @@ -258,8 +258,8 @@ class FedAvgStrategy(FLStrategyBase): class SecAggStrategy(FedAvgStrategy): """ DPSGDStrategy: this is model averaging optimization proposed in - Aaron Segal, Antonio Marcedone, Benjamin Kreuter, et al. - Practical Secure Aggregation for Privacy-Preserving Machine Learning, + Aaron Segal, Antonio Marcedone, Benjamin Kreuter, et al. + Practical Secure Aggregation for Privacy-Preserving Machine Learning, The 24th ACM Conference on Computer and Communications Security ( CCS2017 ). 
""" def __init__(self): @@ -273,5 +273,3 @@ class SecAggStrategy(FedAvgStrategy): @param_name_list.setter def param_name_list(self, s): self._param_name_list = s - - diff --git a/paddle_fl/core/trainer/diffiehellman/__init__.py b/paddle_fl/core/trainer/diffiehellman/__init__.py index c81ea145aa57d2c9bd2afdb2382dfe99f2efd363..a1143ffe5f7d034f11053e1ea5dbe38be50064f6 100644 --- a/paddle_fl/core/trainer/diffiehellman/__init__.py +++ b/paddle_fl/core/trainer/diffiehellman/__init__.py @@ -1,6 +1,6 @@ # coding=utf-8 -# +# # (c) Chris von Csefalvay, 2015. """ diff --git a/paddle_fl/core/trainer/diffiehellman/diffiehellman.py b/paddle_fl/core/trainer/diffiehellman/diffiehellman.py index f9e216264f340e5f3c8eb50d6e9b68916e4fa357..3d3e18a706a15865d9ef50c8bd3138645e0cff8c 100644 --- a/paddle_fl/core/trainer/diffiehellman/diffiehellman.py +++ b/paddle_fl/core/trainer/diffiehellman/diffiehellman.py @@ -59,7 +59,7 @@ class DiffieHellman: self.key_length = max(200, key_length) self.generator = PRIMES[group]["generator"] self.prime = PRIMES[group]["prime"] - + def load_private_key(self, priv_key_filepath="priv_key.txt"): f = open(priv_key_filepath, "r") self.private_key = int(f.read()) diff --git a/paddle_fl/core/trainer/diffiehellman/exceptions.py b/paddle_fl/core/trainer/diffiehellman/exceptions.py index 96a590e2324ea45aa3618bc2c5dd1f740a2ca948..289062c9477e7f00ea27cfc05d56d570b8de76ad 100644 --- a/paddle_fl/core/trainer/diffiehellman/exceptions.py +++ b/paddle_fl/core/trainer/diffiehellman/exceptions.py @@ -1,6 +1,6 @@ # coding=utf-8 -# +# # (c) Chris von Csefalvay, 2015. """ @@ -23,4 +23,4 @@ class RNGError(BaseException): """ def __str__(self): - return "RNG could not be obtained. This module currently only works with Python 3." \ No newline at end of file + return "RNG could not be obtained. This module currently only works with Python 3." 
diff --git a/paddle_fl/core/trainer/fl_trainer.py b/paddle_fl/core/trainer/fl_trainer.py
index cbe0c805887c21183f5b202b09866b525ad26861..7f56199a3c97437ccdaa32a976e1db3439d3cbfc 100755
--- a/paddle_fl/core/trainer/fl_trainer.py
+++ b/paddle_fl/core/trainer/fl_trainer.py
@@ -52,8 +52,8 @@ class FLTrainer(object):
         self._feed_names = job._feed_names
         self._target_names = job._target_names
         self._scheduler_ep = job._scheduler_ep
-        self._current_ep = None 
-        self.cur_step = 0 
+        self._current_ep = None
+        self.cur_step = 0

     def start(self):
         #current_ep = "to be added"
@@ -68,7 +68,7 @@ class FLTrainer(object):
                      feed=feed,
                      fetch_list=fetch)
         self._logger.debug("end to run current batch")
-        self.cur_step += 1 
+        self.cur_step += 1

     def save_inference_program(self, output_folder):
         target_vars = []
@@ -87,15 +87,15 @@ class FLTrainer(object):
         # ask for termination with master endpoint
         # currently not open sourced, will release the code later
         # TODO(guru4elephant): add connection with master
-        if self.cur_step != 0: 
-            while not self.agent.finish_training(): 
-                print('wait others finish') 
-                continue 
+        if self.cur_step != 0:
+            while not self.agent.finish_training():
+                print('wait others finish')
+                continue
         while not self.agent.can_join_training():
-            print("wait permit") 
-            continue 
-        print("ready to train") 
-        return False 
+            print("wait permit")
+            continue
+        print("ready to train")
+        return False


 class FedAvgTrainer(FLTrainer):
@@ -104,9 +104,9 @@ class FedAvgTrainer(FLTrainer):
         pass

     def start(self):
-        #current_ep = "to be added" 
+        #current_ep = "to be added"
         self.agent = FLWorkerAgent(self._scheduler_ep, self._current_ep)
-        self.agent.connect_scheduler() 
+        self.agent.connect_scheduler()
         self.exe = fluid.Executor(fluid.CPUPlace())
         self.exe.run(self._startup_program)
@@ -125,7 +125,7 @@ class FedAvgTrainer(FLTrainer):
         self._logger.debug("begin to run recv program")
         self.exe.run(self._recv_program)
         self._logger.debug("begin to run current step")
-        loss = self.exe.run(self._main_program, 
+        loss = self.exe.run(self._main_program,
                             feed=feed,
                             fetch_list=fetch)
         if self.cur_step % self._step == 0:
@@ -136,8 +136,8 @@
     def stop(self):
         return False
-        
-        
+
+
 class SecAggTrainer(FLTrainer):
     def __init__(self):
         super(SecAggTrainer, self).__init__()
@@ -198,7 +198,7 @@ class SecAggTrainer(FLTrainer):
         self.exe.run(self._recv_program)
         scope = fluid.global_scope()
         self._logger.debug("begin to run current step")
-        loss = self.exe.run(self._main_program, 
+        loss = self.exe.run(self._main_program,
                             feed=feed,
                             fetch_list=fetch)
         if self.cur_step % self._step == 0:
@@ -233,4 +233,3 @@ class SecAggTrainer(FLTrainer):
     def stop(self):
         return False
-
diff --git a/paddle_fl/examples/ctr_demo/fl_master.py b/paddle_fl/examples/ctr_demo/fl_master.py
index b82d261c2fc66c092cacc272e7ec06624df94d0c..1207c1c6b809c9bec201ab2409616b5fac1b340a 100644
--- a/paddle_fl/examples/ctr_demo/fl_master.py
+++ b/paddle_fl/examples/ctr_demo/fl_master.py
@@ -38,7 +38,7 @@ job_generator.set_infer_feed_and_target_names(
     [x.name for x in inputs], [model.predict.name])

 build_strategy = FLStrategyFactory()
-build_strategy.fed_avg = True 
+build_strategy.fed_avg = True
 build_strategy.inner_step = 10
 strategy = build_strategy.create_fl_strategy()
diff --git a/paddle_fl/examples/ctr_demo/fl_trainer.py b/paddle_fl/examples/ctr_demo/fl_trainer.py
index 0be980c28804dc14a9a5feae540c1dad84e0628c..022dd3d6c7c0bc3e65d7fa25790e1e6c39492821 100644
--- a/paddle_fl/examples/ctr_demo/fl_trainer.py
+++ b/paddle_fl/examples/ctr_demo/fl_trainer.py
@@ -31,10 +31,10 @@ while not trainer.stop():
     print("batch %d start train" % (step_i))
     train_step = 0
     for data in reader():
-        trainer.run(feed=data, fetch=[]) 
-        train_step += 1 
-        if train_step == trainer._step: 
-            break 
+        trainer.run(feed=data, fetch=[])
+        train_step += 1
+        if train_step == trainer._step:
+            break
     step_i += 1
     if step_i % 100 == 0:
         trainer.save_inference_program(output_folder)
diff --git a/paddle_fl/examples/dpsgd_demo/fl_master.py b/paddle_fl/examples/dpsgd_demo/fl_master.py
index 50ede32582cc03a78184744c8a61044fe667c3c9..f79472e8a7def3f3851d6b7eaf544a27c6eb8cc9 100644
--- a/paddle_fl/examples/dpsgd_demo/fl_master.py
+++ b/paddle_fl/examples/dpsgd_demo/fl_master.py
@@ -2,7 +2,7 @@ import paddle.fluid as fluid
 import paddle_fl as fl
 from paddle_fl.core.master.job_generator import JobGenerator
 from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
-import math 
+import math

 class Model(object):
     def __init__(self):
diff --git a/paddle_fl/examples/dpsgd_demo/fl_trainer.py b/paddle_fl/examples/dpsgd_demo/fl_trainer.py
index 3468f8df17a865f9a3e61a0c46615994dec91ff2..76f8841a1afed0decc03c5257a49f1609be21a6e 100644
--- a/paddle_fl/examples/dpsgd_demo/fl_trainer.py
+++ b/paddle_fl/examples/dpsgd_demo/fl_trainer.py
@@ -31,15 +31,15 @@ label = fluid.layers.data(name='label', shape=[1], dtype='int64')
 feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())

 def train_test(train_test_program, train_test_feed, train_test_reader):
-    acc_set = [] 
-    for test_data in train_test_reader(): 
-        acc_np = trainer.exe.run( 
-            program=train_test_program, 
-            feed=train_test_feed.feed(test_data), 
-            fetch_list=["accuracy_0.tmp_0"]) 
-        acc_set.append(float(acc_np[0])) 
-    acc_val_mean = numpy.array(acc_set).mean() 
-    return acc_val_mean 
+    acc_set = []
+    for test_data in train_test_reader():
+        acc_np = trainer.exe.run(
+            program=train_test_program,
+            feed=train_test_feed.feed(test_data),
+            fetch_list=["accuracy_0.tmp_0"])
+        acc_set.append(float(acc_np[0]))
+    acc_val_mean = numpy.array(acc_set).mean()
+    return acc_val_mean

 def compute_privacy_budget(sample_ratio, epsilon, step, delta):
     E = 2 * epsilon * math.sqrt(step * sample_ratio)
@@ -57,7 +57,7 @@ while not trainer.stop():
         acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
         step += 1
         # print("acc:%.3f" % (acc[0]))
-    
+
     acc_val = train_test(
         train_test_program=test_program,
         train_test_reader=test_reader,
@@ -65,8 +65,6 @@ while not trainer.stop():
     print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
     compute_privacy_budget(sample_ratio=0.001, epsilon=0.1, step=step, delta=0.00001)
-    
+
     save_dir = (output_folder + "/epoch_%d") % epoch_id
     trainer.save_inference_program(output_folder)
-
-
diff --git a/paddle_fl/examples/gru4rec_demo/fl_trainer.py b/paddle_fl/examples/gru4rec_demo/fl_trainer.py
index 34571c1463b5ecebe1da47d13bbb70b26ad5bf9e..84410e9077c5c39963de6fee324ff77ed3b70b9b 100644
--- a/paddle_fl/examples/gru4rec_demo/fl_trainer.py
+++ b/paddle_fl/examples/gru4rec_demo/fl_trainer.py
@@ -33,8 +33,8 @@ while not trainer.stop():
         ret_avg_cost = trainer.run(feed=data,
                                    fetch=["mean_0.tmp_0"])
         train_step += 1
-        if train_step == trainer._step: 
-            break 
+        if train_step == trainer._step:
+            break
         avg_ppl = np.exp(ret_avg_cost[0])
         newest_ppl = np.mean(avg_ppl)
         print("ppl:%.3f" % (newest_ppl))
diff --git a/paddle_fl/examples/secagg_demo/fl_master.py b/paddle_fl/examples/secagg_demo/fl_master.py
index 1412039996b2c3faf78a16f3e3ee0c71fd68116e..e0e19e6fc77392a0d628d287bf53295977ab7e81 100644
--- a/paddle_fl/examples/secagg_demo/fl_master.py
+++ b/paddle_fl/examples/secagg_demo/fl_master.py
@@ -9,10 +9,10 @@ class Model(object):

     def linear_regression(self, inputs, label):
         param_attrs = fluid.ParamAttr(
-            name="fc_0.b_0", 
+            name="fc_0.b_0",
             initializer=fluid.initializer.ConstantInitializer(0.0))
         param_attrs = fluid.ParamAttr(
-            name="fc_0.w_0", 
+            name="fc_0.w_0",
             initializer=fluid.initializer.ConstantInitializer(0.0))
         self.predict = fluid.layers.fc(input=inputs, size=10, act='softmax', param_attr=param_attrs)
         self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
@@ -40,7 +40,7 @@ build_strategy = FLStrategyFactory()
 build_strategy.sec_agg = True
 param_name_list = []
 param_name_list.append("fc_0.w_0.opti.trainer_") # need trainer_id when running
-param_name_list.append("fc_0.b_0.opti.trainer_") 
+param_name_list.append("fc_0.b_0.opti.trainer_")
 build_strategy.param_name_list = param_name_list
 build_strategy.inner_step = 10
diff --git a/paddle_fl/examples/secagg_demo/fl_trainer.py b/paddle_fl/examples/secagg_demo/fl_trainer.py
index 7a9ec1f39fb9b915ba77b0e9b814f92586e488b8..d6f9fdffb6e270674831a6c4f826f6673d073d44 100644
--- a/paddle_fl/examples/secagg_demo/fl_trainer.py
+++ b/paddle_fl/examples/secagg_demo/fl_trainer.py
@@ -20,7 +20,7 @@ train_reader = paddle.batch(
     paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
     batch_size=BATCH_SIZE)
 test_reader = paddle.batch(
-    paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) 
+    paddle.dataset.mnist.test(), batch_size=BATCH_SIZE)

 trainer_num = 2
 trainer_id = int(sys.argv[1]) # trainer id for each guest
@@ -68,10 +68,10 @@ while not trainer.stop():
     for data in train_reader():
         step_i += 1
         trainer.step_id = step_i
-        accuracy, = trainer.run(feed=feeder.feed(data), 
-                                fetch=["accuracy_0.tmp_0"]) 
+        accuracy, = trainer.run(feed=feeder.feed(data),
+                                fetch=["accuracy_0.tmp_0"])
         if step_i % 100 == 0:
-            print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0])) 
+            print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))

     avg_loss_val, acc_val = train_test(train_test_program=test_program,
                                        train_test_reader=test_reader,
diff --git a/setup.py b/setup.py
index 050d63ff684146c0c390bc3f81af6a6791688197..a419cb2c6f0c2b4ee79562a6b843d095431732ec 100644
--- a/setup.py
+++ b/setup.py
@@ -71,4 +71,3 @@ setup(
     ],
     license='Apache 2.0',
     keywords=('paddle_fl paddlepaddle multi-task transfer distributed-training'))
-
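For context on the scheduler traffic touched in agent_master.py above: workers and the scheduler exchange tab-separated key/value messages (JOIN, ACCEPT, REJECT, FINISH, WAIT) over a ZeroMQ REQ/REP pair. Below is a minimal worker-side sketch of that handshake, assuming pyzmq; the endpoint addresses and the helper function are hypothetical, not PaddleFL's actual API:

```python
# Minimal sketch of the worker side of the scheduler handshake seen in
# agent_master.py. Endpoints and the helper below are illustrative only.
import zmq

def send_and_recv_kv(socket, key, value):
    # Messages are "KEY\tVALUE" strings in both directions.
    socket.send_string("{}\t{}".format(key, value))
    reply_key, _, reply_value = socket.recv_string().partition("\t")
    return reply_key, reply_value

context = zmq.Context()
socket = context.socket(zmq.REQ)        # REQ/REP enforces send/recv turn-taking
socket.connect("tcp://127.0.0.1:9091")  # hypothetical scheduler endpoint

# Ask to join a round, as FLWorkerAgent.can_join_training() does; the
# scheduler replies ACCEPT only if this worker is in the sampled set.
key, _ = send_and_recv_kv(socket, "JOIN", "127.0.0.1:9124")
if key == "ACCEPT":
    pass  # run local steps, then report "FINISH" and wait for the others
```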