diff --git a/python/paddle/distributed/auto_parallel/graph.py b/python/paddle/distributed/auto_parallel/graph.py index d671d95aa9f4ada3f3f753e7e1ad4fa0510d7305..be27bd50867d735533704e1cc749267c6b049afb 100644 --- a/python/paddle/distributed/auto_parallel/graph.py +++ b/python/paddle/distributed/auto_parallel/graph.py @@ -123,6 +123,8 @@ class Graph: else: self._nodes[node_id].attrs.update(attrs) + return self._nodes[node_id] + def add_edge(self, src_id, tgt_id, **attrs): # add nodes if src_id is None: @@ -140,6 +142,7 @@ class Graph: # add the edge edge = Edge(src_id, tgt_id, **attrs) self._adjs[src_id][tgt_id] = edge + return edge def __len__(self): return len(self._nodes) diff --git a/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py b/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py index 83b4586af7c65da92d58653e10f968805122d9b7..cf057b4a62e7d05f5ce3d744290c5eace943ab35 100644 --- a/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py +++ b/python/paddle/distributed/auto_parallel/tuner/rule_based_tuner.py @@ -12,6 +12,143 @@ # See the License for the specific language governing permissions and # limitations under the License. +from abc import ABC, abstractmethod + +from ..graph import Graph + + +_PATTERNS = {} + + +def register_pattern(cls): + """Register pattern for rule-based tuner.""" + name = cls.name + + def register(name): + global _PATTERNS + _PATTERNS[name] = cls() + + register(name) + + return cls + + +def convert_to_graph(ops, block): + """Convert ops to graph.""" + graph = Graph() + graph.attrs["var_to_id"] = {} # {var_name: node_id} + graph.attrs["id_to_var"] = {} # {node_id: var_name} + graph.attrs["op_to_id"] = {} # {op_id: node_id} + graph.attrs["id_to_op"] = {} # {node_id: op_id} + + node_id = -1 + for op in ops: + attrs = op.all_attrs() + attrs["type"] = op.type + node_id += 1 + + # create op node + op_node = graph.add_node(node_id, **attrs) + graph.attrs["op_to_id"][op.desc.id()] = op_node.id + graph.attrs["id_to_op"][op_node.id] = op.desc.id() + for input_name in op.input_names: + for var_name in op.input(input_name): + if var_name not in graph.attrs["var_to_id"]: + # create var node + node_id += 1 + var_node = graph.add_node(node_id) + var = block._var_recursive(var_name) + if var.is_parameter: + var_node.attrs["type"] = "param" + else: + var_node.attrs["type"] = "var" + graph.attrs["var_to_id"][var_name] = var_node.id + graph.attrs["id_to_var"][var_node.id] = var_name + else: + var_node_id = graph.attrs["var_to_id"][var_name] + var_node = graph._nodes[var_node_id] + + # create edge that input -> op + input_edge = graph.add_edge(var_node.id, op_node.id) + input_edge.attrs["input_name"] = input_name + + for output_name in op.output_names: + for var_name in op.output(output_name): + if var_name not in graph.attrs["var_to_id"]: + # create var node + node_id += 1 + var_node = graph.add_node(node_id) + var = block._var_recursive(var_name) + if var.is_parameter: + var_node.attrs["type"] = "param" + else: + var_node.attrs["type"] = "var" + graph.attrs["var_to_id"][var_name] = var_node.id + graph.attrs["id_to_var"][var_node.id] = var_name + else: + var_node_id = graph.attrs["var_to_id"][var_name] + var_node = graph._nodes[var_node_id] + + # create edge that op -> output + output_edge = graph.add_edge(op_node.id, var_node.id) + output_edge.attrs["output_name"] = output_name + + return graph + + +class BasePattern(ABC): + name = "base" + + def __init__(self): + self.graph = None + self.build() + + @abstractmethod + def build(self): 
+        pass
+
+
+@register_pattern
+class QKVPattern(BasePattern):
+    name = "qkv"
+
+    def __init__(self):
+        super().__init__()
+
+    def build(self):
+        self.graph = Graph()
+
+        query = self.graph.add_node(0, **{"type": "var"})
+
+        q_weight = self.graph.add_node(1, **{"dim": 2, "type": "param"})
+        k_weight = self.graph.add_node(2, **{"dim": 2, "type": "param"})
+        v_weight = self.graph.add_node(3, **{"dim": 2, "type": "param"})
+
+        q_matmul = self.graph.add_node(4, **{"type": "matmul_v2"})
+        k_matmul = self.graph.add_node(5, **{"type": "matmul_v2"})
+        v_matmul = self.graph.add_node(6, **{"type": "matmul_v2"})
+
+        q_x = self.graph.add_edge(0, 4, **{"input_name": "X"})
+        k_x = self.graph.add_edge(0, 5, **{"input_name": "X"})
+        v_x = self.graph.add_edge(0, 6, **{"input_name": "X"})
+        q_y = self.graph.add_edge(1, 4, **{"input_name": "Y"})
+        k_y = self.graph.add_edge(2, 5, **{"input_name": "Y"})
+        v_y = self.graph.add_edge(3, 6, **{"input_name": "Y"})
+
+        q = self.graph.add_node(7, **{"type": "var"})
+        k = self.graph.add_node(8, **{"type": "var"})
+        v = self.graph.add_node(9, **{"type": "var"})
+
+        q_out = self.graph.add_edge(4, 7, **{"output_name": "Out"})
+        k_out = self.graph.add_edge(5, 8, **{"output_name": "Out"})
+        v_out = self.graph.add_edge(6, 9, **{"output_name": "Out"})
+
+        # Sharding annotation for the matched pattern
+        self.graph.attrs["shard_tensor"] = [
+            (1, 2, 3),
+            [[-1, 0], [-1, 1]],
+        ]  # a pair: (weight node ids, candidate dims_mappings)
+
 
 class OperatorGroupUtil:
     common_starts = ["layer_norm", "matmul_v2", "matmul"]
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
index 5ba84df8395d3bdf70d711d2819959de5cd73c13..8486056984cf0d2bdf61edee0e433d8a097b5a26 100644
--- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt
@@ -119,5 +119,5 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
   py_test_modules(test_engine_api_error MODULES test_engine_api_error)
   py_test_modules(test_fp16_assign MODULES test_fp16_assign)
   py_test_modules(test_group_operators MODULES test_group_operators)
-
+  py_test_modules(test_pattern MODULES test_pattern)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_pattern.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_pattern.py
new file mode 100644
index 0000000000000000000000000000000000000000..159def7617a2fdc7f89b1d79f55f5dc3e81fee1e
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_pattern.py
@@ -0,0 +1,133 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import unittest
+import numpy as np
+
+import paddle
+import paddle.static as static
+
+sys.path.append("..")
+import auto_parallel_gpt_model as modeling
+from auto_parallel_gpt_model import (
+    GPTModel,
+    GPTForPretraining,
+    GPTPretrainingCriterion,
+)
+
+
+def get_gpt_model(
+    train_program, start_program, place, batch_size, sequence_len, vocab_size
+):
+    with static.program_guard(train_program, start_program):
+        tokens = paddle.static.data(
+            name="tokens", shape=[batch_size, sequence_len], dtype='int64'
+        )
+        position_ids = paddle.static.data(
+            name="position_ids", shape=[batch_size, sequence_len], dtype='int64'
+        )
+        attention_mask = paddle.static.data(
+            name="attention_mask",
+            shape=[batch_size, 1, sequence_len, sequence_len],
+            dtype='float32',
+        )
+        labels = paddle.static.data(
+            name="labels", shape=[batch_size, sequence_len], dtype='int64'
+        )
+        loss_mask = paddle.static.data(
+            name="loss_mask", shape=[batch_size, sequence_len], dtype='float32'
+        )
+
+        gpt = GPTModel(
+            vocab_size=1000,
+            hidden_size=64,
+            num_hidden_layers=2,
+            num_attention_heads=8,
+            intermediate_size=256,
+            hidden_act="gelu",
+            hidden_dropout_prob=0.0,
+            attention_probs_dropout_prob=0.0,
+            max_position_embeddings=1024,
+            type_vocab_size=1,
+            initializer_range=0.02,
+            pad_token_id=0,
+            eos_token_id=7,
+            bos_token_id=0,
+            eol_token_id=3,
+        )
+
+        model = GPTForPretraining(
+            gpt, vocab_size=1000, hidden_size=64, initializer_range=0.02
+        )
+        preds = model(tokens, position_ids, attention_mask)
+        criterion = GPTPretrainingCriterion()
+        loss = criterion(preds, labels, loss_mask)
+
+    def gen_data():
+        np.random.seed(2021)
+        tokens = []
+        position_ids = []
+        attention_mask = []
+        labels = []
+        loss_mask = []
+        for _ in range(batch_size):
+            tokens.append(np.random.randint(vocab_size, size=sequence_len))
+            position_ids.append(np.arange(sequence_len))
+            attention_mask.append([np.tril(np.ones(sequence_len))])
+            labels.append(np.random.randint(vocab_size, size=sequence_len))
+            loss_mask.append(np.ones(sequence_len))
+
+        return tokens, position_ids, attention_mask, labels, loss_mask
+
+    return train_program, start_program, loss, gen_data
+
+
+class TestPattern(unittest.TestCase):
+    def test_gpt(self):
+        modeling.init_global()
+        train_program = static.Program()
+        start_program = static.Program()
+        place = paddle.set_device("gpu")
+        batch_size = 8
+        sequence_len = 512
+        vocab_size = 1000
+        train_program, start_program, loss, gen_data = get_gpt_model(
+            train_program,
+            start_program,
+            place,
+            batch_size,
+            sequence_len,
+            vocab_size,
+        )
+        from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
+            RuleBasedTuner,
+            convert_to_graph,
+            _PATTERNS,
+        )
+        from paddle.distributed.auto_parallel.dist_context import (
+            DistributedContext,
+        )
+
+        dist_context = DistributedContext()
+        tuner = RuleBasedTuner(dist_context)
+        layers = tuner.group_operators(train_program.global_block().ops)
+        layer = layers[0]
+        graph = convert_to_graph(layer, train_program.global_block())
+        print(graph)
+        print("qkv: ", _PATTERNS["qkv"].graph)
+
+
+if __name__ == "__main__":
+    unittest.main()
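
A minimal usage sketch, not part of the patch: it shows how another pattern could be registered through the register_pattern decorator and built with the Graph API added above, relying on the new behaviour that add_node and add_edge return the created node and edge. The LinearPattern class, its name "linear", and its node ids are hypothetical, chosen only for illustration. Edges follow the same direction convention as convert_to_graph (input variable -> op, op -> output variable), which is what allows a pattern graph to be matched against a converted program graph.

# Hypothetical pattern registration, assuming the module layout added in this patch.
from paddle.distributed.auto_parallel.graph import Graph
from paddle.distributed.auto_parallel.tuner.rule_based_tuner import (
    _PATTERNS,
    BasePattern,
    register_pattern,
)


@register_pattern
class LinearPattern(BasePattern):
    """out = matmul_v2(X=x, Y=w), the smallest unit a sharding rule could act on."""

    name = "linear"  # illustrative name, not a pattern shipped by the patch

    def build(self):
        self.graph = Graph()

        # input variable and 2-D weight parameter nodes
        x = self.graph.add_node(0, **{"type": "var"})
        w = self.graph.add_node(1, **{"dim": 2, "type": "param"})

        # the op node the pattern is anchored on
        matmul = self.graph.add_node(2, **{"type": "matmul_v2"})

        # output variable node
        out = self.graph.add_node(3, **{"type": "var"})

        # inputs point into the op, the op points at its output,
        # mirroring the edges built by convert_to_graph
        self.graph.add_edge(x.id, matmul.id, **{"input_name": "X"})
        self.graph.add_edge(w.id, matmul.id, **{"input_name": "Y"})
        self.graph.add_edge(matmul.id, out.id, **{"output_name": "Out"})


# register_pattern instantiates the class, and BasePattern.__init__ calls build(),
# so the pattern graph exists as soon as the module is imported.
print(sorted(_PATTERNS.keys()))  # e.g. ['linear', 'qkv']

Because registration alone materializes the pattern graph, the test above can read _PATTERNS["qkv"].graph immediately after importing rule_based_tuner, without any further setup.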