framework.py 5.8 KB
Newer Older
W
wangguanzhong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. 
#   
# Licensed under the Apache License, Version 2.0 (the "License");   
# you may not use this file except in compliance with the License.  
# You may obtain a copy of the License at   
#   
#     http://www.apache.org/licenses/LICENSE-2.0    
#   
# Unless required by applicable law or agreed to in writing, software   
# distributed under the License is distributed on an "AS IS" BASIS, 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
# See the License for the specific language governing permissions and   
# limitations under the License.

import os
import numpy as np
import math
import paddle
from collections import defaultdict
import ppcv
from ppcv.ops import *
from ppcv.utils.helper import get_output_keys, gen_input_name
from ppcv.core.workspace import create


class DAG(object):
    """
    Directed Acyclic Graph (DAG) engine, builds one DAG topology from an
    operator configuration list.

    Each entry of ``cfg`` is ``{op_arch: op_cfg}``; ``op_cfg`` must provide a
    ``'name'`` and a list of ``'Inputs'``. An input written as
    ``'<op_name>.<key>'`` adds the edge ``op_name -> this op``. The
    pseudo-node ``'input'`` is always part of the graph.
    """

    def __init__(self, cfg):
        self.graph, self.rev_graph, self.in_degrees = self.build_dag(cfg)
        self.num = len(self.in_degrees)

    def build_dag(self, cfg):
        """
        Build the forward/reverse adjacency lists and the in-degree table.

        Args:
            cfg (list[dict]): operator configs, each ``{op_arch: op_cfg}``.

        Returns:
            tuple: ``(graph, rev_graph, in_degrees)``. ``graph`` maps an op
                to the ops that read its outputs. ``rev_graph`` maps an op to
                the ops it reads from. ``in_degrees`` maps every node name
                ('input' plus each op name) to its number of incoming edges.
        """
        graph = defaultdict(list)  # op -> next_op
        rev_graph = defaultdict(list)  # op -> last_op
        # An insertion-ordered dict (instead of a set) keeps the node order
        # deterministic across interpreter runs; 'input' is always first.
        in_degrees = {'input': 0}
        for op in cfg:
            op_cfg = list(op.values())[0]
            in_degrees.setdefault(op_cfg['name'], 0)

        for op in cfg:
            op_cfg = list(op.values())[0]
            op_name = op_cfg['name']
            for input_name in op_cfg['Inputs']:
                # the part before the first '.' names the producing op
                last_op = input_name.split('.')[0]
                graph[last_op].append(op_name)
                rev_graph[op_name].append(last_op)
                in_degrees[op_name] += 1
        return graph, rev_graph, in_degrees

    def get_graph(self):
        """Return the forward adjacency list (op -> ops reading from it)."""
        return self.graph

    def get_reverse_graph(self):
        """Return the reverse adjacency list (op -> ops it reads from)."""
        return self.rev_graph

    def topo_sort(self):
        """
        Topologically sort the DAG (Kahn's algorithm, stack-based).

        The in-degree table stored on the instance is left untouched, so the
        method can be called repeatedly and returns the same order each time.

        Returns:
            list | None: node names ordered so that every op comes after all
                ops it reads from. For example, the DAG

                    A -> B -> C -> E
                    A -> D ------> E

                may be sorted as [A, D, B, C, E]. Returns None when not every
                node can be ordered (a cycle, or an input that refers to an op
                not defined in cfg).
        """
        # Work on a copy: decrementing self.in_degrees in place made a second
        # call start from all-zero degrees and return a wrong order.
        in_degrees = dict(self.in_degrees)
        # Select vertices with in_degree = 0
        queue = [u for u, degree in in_degrees.items() if degree == 0]
        sort_result = []
        while queue:
            u = queue.pop()
            sort_result.append(u)
            for v in self.graph.get(u, []):
                # remove the edge u -> v
                in_degrees[v] -= 1
                # v is ready once all of its producers have been emitted
                if in_degrees[v] == 0:
                    queue.append(v)
        if len(sort_result) == self.num:
            return sort_result
        return None


