未验证 提交 133e05c1 编写于 作者: L LiYuRio 提交者: GitHub

add 1f1b pass (#54787)

上级 ffc1b027
......@@ -272,6 +272,7 @@ class PipelineFThenBPass(PassBase):
job_list = []
lr_job = core.Job("lr")
job_list.append(lr_job)
for i in range(self._num_micro_batches):
forward_job = core.Job("forward")
forward_job.set_micro_batch_id(i)
......@@ -298,10 +299,76 @@ class PipelineFThenBPass(PassBase):
context.set_attr("plan", plan)
@register_pass("pipeline_scheduler_1F1B")
class Pipeline1F1BPass(PassBase):
    """Scheduler pass that emits a per-stage job plan mixing forward and backward jobs.

    The pass reads three attributes -- ``num_micro_batches``, ``pp_stage`` and
    ``pp_degree`` -- and stores the resulting ``core.Plan`` in the pass
    context under the key ``"plan"``.
    """

    def __init__(self):
        super().__init__()

    def _check_self(self):
        """Always report this pass as valid."""
        return True

    def _check_conflict(self, other_pass):
        """Always report no conflict with ``other_pass``."""
        return True

    def _create_job_list(self):
        """Build the ordered job list for the current pipeline stage.

        The list is laid out as:

        1. one ``"lr"`` job;
        2. ``pp_degree - pp_stage`` warm-up ``"forward"`` jobs;
        3. alternating ``"backward"`` / ``"forward"`` jobs for the remaining
           micro batches;
        4. the outstanding ``"backward"`` jobs (as many as the warm-up
           forwards);
        5. one ``"optimizer"`` job.

        Forward and backward micro batch ids each count up from 0
        independently.

        Returns:
            list: the ``core.Job`` objects in execution order.

        Raises:
            AssertionError: if ``pp_degree`` exceeds ``num_micro_batches``.
        """
        job_list = [core.Job("lr")]

        # Equality is accepted by the check, so the message says so.
        assert (
            self._pp_degree <= self._num_micro_batches
        ), "Num of micro batches should be larger than or equal to pp degree."

        def add_job(job_type, micro_batch_id):
            # Every forward/backward job is tagged with its micro batch id.
            job = core.Job(job_type)
            job.set_micro_batch_id(micro_batch_id)
            job_list.append(job)

        num_warmup = self._pp_degree - self._pp_stage
        num_steady = self._num_micro_batches - num_warmup

        # Running counters: forwards and backwards are numbered separately.
        next_forward = 0
        next_backward = 0

        # Warm-up: forward jobs only.
        for _ in range(num_warmup):
            add_job("forward", next_forward)
            next_forward += 1

        # Steady state: one backward followed by one forward per step.
        for _ in range(num_steady):
            add_job("backward", next_backward)
            next_backward += 1
            add_job("forward", next_forward)
            next_forward += 1

        # Drain: backward jobs for the micro batches forwarded during warm-up.
        for _ in range(num_warmup):
            add_job("backward", next_backward)
            next_backward += 1

        job_list.append(core.Job("optimizer"))
        return job_list

    def _apply_single_impl(self, main_program, startup_program, context):
        """Read the pass attributes, build the plan and publish it in ``context``.

        ``startup_program`` is accepted but not used here.
        """
        self._num_micro_batches = self.get_attr("num_micro_batches")
        self._pp_stage = self.get_attr("pp_stage")
        self._pp_degree = self.get_attr("pp_degree")
        self._program = main_program

        # NOTE(review): both helpers live elsewhere in this module; judging by
        # their names they insert sync ops and split the program per job type
        # -- confirm against their definitions.
        _insert_sync_for_fthenb_1f1b(self._program)
        type_to_program = _program_for_fthenb_and_1f1b(self._program)

        job_list = self._create_job_list()
        plan = core.Plan(job_list, type_to_program)
        context.set_attr("plan", plan)
def apply_pass(main_program, startup_program, pass_name, pass_attr={}):
assert pass_name in [
"FThenB"
], "pipeline scheduler only support FThenB, but recieve {}".format(
"FThenB",
"1F1B",
], "pipeline scheduler only support FThenB and 1F1B, but recieve {}".format(
pass_name
)
pipeline_pass = new_pass("pipeline_scheduler_" + pass_name, pass_attr)
......
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from paddle import static
from paddle.distributed.passes import PassContext, new_pass
class TestStandaloneExecutor1F1BPlan(unittest.TestCase):
    """Checks the plan produced by the ``pipeline_scheduler_1F1B`` pass.

    Every case uses 8 micro batches and a pipeline degree of 4; only the
    stage index varies. The job types and micro batch ids read back from the
    plan are compared against hand-written expectations.
    """

    def _collect_plan(self, pp_stage):
        """Apply the pass for ``pp_stage``; return ``(job types, micro batch ids)``."""
        attrs = {"num_micro_batches": 8, "pp_stage": pp_stage, "pp_degree": 4}
        ctx = PassContext()
        startup = static.Program()
        main = static.Program()
        scheduler = new_pass("pipeline_scheduler_1F1B", attrs)
        scheduler.apply([main], [startup], ctx)

        plan = ctx.get_attr("plan")
        pairs = [(job.type(), job.micro_batch_id()) for job in plan.job_list()]
        kinds = [kind for kind, _ in pairs]
        batch_ids = [batch_id for _, batch_id in pairs]
        return kinds, batch_ids

    def test_standalone_executor_1f1b_plan_stage0(self):
        kinds, batch_ids = self._collect_plan(0)
        # 4 warm-up forwards, 4 backward/forward pairs, 4 trailing backwards.
        want_kinds = (
            ["lr"]
            + ["forward"] * 4
            + ["backward", "forward"] * 4
            + ["backward"] * 4
            + ["optimizer"]
        )
        want_ids = [0, 0, 1, 2, 3, 0, 4, 1, 5, 2, 6, 3, 7, 4, 5, 6, 7, 0]
        self.assertEqual(kinds, want_kinds)
        self.assertEqual(batch_ids, want_ids)

    def test_standalone_executor_1f1b_plan_stage1(self):
        kinds, batch_ids = self._collect_plan(1)
        # 3 warm-up forwards, 5 backward/forward pairs, 3 trailing backwards.
        want_kinds = (
            ["lr"]
            + ["forward"] * 3
            + ["backward", "forward"] * 5
            + ["backward"] * 3
            + ["optimizer"]
        )
        want_ids = [0, 0, 1, 2, 0, 3, 1, 4, 2, 5, 3, 6, 4, 7, 5, 6, 7, 0]
        self.assertEqual(kinds, want_kinds)
        self.assertEqual(batch_ids, want_ids)

    def test_standalone_executor_1f1b_plan_stage2(self):
        kinds, batch_ids = self._collect_plan(2)
        # 2 warm-up forwards, 6 backward/forward pairs, 2 trailing backwards.
        want_kinds = (
            ["lr"]
            + ["forward"] * 2
            + ["backward", "forward"] * 6
            + ["backward"] * 2
            + ["optimizer"]
        )
        want_ids = [0, 0, 1, 0, 2, 1, 3, 2, 4, 3, 5, 4, 6, 5, 7, 6, 7, 0]
        self.assertEqual(kinds, want_kinds)
        self.assertEqual(batch_ids, want_ids)

    def test_standalone_executor_1f1b_plan_stage3(self):
        kinds, batch_ids = self._collect_plan(3)
        # 1 warm-up forward, 7 backward/forward pairs, 1 trailing backward.
        want_kinds = (
            ["lr"]
            + ["forward"]
            + ["backward", "forward"] * 7
            + ["backward"]
            + ["optimizer"]
        )
        want_ids = [0, 0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 0]
        self.assertEqual(kinds, want_kinds)
        self.assertEqual(batch_ids, want_ids)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册