Commit f32f84a4 authored by WangXi, committed by sandyhouse

update

Parent c7472f16
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..common import is_optimizer_op, OP_ROLE_KEY, OpRole
from paddle.fluid import unique_name


class OffloadHelper(object):
    cpu_place_type = 0
    cuda_place_type = 1
    cuda_pinned_place_type = 2

    def __init__(self):
        pass
        "0: dst is on CPUPlace. "
        "1: dst is on CUDAPlace. "
        "2: dst is on CUDAPinnedPlace. "
    def _insert_memcpy_op(self, block, idx, src_name, dst_name, dst_place_type):
        src_var = block.var(src_name)
        dst_var = block.var(dst_name)
        block._insert_op_without_sync(
            idx,
            type='memcpy',
            inputs={'X': src_var},
            outputs={'Out': dst_var},
            attrs={
                'dst_place_type': dst_place_type,
                OP_ROLE_KEY: OpRole.Optimize,
            })

    def _insert_fetch_op(self, block, idx, src_name, dst_name):
        self._insert_memcpy_op(block, idx, src_name, dst_name,
                               OffloadHelper.cuda_place_type)

    def _insert_offload_op(self, block, idx, src_name, dst_name):
        self._insert_memcpy_op(block, idx, src_name, dst_name,
                               OffloadHelper.cuda_pinned_place_type)
    def _get_offload_var_name(self, name):
        return unique_name.generate(name + '@offload')

    def _create_offload_var(self, var_name, offload_var_name, blocks):
        for block in blocks:
            var = block.var(var_name)
            var.persistable = False
            offload_var = block.create_var(
                name=offload_var_name,
                shape=var.shape,
                dtype=var.dtype,
                persistable=True)
    def offload(self, block, startup_block):
        """
        (m1, m2) = prefetch(m1@offload, m2@offload)
        (m1out, m2out, pout) = adam(m1, m2, p)
        (m1@offload, m2@offload) = memcpy(m1, m2)
        """
        vars_name_to_offload_name = dict()

        # main_block add offload
        for idx, op in reversed(list(enumerate(block.ops))):
            if not is_optimizer_op(op):
                break

            vars_name = []
            if op.type == "adam":
                # {Moment1Out = [''], Moment2Out = [''], ParamOut = ['']} =
                # adam(inputs={Moment1 = [''], Moment2 = [''], Param = ['']})
                vars_name.append(op.desc.input("Moment1")[0])
                vars_name.append(op.desc.input("Moment2")[0])
            elif op.type == 'momentum':
                pass
            elif op.type == 'lars':
                pass
            elif op.type == 'lamb':
                pass

            # step1: create and init offload_var
            for var_name in vars_name:
                assert var_name not in vars_name_to_offload_name
                offload_var_name = self._get_offload_var_name(var_name)
                vars_name_to_offload_name[var_name] = offload_var_name
                self._create_offload_var(var_name, offload_var_name,
                                         [block, startup_block])

            # step2: insert offload op
            for var_name in vars_name:
                offload_var_name = vars_name_to_offload_name[var_name]
                self._insert_offload_op(block, idx + 1, var_name,
                                        offload_var_name)

            # step3: insert fetch op
            for var_name in vars_name:
                offload_var_name = vars_name_to_offload_name[var_name]
                self._insert_fetch_op(block, idx, offload_var_name, var_name)

        # startup_block add offload
        visited_vars = set()
        for idx, op in reversed(list(enumerate(startup_block.ops))):
            for out_name in op.output_arg_names:
                if out_name in visited_vars:
                    continue

                if out_name in vars_name_to_offload_name:
                    var_name = out_name
                    offload_var_name = vars_name_to_offload_name[var_name]
                    # insert offload op after var is generated
                    self._insert_offload_op(startup_block, idx + 1, var_name,
                                            offload_var_name)
                visited_vars.add(out_name)
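For orientation, a minimal usage sketch (editorial, not part of this commit's diff): OffloadHelper rewrites the program in place, so driving it only needs the main and startup blocks. The import path below is an assumption based on the import added to the sharding optimizer in the diff that follows.

# Usage sketch; assumes the main program already contains adam ops
# (e.g. after the sharding optimizer has appended the optimizer ops).
import paddle.fluid as fluid
from paddle.distributed.fleet.meta_optimizers.sharding.offload_helper import OffloadHelper

main_block = fluid.default_main_program().global_block()
startup_block = fluid.default_startup_program().global_block()

# In-place rewrite: for every adam op, its moment tensors are prefetched
# back to CUDAPlace right before the op and offloaded to CUDAPinnedPlace
# right after it; the startup block offloads them after initialization.
OffloadHelper().offload(main_block, startup_block)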
@@ -22,6 +22,7 @@ from paddle.distributed.fleet.meta_optimizers.sharding.shard import Shard, Progr
from paddle.distributed.fleet.meta_optimizers.sharding.fp16_helper import FP16Utils
from paddle.distributed.fleet.meta_optimizers.sharding.weight_decay_helper import WeightDecayHelper
from paddle.distributed.fleet.meta_optimizers.sharding.gradient_clip_helper import GradientClipHelper
+from .sharding.offload_helper import OffloadHelper
from paddle.distributed.fleet.meta_optimizers.sharding.prune import ProgramDeps
from paddle.distributed.fleet.meta_optimizers.sharding.utils import *
@@ -245,132 +246,137 @@ class ShardingOptimizer(MetaOptimizerBase):
#                 'op_role': core.op_proto_and_checker_maker.OpRole.LRSched,
#             })
pass

#def _create_var(block, ref_var, name):
#    """
#    Create a new var for block, which has the same type,
#    shape and dtype as ref_var, then rename it with the
#    name `name`.
#    """
#    new_var = block.create_var(
#        name=name,
#        shape=ref_var.shape,
#        dtype=ref_var.dtype,
#        type=ref_var.type,
#        lod_level=ref_var.lod_level,
#        persistable=ref_var.persistable,
#        is_data=ref_var.is_data,
#        need_check_feed=ref_var.desc.need_check_feed())
#    new_var.stop_gradient = ref_var.stop_gradient
#    return new_var

#def _rename_arg(op, old_name, new_name):
#    op_desc = op.desc
#    if isinstance(op_desc, tuple):
#        op_desc = op_desc[0]
#    op_desc._rename_input(old_name, new_name)
#    op_desc._rename_output(old_name, new_name)

#print("params_grads:", params_grads)
#for param_name, grad_name in params_grads:
#    if not self._shard.has_param(param_name): continue
#    #if not main_block.has_var(grad_name): continue
#    assert main_block.has_var(grad_name)
#    use_fp16 = False
#    fp16_grad_name = param_name + '.cast_fp16@GRAD'
#    if main_block.has_var(grad_name):
#        fp16_grad_var = main_block.vars[fp16_grad_name]
#        use_fp16 = True
#    grad_var = main_block.vars[grad_name]
#    if use_fp16:
#        cast_grad_var_name = paddle.fluid.unique_name.generate(
#            grad_name)
#        cast_var = _create_var(main_block, fp16_grad_var,
#                               cast_grad_var_name)
#        cast_var.persistable = False
#        main_block.append_op(
#            #index=offset + 1,
#            type='cast',
#            inputs={'X': grad_var},
#            outputs={'Out': cast_var},
#            attrs={
#                'in_dtype': grad_var.dtype,
#                'out_dtype': cast_var.dtype,
#                'op_role':
#                core.op_proto_and_checker_maker.OpRole.Backward,
#            })
#        #offset += 1
#        main_block.append_op(
#            #index=offset + 1,
#            type='sum',
#            inputs={'X': [fp16_grad_var, cast_var]},
#            outputs={'Out': fp16_grad_var},
#            attrs={
#                'op_role':
#                core.op_proto_and_checker_maker.OpRole.Backward,
#                'op_role_var': op_role_var
#            })

#for index, op in reversed(tuple(enumerate(list(main_block.ops)))):
#    offset = index
#    if is_backward_op(op) and (
#            'op_role_var' in op.attr_names):
#        op_role_var = op.all_attrs()['op_role_var']

#        if len(op_role_var) == 0:
#            continue
#        assert len(op_role_var) % 2 == 0
#        offset = index
#        for i in range(0, len(op_role_var), 2):
#            grad_name = op_role_var[i + 1]
#            if not main_block.has_var(grad_name): continue
#            grad_var = main_block.vars[grad_name]
#            if not 'cast_fp16' in grad_name:
#                new_grad_var_name = paddle.fluid.unique_name.generate(grad_name)
#                new_var = _create_var(main_block, grad_var,
#                                      new_grad_var_name)
#                new_var.persistable = False
#                _rename_arg(op, grad_name, new_grad_var_name)
#                main_block._insert_op(
#                    index=offset + 1,
#                    type='sum',
#                    inputs={'X': [grad_var, new_var]},
#                    outputs={'Out': grad_var},
#                    attrs={
#                        'op_role': core.op_proto_and_checker_maker.OpRole.Backward,
#                        'op_role_var': op_role_var
#                    })
#                offset += 1
#            if 'cast_fp16' in grad_name:
#                param_name = op_role_var[i]
#                fp32_grad_var_name = param_name + "@GRAD"
#                fp32_grad_var = main_block.vars[grad_name]
#                cast_grad_var_name = paddle.fluid.unique_name.generate(
#                    fp32_grad_var_name)
#                cast_var = _create_var(main_block, grad_var,
#                                       cast_grad_var_name)
#                cast_var.persistable = False
#                main_block._insert_op(
#                    index=offset + 1,
#                    type='cast',
#                    inputs={'X': fp32_grad_var},
#                    outputs={'Out': cast_var},
#                    attrs={
#                        'in_dtype': fp32_grad_var.dtype,
#                        'out_dtype': cast_var.dtype,
#                        'op_role': core.op_proto_and_checker_maker.OpRole.Backward,
#                        # self._op_role_var_key: op_role_var
#                    })
#                offset += 1
#                main_block._insert_op(
#                    index=offset + 1,
#                    type='sum',
#                    inputs={'X': [grad_var, cast_var]},
#                    outputs={'Out': grad_var},
#                    attrs={
#                        'op_role': core.op_proto_and_checker_maker.OpRole.Backward,
#                        'op_role_var': op_role_var})
main_block._sync_with_cpp()
+# TODO(wangxi): add optimize offload
+offload_helper = OffloadHelper()
+offload_helper.offload(main_block, startup_block)
with open("start_sharding_%d" % self.role_maker._worker_index(),
          'w') as f:
    f.writelines(str(startup_block.program))
...
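As a quick sanity check (an editorial sketch, reusing main_block from the pass above), the op stream around each adam op should now read memcpy (fetch, dst_place_type 1), adam, memcpy (offload, dst_place_type 2):

# The helper inserts ops with _insert_op_without_sync, so sync the
# python-side op list before inspecting it.
main_block._sync_with_cpp()
for op in main_block.ops:
    if op.type == 'memcpy':
        print('memcpy, dst_place_type =', op.attr('dst_place_type'))
    elif op.type == 'adam':
        print('adam')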