diff --git a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
index 2cbed1ee39819d43aa25857dcfa8e09cb438fa67..2d428d5fb5a5d8a166f72ec208039dea00266496 100644
--- a/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
+++ b/python/paddle/distributed/auto_parallel/static/parallelizer_v2.py
@@ -25,7 +25,7 @@ from ..random import init_auto_parallel_rng
 from .partitioner import Partitioner
 from .process_group import get_world_process_group
 from .reshard import Resharder
-from .utils import set_grad_var_shape, use_new_executor
+from .utils import get_pp_stage, set_grad_var_shape, use_new_executor
 
 
 class Parallelizer:
@@ -400,4 +400,6 @@ class Parallelizer:
             main_program._pipeline_opt["standalone_opt"] = {
                 "schedule_mode": self._strategy.pipeline.schedule_mode,
                 "num_micro_batches": self._strategy.pipeline.accumulate_steps,
+                "pp_degree": len(self._dist_context.process_meshes),
+                "pp_stage": get_pp_stage(self._dist_context, rank),
             }
diff --git a/python/paddle/distributed/auto_parallel/static/utils.py b/python/paddle/distributed/auto_parallel/static/utils.py
index 130098ac9d946e5f677651c7d0acea4875ce78dc..a5e19baf7cf121246a6bc6702b6c7151c8414c87 100644
--- a/python/paddle/distributed/auto_parallel/static/utils.py
+++ b/python/paddle/distributed/auto_parallel/static/utils.py
@@ -2382,6 +2382,15 @@ def use_new_executor():
     ]
 
 
+def get_pp_stage(dist_context, rank):
+    pp_idx = None
+    for idx, process_mesh in enumerate(dist_context.process_meshes):
+        if rank in process_mesh.process_ids:
+            pp_idx = idx
+            break
+    return pp_idx
+
+
 def wrap_data_for_completion(
     dist_op, input_names: list, output_names: list, attr_names: list
 ):
diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass.py b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
index fcea7939d6554e987768730f36b55fa3941c8592..beb571d25fc25f23ff7b8b0e59d28a2094687c6e 100644
--- a/python/paddle/distributed/passes/pipeline_scheduler_pass.py
+++ b/python/paddle/distributed/passes/pipeline_scheduler_pass.py
@@ -326,9 +326,10 @@ class Pipeline1F1BPass(PassBase):
     def _check_conflict(self, other_pass):
         return True
 
-    def _create_job_list(self):
+    def _create_job_list(self, type_to_skip_vars):
         job_list = []
         lr_job = core.Job("lr")
+        lr_job.set_skip_gc_vars(type_to_skip_vars["lr"])
         job_list.append(lr_job)
 
         assert (
@@ -342,6 +343,7 @@
         for i in range(micro_batch_in_warmup):
             forward_job = core.Job("forward")
             forward_job.set_micro_batch_id(forward_micro_batch_id)
+            forward_job.set_skip_gc_vars(type_to_skip_vars["forward"])
             job_list.append(forward_job)
             forward_micro_batch_id += 1
 
@@ -349,20 +351,24 @@
         for i in range(micro_batch_in_1f1b):
             backward_job = core.Job("backward")
             backward_job.set_micro_batch_id(backward_micro_batch_id)
+            backward_job.set_skip_gc_vars(type_to_skip_vars["backward"])
             job_list.append(backward_job)
             backward_micro_batch_id += 1
 
             forward_job = core.Job("forward")
             forward_job.set_micro_batch_id(forward_micro_batch_id)
+            forward_job.set_skip_gc_vars(type_to_skip_vars["forward"])
             job_list.append(forward_job)
             forward_micro_batch_id += 1
 
         for i in range(micro_batch_in_warmup):
             backward_job = core.Job("backward")
             backward_job.set_micro_batch_id(backward_micro_batch_id)
+            backward_job.set_skip_gc_vars(type_to_skip_vars["backward"])
             job_list.append(backward_job)
             backward_micro_batch_id += 1
 
         opt_job = core.Job("optimizer")
+        opt_job.set_skip_gc_vars(type_to_skip_vars["optimizer"])
         job_list.append(opt_job)
         return job_list
 
@@ -373,8 +379,10 @@ class Pipeline1F1BPass(PassBase):
         self._program = main_program
         _insert_sync_for_fthenb_1f1b(self._program)
 
-        type_to_program = _program_for_fthenb_and_1f1b(self._program)
-        job_list = self._create_job_list()
+        type_to_program, type_to_skip_vars = _program_for_fthenb_and_1f1b(
+            self._program
+        )
+        job_list = self._create_job_list(type_to_skip_vars)
 
         plan = core.Plan(job_list, type_to_program)
         context.set_attr("plan", plan)
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index f8595d3d8362d0df1fc6d25bf68be5ee31ca9d46..c183a2f9bd5232da554f7d07eac806a7076c1540 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -878,10 +878,9 @@
 
             standalone_opt = new_program._pipeline_opt["standalone_opt"]
             pass_name = standalone_opt["schedule_mode"]
-            pass_attr = {
-                "num_micro_batches": standalone_opt["num_micro_batches"]
-            }
-            plan = apply_pass(new_program, new_program, pass_name, pass_attr)
+            plan = apply_pass(
+                new_program, new_program, pass_name, standalone_opt
+            )
         else:
             default_job = core.Job("default")
             type_to_program = {"default": new_program.desc}
diff --git a/test/auto_parallel/CMakeLists.txt b/test/auto_parallel/CMakeLists.txt
index 08d0c4fd34a39fb756a292bd0c3bba16006cb2bf..cea07f2d86fa41490e187f7985b553a98193d925 100644
--- a/test/auto_parallel/CMakeLists.txt
+++ b/test/auto_parallel/CMakeLists.txt
@@ -68,9 +68,8 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
   py_test_modules(test_auto_tuner MODULES test_auto_tuner)
   set_tests_properties(test_auto_tuner PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE"
                                                   TIMEOUT 100)
-  py_test_modules(test_pipeline_scheduler_FThenB MODULES
-                  test_pipeline_scheduler_FThenB)
-  set_tests_properties(test_pipeline_scheduler_FThenB
+  py_test_modules(test_pipeline_scheduler MODULES test_pipeline_scheduler)
+  set_tests_properties(test_pipeline_scheduler
                        PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50)
   py_test_modules(test_auto_tuner_compare MODULES test_auto_tuner_compare)
   set_tests_properties(test_auto_tuner_compare
diff --git a/test/auto_parallel/pipeline_scheduler_FThenB.py b/test/auto_parallel/pipeline_scheduler_unittest.py
similarity index 76%
rename from test/auto_parallel/pipeline_scheduler_FThenB.py
rename to test/auto_parallel/pipeline_scheduler_unittest.py
index b02ed3a4739d03219cff30d40ca4334364264db3..f19666d7aff5f29578739719b486e17a6539fd39 100644
--- a/test/auto_parallel/pipeline_scheduler_FThenB.py
+++ b/test/auto_parallel/pipeline_scheduler_unittest.py
@@ -26,14 +26,14 @@ from paddle.distributed.fleet import auto
 paddle.enable_static()
 
 
-def apply_pass(use_standalone_exe=False):
+def apply_pass(schedule_mode="FThenB"):
     strategy = auto.Strategy()
     strategy.auto_mode = "semi"
     strategy.reinit = True
 
     pipeline = strategy.pipeline
     pipeline.enable = True
-    pipeline.schedule_mode = "1F1B" if not use_standalone_exe else "FThenB"
+    pipeline.schedule_mode = schedule_mode
     pipeline.accumulate_steps = 2
 
     return strategy
@@ -61,10 +61,10 @@ class Test1F1BPass(unittest.TestCase):
         place = paddle.fluid.CUDAPlace(ParallelEnv().dev_id)
         engine._executor = paddle.static.Executor(place)
 
-    def get_engine(self, use_standalone_exe=False):
+    def get_engine(self, schedule_mode="FThenB"):
         reset_prog()
 
-        strategy = apply_pass(use_standalone_exe)
+        strategy = apply_pass(schedule_mode)
         clip = paddle.nn.ClipGradByGlobalNorm(self.clip_norm)
         opt = paddle.optimizer.AdamW(learning_rate=0.00001, grad_clip=clip)
         model, loss = generate_model("pp")
@@ -87,31 +87,44 @@ class Test1F1BPass(unittest.TestCase):
     def test_pp_pass(self):
         # pp2 1f1b training with fleet executor
         os.environ['FLAGS_new_executor_micro_batching'] = 'False'
-        engine_1f1b = self.get_engine(use_standalone_exe=False)
-        history_1f1b = engine_1f1b.fit(
+        engine_fleet_1f1b = self.get_engine(schedule_mode="1F1B")
+        history_fleet_1f1b = engine_fleet_1f1b.fit(
             self.dataset, 3, batch_size=self.batch_size, log_freq=1
         )
-        assert engine_1f1b._strategy.pipeline.schedule_mode == "1F1B"
+        assert engine_fleet_1f1b._strategy.pipeline.schedule_mode == "1F1B"
         assert os.environ.get('FLAGS_new_executor_micro_batching') == "False"
 
         # pp2 fthenb training with standalone executor
         os.environ['FLAGS_new_executor_micro_batching'] = 'True'
-        engine_fthenb = self.get_engine(use_standalone_exe=True)
+        engine_fthenb = self.get_engine(schedule_mode="FThenB")
         history_fthenb = engine_fthenb.fit(
             self.dataset, 3, batch_size=self.batch_size, log_freq=1
         )
         assert engine_fthenb._strategy.pipeline.schedule_mode == "FThenB"
         assert os.environ.get('FLAGS_new_executor_micro_batching') == "True"
 
+        # pp2 1f1b training with standalone executor
+        os.environ['FLAGS_new_executor_micro_batching'] = 'True'
+        engine_1f1b = self.get_engine(schedule_mode="1F1B")
+        history_1f1b = engine_1f1b.fit(
+            self.dataset, 3, batch_size=self.batch_size, log_freq=1
+        )
+        assert engine_1f1b._strategy.pipeline.schedule_mode == "1F1B"
+        assert os.environ.get('FLAGS_new_executor_micro_batching') == "True"
+
         # NOTE: every sample data from dataset is all the same
         if paddle.distributed.get_rank() == 1:
-            losses_1f1b = np.array(history_1f1b.history["loss"])
+            losses_fleet_1f1b = np.array(history_fleet_1f1b.history["loss"])
             losses_fthenb = np.array(history_fthenb.history["loss"])
+            losses_1f1b = np.array(history_1f1b.history["loss"])
             # accumulate_steps is 2
             assert losses_fthenb[0].shape[0] == 2
-            # losses_1f1b is the last loss of accumulate_steps
+            assert losses_1f1b[0].shape[0] == 2
+            # losses_fleet_1f1b is the last loss of accumulate_steps
             # losses_fthenb is all the losses of accumulate_steps
-            self.check_results(losses_1f1b[0], losses_fthenb[0][-1])
+            # losses_1f1b is all the losses of accumulate_steps
+            self.check_results(losses_fleet_1f1b[0], losses_fthenb[0][-1])
+            self.check_results(losses_fleet_1f1b[0], losses_1f1b[0][-1])
 
 
 if __name__ == "__main__":
diff --git a/test/auto_parallel/test_pipeline_scheduler_FThenB.py b/test/auto_parallel/test_pipeline_scheduler.py
similarity index 96%
rename from test/auto_parallel/test_pipeline_scheduler_FThenB.py
rename to test/auto_parallel/test_pipeline_scheduler.py
index fab5ed0863d93b2d9c5c83daa2708279539b9d0c..9739417f7aec98aa01b4ba8465f5660bfbd48d17 100644
--- a/test/auto_parallel/test_pipeline_scheduler_FThenB.py
+++ b/test/auto_parallel/test_pipeline_scheduler.py
@@ -23,7 +23,7 @@ class TestFThenBPass(unittest.TestCase):
     def test_pp2(self):
         file_dir = os.path.dirname(os.path.abspath(__file__))
         launch_model_path = os.path.join(
-            file_dir, "pipeline_scheduler_FThenB.py"
+            file_dir, "pipeline_scheduler_unittest.py"
         )
 
         if os.environ.get("WITH_COVERAGE", "OFF") == "ON":
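
For reference, below is a minimal, self-contained sketch of the stage-lookup logic added as get_pp_stage in utils.py. The ProcessMesh and DistContext classes are simplified stand-ins invented for illustration only; in the patch the real objects come from the auto-parallel dist_context, and the returned index is what parallelizer_v2.py passes to the standalone executor as "pp_stage" (with "pp_degree" being the number of process meshes).

# Standalone sketch of the get_pp_stage lookup; the dataclasses below are
# hypothetical stand-ins, not Paddle's real ProcessMesh / DistributedContext.
from dataclasses import dataclass


@dataclass
class ProcessMesh:
    process_ids: list


@dataclass
class DistContext:
    process_meshes: list


def get_pp_stage(dist_context, rank):
    # A rank's pipeline stage is the index of the process mesh that contains
    # it; None if the rank belongs to no mesh.
    pp_idx = None
    for idx, process_mesh in enumerate(dist_context.process_meshes):
        if rank in process_mesh.process_ids:
            pp_idx = idx
            break
    return pp_idx


if __name__ == "__main__":
    # Two pipeline stages: ranks {0, 1} form stage 0, ranks {2, 3} form stage 1.
    ctx = DistContext(process_meshes=[ProcessMesh([0, 1]), ProcessMesh([2, 3])])
    assert get_pp_stage(ctx, 0) == 0
    assert get_pp_stage(ctx, 3) == 1
    assert get_pp_stage(ctx, 7) is None
    print("pp_degree:", len(ctx.process_meshes))  # 2, as used for "pp_degree"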