diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py
index 2450c2d777a16fc1d6e81a67e14466d05119f9da..ece1102dce987cda994ff086b07f756498ce26e6 100644
--- a/benchmark/fluid/fluid_benchmark.py
+++ b/benchmark/fluid/fluid_benchmark.py
@@ -264,8 +264,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                     break
             else:
                 loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
-            if args.update_method == "pserver":
-                exe.bcast_params()
             if args.use_reader_op:
                 num_samples += args.batch_size * args.gpus
             else:
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index afa6d91145648e7e60e16e81bed7e5fb060d8256..25cc1355d5a53e44b7f45c1f7d80673abcf567ec 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -71,7 +71,6 @@ class ParallelExecutor(object):
                  num_trainers=1,
                  trainer_id=0,
                  **kwargs):
-
         if len(kwargs) != 0:
             err_msg = ""
             for key in kwargs:
@@ -130,6 +129,11 @@ class ParallelExecutor(object):
         main = main_program
         main = main if main else framework.default_main_program()
         scope = executor.global_scope()
+        # FIXME(Yancey1989): it's a temporary approach to determinate the distribute
+        # train program, call self.bcast_param() at the end of each mini-batch.
+        self.is_dist = True if "recv" in [
+            op.type for op in main.global_block().ops
+        ] else False
 
         if share_vars_from and not isinstance(share_vars_from,
                                               ParallelExecutor):
@@ -262,6 +266,10 @@
         fetch_var_name = '@FETCHED_VAR_NAME@'
         self.executor.run(fetch_list, fetch_var_name)
         arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
+
+        if self.is_dist:
+            self.bcast_params()
+
         return [arr[i] for i in range(len(arr))]
 
     def bcast_params(self):
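
For context, a minimal standalone sketch (not part of the patch) of the heuristic the new ParallelExecutor code relies on: the distribute transpiler inserts "recv" ops into the trainer-side program, so scanning main.global_block().ops for one is enough to decide whether parameters should be broadcast after each mini-batch. The helper name is_distributed_program below is illustrative only.

import paddle.fluid as fluid

def is_distributed_program(program):
    # Hypothetical helper, not in the patch: the distribute transpiler adds
    # "recv" ops to the trainer program, so their presence is treated as
    # "this is pserver-mode distributed training".
    return any(op.type == "recv" for op in program.global_block().ops)

# Usage sketch: decide after transpiling whether broadcasting is needed.
main = fluid.default_main_program()
if is_distributed_program(main):
    # In the patched ParallelExecutor this condition (self.is_dist) triggers
    # self.bcast_params() at the end of every run(), so callers such as
    # fluid_benchmark.py no longer call exe.bcast_params() themselves.
    pass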