diff --git a/python/paddle/fluid/distribute_transpiler.py b/python/paddle/fluid/distribute_transpiler.py
index d4d47a073bed104af424e673aff365fe5d31dae8..902759d16fac0ecc051c867f407d8d93f5d78027 100644
--- a/python/paddle/fluid/distribute_transpiler.py
+++ b/python/paddle/fluid/distribute_transpiler.py
@@ -373,22 +373,18 @@ class DistributeTranspiler:
                 recv_inputs.append(single_trainer_var)
 
         # step 3
-        # each optimization op will has a optimize block
-        optimize_block = None
-
-        # step 4
         # Create a union-find data structure from optimize ops,
         # If two ops are connected, we could add these two ops
         # into one set.
         ufind = self._create_ufind(self.optimize_ops)
-        # step 4.2
+        # step 3.2
         # Iterate through the ops and append optimize op which
         # located on current pserver
         opt_op_on_pserver = []
         for _, op in enumerate(self.optimize_ops):
             if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op):
                 opt_op_on_pserver.append(op)
-        # step 4.3
+        # step 3.3
         # Iterate through the ops, and if an op and the optimize ops
         # which located on current pserver are in one set, then
         # append it into the sub program.
@@ -422,23 +418,17 @@ class DistributeTranspiler:
                 self._append_pserver_non_opt_ops(block, op)
 
         # append lr decay ops to the child block if exists
-        lr_decay_block = None
         lr_ops = self._get_lr_ops()
         if len(lr_ops) > 0:
-            lr_decay_block = pserver_program.create_block(GLOBAL_BLOCK_IDX)
+            lr_decay_block = pserver_program.create_block(
+                pserver_program.num_blocks - 1)
             for _, op in enumerate(lr_ops):
                 self._append_pserver_non_opt_ops(lr_decay_block, op)
 
         # append op to the current block
-        per_opt_block = None
-        pre_block_idx = GLOBAL_BLOCK_IDX
-        if lr_decay_block is not None:
-            pre_block_idx = lr_decay_block.idx
+        pre_block_idx = pserver_program.num_blocks - 1
         for idx, opt_op in enumerate(opt_op_on_pserver):
             per_opt_block = pserver_program.create_block(pre_block_idx)
-            if optimize_block is None:
-                # first optimize block
-                optimize_block = per_opt_block
             for _, op in enumerate(self.optimize_ops):
                 # optimizer is connected to itself
                 if ufind.is_connected(op, opt_op) and op not in global_ops:
@@ -447,9 +437,10 @@ class DistributeTranspiler:
         # append global ops
         opt_state_block = None
         if global_ops:
-            opt_state_block = pserver_program.create_block(per_opt_block.idx)
-        for glb_op in global_ops:
-            __append_optimize_op__(glb_op, opt_state_block)
+            opt_state_block = pserver_program.create_block(
+                pserver_program.num_blocks - 1)
+            for glb_op in global_ops:
+                __append_optimize_op__(glb_op, opt_state_block)
 
         # NOT USED: single block version:
         #
@@ -464,8 +455,7 @@ class DistributeTranspiler:
         if self.has_distributed_lookup_table:
             pserver_index = self.pserver_endpoints.index(endpoint)
             table_opt_block = self._create_table_optimize_block(
-                pserver_index, pserver_program, opt_state_block or
-                pserver_program.global_block())
+                pserver_index, pserver_program, pre_block_idx)
             prefetch_block = self._create_prefetch_block(
                 pserver_index, pserver_program, table_opt_block)
 
@@ -483,7 +473,7 @@ class DistributeTranspiler:
             inputs={'X': recv_inputs},
             outputs={},
             attrs={
-                "OptimizeBlock": optimize_block,
+                "OptimizeBlock": pserver_program.block(1),
                 "endpoint": endpoint,
                 "Fanin": self.trainer_num,
                 "PrefetchBlock": prefetch_block,
@@ -678,7 +668,7 @@ class DistributeTranspiler:
         return prefetch_block
 
     def _create_table_optimize_block(self, pserver_index, pserver_program,
-                                     append_block):
+                                     pre_block_idx):
         def _clone_var(block, var, persistable=True):
             assert isinstance(var, Variable)
             return block.create_var(
@@ -715,7 +705,7 @@ class DistributeTranspiler:
             op for op in self.optimize_ops
             if op.input("Param")[0] == self.table_name
         ][0]
-        table_opt_block = pserver_program.create_block(append_block.idx)
+        table_opt_block = pserver_program.create_block(pre_block_idx)
 
         # only support sgd now
         assert table_opt_op.type == "sgd"
diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 5e6c6204c5894235ea4f8814afe02e4d3acec50a..340882ea9e7b0e2a0c52749c771308c6b860ed07 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -1107,6 +1107,10 @@ class Program(object):
     def random_seed(self):
         return self._seed
 
+    @property
+    def num_blocks(self):
+        return self.desc.num_blocks()
+
     @random_seed.setter
     def random_seed(self, seed):
         if not isinstance(seed, int):
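Reviewer note: below is a minimal sketch, not part of the patch, of the block layout this change produces on the pserver program. It assumes a fluid build that already contains the Program.num_blocks property added above; lr_decay, per_opt, and opt_state are hypothetical stand-ins for the lr-decay, per-optimizer, and global-ops blocks, and Block.parent_idx (from framework.py) is used only to inspect the chain.

    import paddle.fluid as fluid

    prog = fluid.Program()

    # Parenting every new block to num_blocks - 1 (the most recently
    # created block) yields one linear chain rooted at global block 0.
    lr_decay = prog.create_block(prog.num_blocks - 1)   # idx 1, parent 0
    per_opt = prog.create_block(prog.num_blocks - 1)    # idx 2, parent 1
    opt_state = prog.create_block(prog.num_blocks - 1)  # idx 3, parent 2

    assert prog.num_blocks == 4
    assert [prog.block(i).parent_idx for i in range(1, 4)] == [0, 1, 2]

    # With the chained layout, the first block after the global one is
    # always the head of the optimize chain, whether or not an lr-decay
    # block exists; this is what listen_and_serv's "OptimizeBlock" attr
    # now points at.
    head = prog.block(1)

Because each new block is parented to the last one created, the optimize_block / lr_decay_block / per_opt_block sentinels threaded through transpile() become unnecessary, and _create_table_optimize_block can take a plain pre_block_idx instead of a block object.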