from __future__ import print_function
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
from distributed_spliter import *


class VarBlock:
    def __init__(self, varname, offset, size):
        self.varname = varname
        # NOTE: real offset is offset * size
        self.offset = offset
        self.size = size

    def __str__(self):
        return "%s:%d:%d" % (self.varname, self.offset, self.size)


class DistributeTranspiler:
    def transpile(self,
                  optimize_ops,
                  params_grads,
                  program=None,
                  pservers="127.0.0.1:6174",
                  trainers=1,
                  split_method=round_robin):
        """
            Transpile the program to a distributed data-parallelism programs.
            The main_program will be transform to use a remote parameter server
            to do parameter optimization. And the optimization graph will be put
            in to a parameter server program.

            Use different methods to split trainable varialbles to different
            parameter servers.

            :param optimize_ops: op list of optimization, should be the
                                 return value of Optimizer.minimize
            :type optimize_ops: list
            :param program: program to optimize, default default_main_program
            :param pservers: parameter server endpoints like "m1:6174,m2:6174"
            :type pservers: string
            :return: return a list of programs
        """
        assert (callable(split_method))
        if program is None:
            program = default_main_program()
        self.program = program
        self.trainers = trainers
        self.optimize_ops = optimize_ops
        # steps to transpile:
        # 1. split variables into multiple blocks, aligned by product(dim[1:]) (width).
        # 2. modify trainer program: add a split_op for each Grad.
        # 3. append send_op to trainer.
        # 4. append concat_op to trainer to update local weights.
        # 5. create a new parameter server program from the endpoint->VarBlock
        #    mapping generated by split_method.
        # 6. run compile time infershape for the parameter server program.

        pserver_endpoints = pservers.split(",")

        grad2param = dict()
        for param, grad in params_grads:
            grad2param[grad.name] = param.name

        # step1
        param_list = [pg[0] for pg in params_grads]
        grad_list = [pg[1] for pg in params_grads]
        # TODO: add split selected rows support
        grad_blocks = self._split_dense_variable(grad_list, len(pserver_endpoints))
        param_blocks = self._split_dense_variable(param_list,
                                                  len(pserver_endpoints))
        ep2gradblock = split_method(grad_blocks, pserver_endpoints)
        # TODO: build self.param_grad_map (endpoint -> {"params": [...],
        #       "grads": [...]}) here; get_pserver_program depends on it.
        # step2
        var2splited = self._split_trainer_vars(program, grad_blocks)

        # step3
        send_inputs = []
        for _, splited in var2splited.iteritems():
            send_inputs.extend(splited)
        send_outputs = self._create_vars_from_blocklist(program, param_blocks)

        # epmap lists, for each variable in send_inputs, the endpoint it is
        # sent to. NOTE: this assumes ep2gradblock maps endpoint -> list of
        # "varname:offset:size" block strings (see step 5 above) and that a
        # block's offset equals its index within the split variable.
        block2ep = dict()
        for ep, blocks in ep2gradblock.iteritems():
            for block_str in blocks:
                varname, offset, _ = block_str.split(":")
                block2ep["%s.block%d" % (varname, long(offset))] = ep
        epmap = [block2ep[var.name] for var in send_inputs]

        send_op = program.global_block().append_op(
            type="send",
            inputs={"X": send_inputs},
            outputs={"Out": send_outputs},
            attrs={"endpoints": pserver_endpoints,
                   "epmap": epmap})

    def _create_vars_from_blocklist(self, program, block_list):
        # block_list contains "varname:offset:size" strings; create one
        # flattened variable of shape [1, size] per block.
        block_map = dict()
        ret_vars = []
        for block_str in block_list:
            varname, offset, size = block_str.split(":")
            if not block_map.has_key(varname):
                block_map[varname] = []
            block_map[varname].append((long(offset), long(size)))

        for varname, splited in block_map.iteritems():
            orig_var = program.global_block().vars[varname]
            for i, block in enumerate(splited):
                size = block[1]
                var = program.global_block().create_var(
                    name="%s.block%d" % (varname, i),
                    persistable=False,
                    dtype=orig_var.dtype,
                    shape=[1, size])  # flattened splited var
                ret_vars.append(var)
        return ret_vars

    def _clone_param(self, block, v):
        assert isinstance(v, Parameter)
        new_p = Parameter(
            block=block,
            shape=v.shape,
            dtype=v.dtype,
            type=v.type,
            lod_level=v.lod_level,
            stop_gradient=v.stop_gradient,
            trainable=v.trainable,
            optimize_attr=v.optimize_attr,
            regularizer=v.regularizer,
            name=v.name)
        block.vars[new_p.name] = new_p

    def _clone_var(self, block, var):
        assert isinstance(var, Variable)
        return block.create_var(
            name=var.name,
            shape=var.shape,
            dtype=var.dtype,
            type=var.type,
            lod_level=var.lod_level,
            persistable=var.persistable)

    def _split_dense_variable(self,
                              var_list,
                              pserver_count,
                              min_block_size=1024,
                              max_block_size=1048576):
        """
            We may need to split a dense tensor into one or several blocks
            and put them equally onto parameter servers. One block is a
            sub-tensor aligned by dim[0] of the tensor.

            We need to have a minimal block size so that the calculations on
            the parameter server side can gain better performance. By default
            the minimum block size is 1024. The max block size is used to
            prevent overly large blocks that may cause send errors.
        """
        block_sizes = []
        blocks = []
        for grad in var_list:
            dim1 = reduce(lambda x, y: x * y, grad.shape[1:])
            grad_numel = reduce(lambda x, y: x * y, grad.shape)
            if grad_numel < min_block_size:
                # small variables are kept as a single block
                block_sizes.append(grad_numel)
                blocks.append(str(VarBlock(grad.name, 0, grad_numel)))
                continue
            block_size = grad_numel / pserver_count
            if block_size < min_block_size:
                block_size = min_block_size
            elif block_size > max_block_size:
                block_size = max_block_size
            # align by dim1 (width)
            remains = block_size % dim1
            if remains != 0:
                block_size += dim1 - remains
            block_sizes.append(block_size)
            num_blocks = grad_numel / block_size
            print("grad numel: %d, block size: %d" % (grad_numel, block_size))
            for block_id in xrange(num_blocks):
                block = VarBlock(grad.name, block_id, block_size)
                blocks.append(str(block))
        return blocks

    def _split_trainer_vars(self, program, gradblocks):
        var2blocks = dict()
        splited = dict()
        for block_str in gradblocks:
            varname, offset, size = block_str.split(":")
            if not var2blocks.has_key(varname):
                var2blocks[varname] = []
            var2blocks[varname].append((long(offset), long(size)))
        for varname, blocks in var2blocks.iteritems():
            orig_var = program.global_block().vars[varname]
            split_outs = []
            for i in xrange(len(blocks)):
                size = blocks[i][1]
                var = program.global_block().create_var(
                    name="%s.block%d" % (varname, i),
                    persistable=False,
                    dtype=orig_var.dtype,
                    shape=[1, size])  # flattened splited var
                split_outs.append(var)

            splited[varname] = split_outs
            program.global_block().append_op(
                type="split",
                inputs={"X": orig_var},
                outputs={"Out": split_outs},
                attrs={"num": len(blocks)}  # assume split evenly
            )
        return splited

    def _concat_trainer_vars(self, program, splited):
        for varname, to_merge_list in splited.iteritems():
            orig_var = program.global_block().vars[varname]
            program.global_block().append_op(
                type="concat",
                inputs={"X": to_merge_list},
                outputs={"Out": orig_var},
                attrs={})

    def get_trainer_program(self):
        # remove optimize ops and add a send op to main_program
        self.program.global_block().delete_ops(self.optimize_ops)
        return self.program

    def _create_var_for_trainers(self, block, var, trainers):
        var_list = []
        for i in xrange(trainers):
            var_each = block.create_var(
                name="%s.trainer_%d" % (var.name, i),
                persistable=var.persistable,
                dtype=var.dtype,
                shape=var.shape)
            var_list.append(var_each)
        return var_list

    def get_pserver_program(self, endpoint, optimize_ops):
        pserver_program = Program()
        for v in self.param_grad_map[endpoint]["params"]:
            self._clone_param(pserver_program.global_block(), v)

        optimize_sub_program = Program()
        grad_var_names = [
            var.name for var in self.param_grad_map[endpoint]["grads"]
        ]
        for opt_op in optimize_ops:
            for _, var in opt_op.inputs.iteritems():
                # NOTE: append operators to merge gradients from multiple
                # trainers. If trainers == 1, this is not needed.
                if self.trainers > 1 and var.name in grad_var_names:
                    vars2merge = self._create_var_for_trainers(
                        optimize_sub_program.global_block(), var, self.trainers)
                    merged_var = optimize_sub_program.global_block().create_var(
                        name=var.name,
                        persistable=var.persistable,
                        dtype=var.dtype,
                        shape=var.shape)
                    optimize_sub_program.global_block().append_op(
                        type="sum",
                        inputs={"X": vars2merge},
                        outputs={"Out": merged_var})
                    optimize_sub_program.global_block().append_op(
                        type="scale",
                        inputs={"X": merged_var},
                        outputs={"Out": merged_var},
                        attrs={"scale": 1.0 / float(self.trainers)})
                else:
                    optimize_sub_program.global_block().create_var(
                        name=var.name,
                        persistable=var.persistable,
                        dtype=var.dtype,
                        shape=var.shape)

            if opt_op.inputs.has_key("Grad"):
                if opt_op.inputs["Grad"].name in grad_var_names:
                    optimize_sub_program.global_block().append_op(
                        type=opt_op.type,
                        inputs=opt_op.inputs,
                        outputs=opt_op.outputs,
                        attrs=opt_op.attrs)
            else:
                optimize_sub_program.global_block().append_op(
                    type=opt_op.type,
                    inputs=opt_op.inputs,
                    outputs=opt_op.outputs,
                    attrs=opt_op.attrs)
        pserver_program.global_block().append_op(
            type="recv",
            inputs={"RX":
                    self.param_grad_map[endpoint]["grads"]},  # grads to recv
            outputs={},
            attrs={
                "OptimizeProgram": optimize_sub_program.desc,
                "endpoint": endpoint,
                "ParamList":
                [p.name for p in self.param_grad_map[endpoint]["params"]],
                "GradList":
                [p.name for p in self.param_grad_map[endpoint]["grads"]],
                "Trainers": self.trainers
            })
        pserver_program.sync_with_cpp()
        return pserver_program