class Executor(object):
    """
    The executor which implements model series pipeline.

    Ops are created from ``model_cfg`` and executed one after another in the
    topological order computed by ``DAG``.

    Args:
        model_cfg: The models configuration, a list of ``{op_arch: op_cfg}``
            dicts.
        env_cfg: The environment configuration, passed to every op's
            constructor.

    Raises:
        ValueError: if the op graph described by ``model_cfg`` cannot be
            topologically sorted.
    """

    def __init__(self, model_cfg, env_cfg):
        dag = DAG(model_cfg)
        self.order = dag.topo_sort()
        if self.order is None:
            # Fail here instead of with an opaque TypeError on
            # ``self.order[1:]`` in the first call to run().
            raise ValueError(
                "The op graph in model_cfg cannot be topologically sorted: "
                "check each op's 'Inputs' for cycles or unknown op names.")
        self.model_cfg = model_cfg

        self.op_name2op = {}
        self.has_output_op = False
        for op_item in model_cfg:
            op_arch = list(op_item.keys())[0]
            op_cfg = list(op_item.values())[0]
            op_name = op_cfg['name']
            op = create(op_arch, op_cfg, env_cfg)
            self.op_name2op[op_name] = op
            if op.type() == 'OUTPUT':
                self.has_output_op = True

        self.output_keys = get_output_keys(model_cfg)
        self.last_ops_dict = dag.get_reverse_graph()
        self.input_dep = self.reset_dep()

    def reset_dep(self):
        """Recompute the per-input consumer counts from ``self.model_cfg``."""
        return self.build_dep(self.model_cfg, self.output_keys)

    def build_dep(self, cfg, output_keys):
        """
        Count how many ops consume each input name.

        Args:
            cfg (list[dict]): operator configs, each ``{op_arch: op_cfg}``.
            output_keys: not used by this method; kept for interface
                compatibility.

        Returns:
            dict: input name, as written in an op's ``'Inputs'`` -> number of
                ops listing it.
        """
        dep = {}
        for op in cfg:
            for name in list(op.values())[0]['Inputs']:
                dep[name] = dep.get(name, 0) + 1
        return dep

    def update_res(self, results, op_outputs, input_name):
        """
        Merge one op's outputs into the running results.

        Args:
            results (list[dict]): running results, updated in place.
            op_outputs (list[dict]): the op's outputs, paired element-wise
                with ``results``. When an OUTPUT op exists, these dicts are
                also modified in place.
            input_name (list): the input names the op consumed.

        Only when the pipeline has an OUTPUT op: output keys that no op lists
        as an input are dropped, and every consumed input whose remaining
        consumer count reaches zero is deleted from each result dict.
        """
        # step1: merge outputs, dropping keys not used in any later input
        for res, out in zip(results, op_outputs):
            if self.has_output_op:
                unused = [k for k in out if k not in self.input_dep]
                for name in unused:
                    del out[name]
            res.update(out)

        # step2: if the input name is no longer used, delete it from results
        if self.has_output_op:
            for name in input_name:
                self.input_dep[name] -= 1
                if self.input_dep[name] == 0:
                    for res in results:
                        del res[name]

    def run(self, input, frame_id=-1):
        """
        Execute every op in topological order.

        Args:
            input: the initial results. It is presumably a list of per-sample
                dicts (confirm against callers), and it is updated in place
                by non-OUTPUT ops.
            frame_id (int): forwarded to each op via ``set_frame``.

        Returns:
            The final results. An OUTPUT op replaces the running results with
            its own return value.
        """
        self.input_dep = self.reset_dep()
        results = input
        # order[0] is assumed to be the 'input' pseudo-node (no op to run)
        for op_name in self.order[1:]:
            op = self.op_name2op[op_name]
            op.set_frame(frame_id)
            input_keys = op.get_input_keys()
            op_input = op.filter_input(results, input_keys)
            op_output = op(op_input)
            if op.type() != 'OUTPUT':
                op.check_output(op_output, op_name)
                self.update_res(results, op_output, input_keys)
            else:
                results = op_output
        return results