diff --git a/contrib/data_safety_training/image_classification/server/receive.py b/contrib/data_safety_training/image_classification/server/receiver.py
similarity index 100%
rename from contrib/data_safety_training/image_classification/server/receive.py
rename to contrib/data_safety_training/image_classification/server/receiver.py
diff --git a/docs/source/md/quick_start.md b/docs/source/md/quick_start.md
index bfdd9450d2d7801c60d4111a016f59983a76fc72..93731d6d656ce63cdd0f10f9acce411f57fba0e0 100644
--- a/docs/source/md/quick_start.md
+++ b/docs/source/md/quick_start.md
@@ -67,7 +67,8 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
 
 worker_num = 2
 server_num = 1
-scheduler = FLScheduler(worker_num,server_num)
+# Define the number of workers/servers and the port for the scheduler
+scheduler = FLScheduler(worker_num,server_num,port=9091)
 scheduler.set_sample_worker_num(worker_num)
 scheduler.init_env()
 print("init env done.")
@@ -94,6 +95,7 @@ trainer_id = int(sys.argv[1]) # trainer id for each guest
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_trainer_job(job_path, trainer_id)
+job._scheduler_ep = "127.0.0.1:9091" # Inform the trainer of the scheduler endpoint
 trainer = FLTrainerFactory().create_fl_trainer(job)
 trainer.start()
 
@@ -122,6 +124,8 @@ server_id = 0
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_server_job(job_path, server_id)
+job._scheduler_ep = "127.0.0.1:9091" # Scheduler endpoint (IP:port)
 server.set_server_job(job)
+server._current_ep = "127.0.0.1:8181" # This server's endpoint (IP:port)
 server.start()
 ```
diff --git a/paddle_fl/core/scheduler/agent_master.py b/paddle_fl/core/scheduler/agent_master.py
index e9b42a91a256d0ea6c59f8fc8d1b2390b9566aaf..f26a85a7f44aca7e662dd3140b269944d89201f5 100644
--- a/paddle_fl/core/scheduler/agent_master.py
+++ b/paddle_fl/core/scheduler/agent_master.py
@@ -140,4 +140,3 @@ class FLScheduler(object):
                 if len(finish_training_dict) == len(worker_dict):
                     all_finish_training = True
             time.sleep(5)
-            loop += 1
diff --git a/paddle_fl/core/trainer/diffiehellman/diffiehellman.py b/paddle_fl/core/trainer/diffiehellman/diffiehellman.py
index 3374924a3c57c5352b1da2d97474c88e130d6bc7..d5e1b196f56e5694b4112a8ff82022ee3bac7910 100644
--- a/paddle_fl/core/trainer/diffiehellman/diffiehellman.py
+++ b/paddle_fl/core/trainer/diffiehellman/diffiehellman.py
@@ -44,7 +44,6 @@ try:
 except(AttributeError, ImportError):
     rng = os.urandom
 
-
 class DiffieHellman:
     """
     Implements the Diffie-Hellman key exchange protocol.
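Note on the quick_start.md hunks above: the three processes only find each other if every one of them agrees on the scheduler endpoint. A minimal standalone sketch of the port convention this patch uses everywhere (the helper function below is illustrative only, not part of the PaddleFL API):

```python
# Illustrative helper, not part of PaddleFL: encodes the port convention
# used throughout this patch (scheduler on 9091, server on 8181,
# trainer i on 9000+i), all on localhost as in the demos.
SCHEDULER_EP = "127.0.0.1:9091"

def endpoint_for(role, idx=0):
    """Return the endpoint each demo process binds to, by role."""
    if role == "scheduler":
        return SCHEDULER_EP
    if role == "server":
        return "127.0.0.1:8181"
    if role == "trainer":
        return "127.0.0.1:{}".format(9000 + idx)
    raise ValueError("unknown role: %s" % role)

print(endpoint_for("scheduler"))   # 127.0.0.1:9091
print(endpoint_for("trainer", 1))  # 127.0.0.1:9001
```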
diff --git a/paddle_fl/examples/ctr_demo/fl_scheduler.py b/paddle_fl/examples/ctr_demo/fl_scheduler.py
index 7a0c8f5b3411aac07f7e50e3daf3a1c0a53144e4..9dc5d84497b376d1aa7fd5731771b0799343c2ec 100644
--- a/paddle_fl/examples/ctr_demo/fl_scheduler.py
+++ b/paddle_fl/examples/ctr_demo/fl_scheduler.py
@@ -2,7 +2,8 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
 
 worker_num = 2
 server_num = 1
-scheduler = FLScheduler(worker_num,server_num)
+# Define the number of workers/servers and the port for the scheduler
+scheduler = FLScheduler(worker_num,server_num,port=9091)
 scheduler.set_sample_worker_num(worker_num)
 scheduler.init_env()
 print("init env done.")
diff --git a/paddle_fl/examples/ctr_demo/fl_server.py b/paddle_fl/examples/ctr_demo/fl_server.py
index 3671838a60b678c8c29ad530888ec75252c5430a..529df8da4079fbbd217c58a857f7ab8a3c307586 100644
--- a/paddle_fl/examples/ctr_demo/fl_server.py
+++ b/paddle_fl/examples/ctr_demo/fl_server.py
@@ -21,8 +21,8 @@ server_id = 0
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_server_job(job_path, server_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Scheduler endpoint (IP:port)
 server.set_server_job(job)
-server._current_ep = "127.0.0.1:8181"
+server._current_ep = "127.0.0.1:8181" # This server's endpoint (IP:port)
 server.start()
 print("connect")
diff --git a/paddle_fl/examples/ctr_demo/fl_trainer.py b/paddle_fl/examples/ctr_demo/fl_trainer.py
index b6c864b5ac9979b76f5246d5b2b2bc3b6a9896cc..0bcfb19544741eb1ef7158dcbf6136c2e34d1b9c 100644
--- a/paddle_fl/examples/ctr_demo/fl_trainer.py
+++ b/paddle_fl/examples/ctr_demo/fl_trainer.py
@@ -19,22 +19,22 @@ trainer_id = int(sys.argv[1]) # trainer id for each guest
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_trainer_job(job_path, trainer_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Inform the trainer of the scheduler endpoint
 trainer = FLTrainerFactory().create_fl_trainer(job)
 trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
 trainer.start()
 print(trainer._scheduler_ep, trainer._current_ep)
 
 output_folder = "fl_model"
-step_i = 0
+epoch_id = 0
 while not trainer.stop():
-    print("batch %d start train" % (step_i))
+    print("batch %d start train" % (epoch_id))
     train_step = 0
     for data in reader():
         trainer.run(feed=data, fetch=[])
         train_step += 1
         if train_step == trainer._step:
             break
-    step_i += 1
-    if step_i % 100 == 0:
+    epoch_id += 1
+    if epoch_id % 5 == 0:
         trainer.save_inference_program(output_folder)
diff --git a/paddle_fl/examples/dpsgd_demo/fl_scheduler.py b/paddle_fl/examples/dpsgd_demo/fl_scheduler.py
index 0016fe70cc5d128404f6dc931028dedad1ee79e8..f8ea641e2fa356102ffc08eec179e84ca1993f7d 100644
--- a/paddle_fl/examples/dpsgd_demo/fl_scheduler.py
+++ b/paddle_fl/examples/dpsgd_demo/fl_scheduler.py
@@ -2,7 +2,8 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
 
 worker_num = 4
 server_num = 1
-scheduler = FLScheduler(worker_num,server_num)
+# Define the number of workers/servers and the port for the scheduler
+scheduler = FLScheduler(worker_num,server_num,port=9091)
 scheduler.set_sample_worker_num(4)
 scheduler.init_env()
 print("init env done.")
diff --git a/paddle_fl/examples/dpsgd_demo/fl_server.py b/paddle_fl/examples/dpsgd_demo/fl_server.py
index dbbeaa8d779ecb96d3c442c9565e088b23590764..39056e82d99fb924f52c201e0fb230b6bc1626a1 100644
--- a/paddle_fl/examples/dpsgd_demo/fl_server.py
+++ b/paddle_fl/examples/dpsgd_demo/fl_server.py
@@ -21,7 +21,7 @@ server_id = 0
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_server_job(job_path, server_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Scheduler endpoint (IP:port)
 server.set_server_job(job)
-server._current_ep = "127.0.0.1:8181"
+server._current_ep = "127.0.0.1:8181" # This server's endpoint (IP:port)
 server.start()
diff --git a/paddle_fl/examples/dpsgd_demo/fl_trainer.py b/paddle_fl/examples/dpsgd_demo/fl_trainer.py
index 76f8841a1afed0decc03c5257a49f1609be21a6e..074368194e20defdea2f1151c9a0f940ed613189 100644
--- a/paddle_fl/examples/dpsgd_demo/fl_trainer.py
+++ b/paddle_fl/examples/dpsgd_demo/fl_trainer.py
@@ -13,7 +13,7 @@ trainer_id = int(sys.argv[1]) # trainer id for each guest
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_trainer_job(job_path, trainer_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Inform the trainer of the scheduler endpoint
 trainer = FLTrainerFactory().create_fl_trainer(job)
 trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
 trainer.start()
diff --git a/paddle_fl/examples/dpsgd_demo/run.sh b/paddle_fl/examples/dpsgd_demo/run.sh
index b17d100cb380749443380d275a91d2c8703a892b..9fb7ad97fef90531d9f52004ad36877aae669137 100644
--- a/paddle_fl/examples/dpsgd_demo/run.sh
+++ b/paddle_fl/examples/dpsgd_demo/run.sh
@@ -1,3 +1,5 @@
+unset http_proxy
+unset https_proxy
 python fl_master.py
 sleep 2
 python -u fl_scheduler.py >scheduler.log &
diff --git a/paddle_fl/examples/femnist_demo/fl_scheduler.py b/paddle_fl/examples/femnist_demo/fl_scheduler.py
index 0016fe70cc5d128404f6dc931028dedad1ee79e8..346529fd6caadef06d1e46078fa873da264a7507 100644
--- a/paddle_fl/examples/femnist_demo/fl_scheduler.py
+++ b/paddle_fl/examples/femnist_demo/fl_scheduler.py
@@ -2,7 +2,8 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
 
 worker_num = 4
 server_num = 1
-scheduler = FLScheduler(worker_num,server_num)
+# Define the number of workers/servers and the port for the scheduler
+scheduler = FLScheduler(worker_num,server_num,port=9091)
 scheduler.set_sample_worker_num(4)
 scheduler.init_env()
 print("init env done.")
diff --git a/paddle_fl/examples/femnist_demo/fl_server.py b/paddle_fl/examples/femnist_demo/fl_server.py
index 33c29d29855fe5f2f5ea02e358b9843970e0481e..cd558deb4ba577e5d0dbc74ecc90d0e22d278d8e 100644
--- a/paddle_fl/examples/femnist_demo/fl_server.py
+++ b/paddle_fl/examples/femnist_demo/fl_server.py
@@ -7,7 +7,7 @@ server_id = 0
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_server_job(job_path, server_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Scheduler endpoint (IP:port)
 server.set_server_job(job)
-server._current_ep = "127.0.0.1:8181"
+server._current_ep = "127.0.0.1:8181" # This server's endpoint (IP:port)
 server.start()
diff --git a/paddle_fl/examples/femnist_demo/fl_trainer.py b/paddle_fl/examples/femnist_demo/fl_trainer.py
index b46661f2099cc7ef091c87404e21fdf5074e6b25..dce2c8af02ce3a3be58f0a6fe03eca18a882472a 100644
--- a/paddle_fl/examples/femnist_demo/fl_trainer.py
+++ b/paddle_fl/examples/femnist_demo/fl_trainer.py
@@ -14,7 +14,7 @@ trainer_id = int(sys.argv[1]) # trainer id for each guest
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_trainer_job(job_path, trainer_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Inform the trainer of the scheduler endpoint
 print(job._target_names)
 trainer = FLTrainerFactory().create_fl_trainer(job)
 trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
@@ -40,7 +40,7 @@ def train_test(train_test_program, train_test_feed, train_test_reader):
 epoch_id = 0
 step = 0
 epoch = 3000
-count_by_step = True
+count_by_step = False
 if count_by_step:
     output_folder = "model_node%d" % trainer_id
 else:
diff --git a/paddle_fl/examples/femnist_demo/run.sh b/paddle_fl/examples/femnist_demo/run.sh
index 4a32be225fe2145630b3ca581525da00faa58930..186ce530a0b4f8e6cf4c96fab2e40bc90cda9d63 100644
--- a/paddle_fl/examples/femnist_demo/run.sh
+++ b/paddle_fl/examples/femnist_demo/run.sh
@@ -1,3 +1,5 @@
+unset http_proxy
+unset https_proxy
 #killall python
 python fl_master.py
 sleep 2
diff --git a/paddle_fl/examples/gru4rec_demo/fl_scheduler.py b/paddle_fl/examples/gru4rec_demo/fl_scheduler.py
index 0016fe70cc5d128404f6dc931028dedad1ee79e8..346529fd6caadef06d1e46078fa873da264a7507 100644
--- a/paddle_fl/examples/gru4rec_demo/fl_scheduler.py
+++ b/paddle_fl/examples/gru4rec_demo/fl_scheduler.py
@@ -2,7 +2,8 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
 
 worker_num = 4
 server_num = 1
-scheduler = FLScheduler(worker_num,server_num)
+# Define the number of workers/servers and the port for the scheduler
+scheduler = FLScheduler(worker_num,server_num,port=9091)
 scheduler.set_sample_worker_num(4)
 scheduler.init_env()
 print("init env done.")
diff --git a/paddle_fl/examples/gru4rec_demo/fl_server.py b/paddle_fl/examples/gru4rec_demo/fl_server.py
index dbbeaa8d779ecb96d3c442c9565e088b23590764..39056e82d99fb924f52c201e0fb230b6bc1626a1 100644
--- a/paddle_fl/examples/gru4rec_demo/fl_server.py
+++ b/paddle_fl/examples/gru4rec_demo/fl_server.py
@@ -21,7 +21,7 @@ server_id = 0
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_server_job(job_path, server_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Scheduler endpoint (IP:port)
 server.set_server_job(job)
-server._current_ep = "127.0.0.1:8181"
+server._current_ep = "127.0.0.1:8181" # This server's endpoint (IP:port)
 server.start()
diff --git a/paddle_fl/examples/gru4rec_demo/fl_trainer.py b/paddle_fl/examples/gru4rec_demo/fl_trainer.py
index 84410e9077c5c39963de6fee324ff77ed3b70b9b..b8416f739d1b0dc7cb493768cf1c9283214778c4 100644
--- a/paddle_fl/examples/gru4rec_demo/fl_trainer.py
+++ b/paddle_fl/examples/gru4rec_demo/fl_trainer.py
@@ -14,7 +14,7 @@ train_file_dir = "mid_data/node4/%d/" % trainer_id
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_trainer_job(job_path, trainer_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Inform the trainer of the scheduler endpoint
 trainer = FLTrainerFactory().create_fl_trainer(job)
 trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
 trainer.start()
diff --git a/paddle_fl/examples/secagg_demo/fl_scheduler.py b/paddle_fl/examples/secagg_demo/fl_scheduler.py
index 7a0c8f5b3411aac07f7e50e3daf3a1c0a53144e4..649f74768ef76cf1f8af52319b3818144f37e98a 100644
--- a/paddle_fl/examples/secagg_demo/fl_scheduler.py
+++ b/paddle_fl/examples/secagg_demo/fl_scheduler.py
@@ -2,7 +2,8 @@ from paddle_fl.core.scheduler.agent_master import FLScheduler
 
 worker_num = 2
 server_num = 1
-scheduler = FLScheduler(worker_num,server_num)
+# Define the number of workers/servers and the port for the scheduler
+scheduler = FLScheduler(worker_num,server_num,port=9091)
 scheduler.set_sample_worker_num(worker_num)
 scheduler.init_env()
 print("init env done.")
diff --git a/paddle_fl/examples/secagg_demo/fl_server.py b/paddle_fl/examples/secagg_demo/fl_server.py
index 3671838a60b678c8c29ad530888ec75252c5430a..529df8da4079fbbd217c58a857f7ab8a3c307586 100644
--- a/paddle_fl/examples/secagg_demo/fl_server.py
+++ b/paddle_fl/examples/secagg_demo/fl_server.py
@@ -21,8 +21,8 @@ server_id = 0
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_server_job(job_path, server_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Scheduler endpoint (IP:port)
 server.set_server_job(job)
-server._current_ep = "127.0.0.1:8181"
+server._current_ep = "127.0.0.1:8181" # This server's endpoint (IP:port)
 server.start()
 print("connect")
diff --git a/paddle_fl/examples/secagg_demo/fl_trainer.py b/paddle_fl/examples/secagg_demo/fl_trainer.py
index e97ffbc0fc33b384699edbc5f0703aed412846be..7b08e8724a67219654c7f94809fc281353202ad2 100644
--- a/paddle_fl/examples/secagg_demo/fl_trainer.py
+++ b/paddle_fl/examples/secagg_demo/fl_trainer.py
@@ -28,7 +28,7 @@ trainer_id = int(sys.argv[1]) # trainer id for each guest
 job_path = "fl_job_config"
 job = FLRunTimeJob()
 job.load_trainer_job(job_path, trainer_id)
-job._scheduler_ep = "127.0.0.1:9091"
+job._scheduler_ep = "127.0.0.1:9091" # Inform the trainer of the scheduler endpoint
 trainer = FLTrainerFactory().create_fl_trainer(job)
 trainer.trainer_id = trainer_id
 trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
@@ -75,6 +75,7 @@ while not trainer.stop():
         if step_i % 100 == 0:
             print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
 
+    print(step_i)
     avg_loss_val, acc_val = train_test(train_test_program=test_program,
                                        train_test_reader=test_reader,
                                        train_test_feed=feeder)
@@ -82,5 +83,5 @@
 
     if epoch_id > 40:
         break
-    if step_i % 100 == 0:
+    if epoch_id % 5 == 0:
         trainer.save_inference_program(output_folder)
diff --git a/paddle_fl/examples/submitter_demo/conf.txt b/paddle_fl/examples/submitter_demo/conf.txt
index 4f952b7f9f9549cab96dd61f174111c41966dda7..0880739e809063bc21531a92bb205ee2df4f169a 100644
--- a/paddle_fl/examples/submitter_demo/conf.txt
+++ b/paddle_fl/examples/submitter_demo/conf.txt
@@ -2,7 +2,8 @@ task_name=test_fl_job_submit_jingqinghe
 hdfs_output=/user/feed/mlarch/sequence_generator/dongdaxiang/job_44
 train_cmd=python dist_trainer.py
-monitor_cmd=python system_monitor_app.py 10 100
+#monitor_cmd=python system_monitor_app.py 10 100
+monitor_cmd=
 #train_cmd=python test_hadoop.py
 
 hdfs_path=afs://xingtian.afs.baidu.com:9902
 
diff --git a/paddle_fl/examples/submitter_demo/kill.sh b/paddle_fl/examples/submitter_demo/kill.sh
index 44c26767d2d80cfc2a8950da0e822a06a691137e..3f2a3a9586028ee4e892a4b4aad5bb390e32d50a 100644
--- a/paddle_fl/examples/submitter_demo/kill.sh
+++ b/paddle_fl/examples/submitter_demo/kill.sh
@@ -1 +1,3 @@
-/home/jingqinghe/mpi_feed4/smart_client/bin/qdel $1".yq01-hpc-lvliang01-smart-master.dmop.baidu.com"
+unset http_proxy
+unset https_proxy
+/home/jingqinghe/tools/mpi_feed4/smart_client/bin/qdel $1".yq01-hpc-lvliang01-smart-master.dmop.baidu.com"
diff --git a/paddle_fl/examples/submitter_demo/scheduler_client.py b/paddle_fl/examples/submitter_demo/scheduler_client.py
index 59068a8d87b200ac6e4d375a40757c70afd5c0c7..b19c1793e401e473536c72f22fbbc1fe30d7d3a5 100644
--- a/paddle_fl/examples/submitter_demo/scheduler_client.py
+++ b/paddle_fl/examples/submitter_demo/scheduler_client.py
@@ -18,7 +18,8 @@ print(random_port)
 current_ip = socket.gethostbyname(socket.gethostname())
 endpoints = "{}:{}".format(current_ip, random_port)
 #start a web server for remote endpoints to download their config
-os.system("python -m SimpleHTTPServer 8080 &")
+#os.system("python -m SimpleHTTPServer 8080 &")
+os.system("python -m http.server 8080 &")
 if os.path.exists("job_config"):
     os.system("rm -rf job_config")
 if os.path.exists("package"):
@@ -120,10 +121,10 @@ print(ip_list)
 #allocate the role of each endpoint and their ids
 ip_role = {}
 for i in range(len(ip_list)):
-  if i < int(default_dict["server_nodes"]):
-      ip_role[ip_list[i]] = 'server%d' % i
+    if i < int(default_dict["server_nodes"]):
+        ip_role[ip_list[i]] = 'server%d' % i
     else:
-      ip_role[ip_list[i]] = 'trainer%d' % (i-int(default_dict["server_nodes"]))
+        ip_role[ip_list[i]] = 'trainer%d' % (i-int(default_dict["server_nodes"]))
 print(ip_role)
 
 def job_generate():
@@ -179,7 +180,7 @@ while not all_job_sent:
     message = zmq_socket.recv()
     group = message.split("\t")
    if group[0] == "GET_FL_JOB":
-      download_job.append(group[1])
+        download_job.append(group[1])
         zmq_socket.send(ip_role[group[1]])
     else:
         zmq_socket.send("WAIT\t0")
diff --git a/paddle_fl/examples/submitter_demo/train_program.py b/paddle_fl/examples/submitter_demo/train_program.py
index 7dae086f2a02c39a2166fd415fff66adebe1535a..22cef9f92012dc816906431a5671994f5fbb3363 100644
--- a/paddle_fl/examples/submitter_demo/train_program.py
+++ b/paddle_fl/examples/submitter_demo/train_program.py
@@ -89,15 +89,15 @@ else:
     trainer.start()
     print(trainer._scheduler_ep, trainer._current_ep)
     output_folder = "fl_model"
-    step_i = 0
+    epoch_id = 0
     while not trainer.stop():
-        print("batch %d start train" % (step_i))
+        print("batch %d start train" % (epoch_id))
-        train_step = 0
+        step_i = 0
         for data in reader():
             trainer.run(feed=data, fetch=[])
-            train_step += 1
+            step_i += 1
-            if train_step == trainer._step:
+            if step_i == trainer._step:
                 break
-        step_i += 1
-        if step_i % 100 == 0:
+        epoch_id += 1
+        if epoch_id % 5 == 0:
             trainer.save_inference_program(output_folder)
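For reviewers who want to sanity-check the renamed counters in the train_program.py hunk without a running cluster, here is the same loop run against a stub trainer. The stub class, its numbers, and the reader are invented for illustration; they mimic only the three members the loop actually touches (`_step`, `stop()`, `run()`), not the real PaddleFL trainer.

```python
# Standalone check of the epoch/step bookkeeping in the patched loop.
# StubTrainer is hypothetical scaffolding, not part of PaddleFL.
class StubTrainer:
    def __init__(self, steps_per_epoch=3, max_epochs=10):
        self._step = steps_per_epoch   # plays the role of trainer._step
        self._max_epochs = max_epochs
        self._epochs_done = 0

    def stop(self):
        return self._epochs_done >= self._max_epochs

    def run(self, feed, fetch):
        pass  # a real trainer would execute one batch here

    def save_inference_program(self, folder):
        print("checkpoint -> %s" % folder)

def reader():  # stand-in data source
    for i in range(100):
        yield {"sample": i}

trainer = StubTrainer()
output_folder = "fl_model"
epoch_id = 0
while not trainer.stop():
    step_i = 0                       # per-epoch batch counter
    for data in reader():
        trainer.run(feed=data, fetch=[])
        step_i += 1
        if step_i == trainer._step:  # one epoch = trainer._step batches
            break
    epoch_id += 1
    trainer._epochs_done = epoch_id
    if epoch_id % 5 == 0:            # checkpoint every 5 epochs, as in the patch
        trainer.save_inference_program(output_folder)
```

Running this prints a checkpoint at epochs 5 and 10, confirming the inner counter resets each epoch while the outer one drives the save cadence.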