提交 5821621d 编写于 作者: qq_22305325 提交者: GitHub

Merge branch 'master' into dev_compatible_Instruction_and_cfg_Instruction

Former-commit-id: 2c3d48b5c55558fe995dec48d338903dfadb426b
......@@ -101,8 +101,8 @@ class ComposeHob(BoolFunctor):
return ctx.composer2middle_op_arg_parallel_attr[self]
@bool_functor("MasterMachineOnly")
def MasterMachineOnly(ctx):
@bool_functor("SingleMachine")
def SingleMachine(ctx):
blob_device_ids = (
ctx.produced_blob_object.parallel_desc_symbol.machine_id2device_id_list
)
......
......@@ -232,6 +232,43 @@ def CopyHD(builder, produced_blob_object, consumer_op_arg_parallel_attr):
return BuildCopyHdInstruction(builder, produced_blob_object, op_device_tag)
BlobIsPartialSum = boxing_hob.producer_sbp_parallel.HasField("partial_sum_parallel")
OpArgIsBroadcast = boxing_hob.consumer_sbp_parallel.HasField("broadcast_parallel")
MatchInterNodeOneToMany = (
~boxing_hob.SingleMachine
& (boxing_hob.producer_parallel_desc.device_tag == "cpu")
& (boxing_hob.consumer_parallel_desc.device_tag == "cpu")
& (boxing_hob.producer_parallel_desc.parallel_num == 1)
& (boxing_hob.consumer_parallel_desc.parallel_num > 1)
& OpArgIsBroadcast
)
@boxing_condition(MatchInterNodeOneToMany)
def InterNodeOneToMany(builder, produced_blob_object, consumer_op_arg_parallel_attr):
    """Broadcast a single-device CPU blob to every consumer device.

    For each (machine_id, device_id) pair in the consumer's parallel
    description, build a one-to-one copy of ``produced_blob_object`` onto
    that device, then pack the resulting physical blobs back into a single
    logical blob object matching the consumer's op-arg attributes.
    """
    machine2devices = (
        consumer_op_arg_parallel_attr.parallel_desc_symbol.machine_id2device_id_list
    )
    physical_out_blobs = []
    for machine_id, device_ids in machine2devices.items():
        for device_id in device_ids:
            # One ParallelConf per target device: "machine:device" on CPU.
            conf = placement_pb.ParallelConf()
            conf.device_tag = "cpu"
            conf.device_name.append("%s:%s" % (machine_id, device_id))
            target_desc_symbol = builder.GetParallelDescSymbol(conf)
            physical_out_blobs.append(
                builder.Build121To(produced_blob_object, target_desc_symbol)
            )
    return PackPhysicalBoxingBlobObjectsToLogical(
        builder,
        physical_out_blobs,
        consumer_op_arg_parallel_attr,
        produced_blob_object.op_arg_blob_attr,
    )
MatchInterNodeOneToOne = (
(boxing_hob.producer_parallel_desc.device_tag == "cpu")
& (boxing_hob.consumer_parallel_desc.device_tag == "cpu")
......@@ -250,13 +287,9 @@ MatchInterNodeOneToOne = (
@boxing_condition(MatchInterNodeOneToOne)
def InterNodeOneToOne(builder, produced_blob_object, consumer_op_arg_parallel_attr):
receive_blob_object = _MakeNewBlobObjectLike(
builder,
produced_blob_object,
consumer_op_arg_parallel_attr.parallel_desc_symbol,
return builder.Build121To(
produced_blob_object, consumer_op_arg_parallel_attr.parallel_desc_symbol
)
builder.Build121AssignInstruction(receive_blob_object, produced_blob_object)
return receive_blob_object
MatchCpuBroadcastOneToOne = (
......@@ -316,12 +349,8 @@ def VerboseOptionalBoxing(boxing_method):
return opt_boxing_method
BlobIsPartialSum = boxing_hob.producer_sbp_parallel.HasField("partial_sum_parallel")
OpArgIsBroadcast = boxing_hob.consumer_sbp_parallel.HasField("broadcast_parallel")
MatchNcclAllReduce = (
boxing_hob.MasterMachineOnly
boxing_hob.SingleMachine
& (boxing_hob.producer_parallel_desc.device_tag == "gpu")
& (boxing_hob.producer_parallel_desc == boxing_hob.consumer_parallel_desc)
& (boxing_hob.consumer_parallel_desc.parallel_num > 1)
......@@ -335,7 +364,7 @@ def GpuNcclAllReduce(builder, produced_blob_object, consumer_op_arg_parallel_att
parallel_conf = consumer_op_arg_parallel_attr.parallel_desc_symbol.parallel_conf
bn_in_op2blob_object = dict(in_0=produced_blob_object)
op_attribute = _GetEagerNcclAllReduce(parallel_conf, bn_in_op2blob_object)
builder.BoxingStatelessCall(
builder.NoBoxingStatelessCall(
op_attribute,
parallel_conf=parallel_conf,
bn_in_op2blob_object=bn_in_op2blob_object,
......@@ -373,8 +402,7 @@ MatchConcatManyToSplitMany = (
MatchNaiveCpuSplitToSplit = (
boxing_hob.MasterMachineOnly
& (boxing_hob.producer_parallel_desc.device_tag == "cpu")
(boxing_hob.producer_parallel_desc.device_tag == "cpu")
& (boxing_hob.consumer_parallel_desc.device_tag == "cpu")
& (MatchSplitOneToMany | MatchConcatManyToOne | MatchConcatManyToSplitMany)
)
......@@ -391,8 +419,7 @@ def NaiveCpuSplitToSplit(builder, produced_blob_object, consumer_op_arg_parallel
MatchNaiveCpuPartialSumToSplit = (
boxing_hob.MasterMachineOnly
& (boxing_hob.producer_parallel_desc.device_tag == "cpu")
(boxing_hob.producer_parallel_desc.device_tag == "cpu")
& (boxing_hob.consumer_parallel_desc.device_tag == "cpu")
& (boxing_hob.producer_parallel_desc.parallel_num > 1)
& boxing_hob.producer_sbp_parallel.HasField("partial_sum_parallel")
......@@ -424,17 +451,13 @@ def NaiveCpuRefPhysicalBlobObjectsScope(
physical_in_blob_objects = UnpackLogicalBoxingBlobObjectToPhysical(
builder, produced_blob_object
)
out_parallel_num = consumer_op_arg_parallel_attr.parallel_desc_symbol.parallel_num
consumer_parallel_desc_symbol = consumer_op_arg_parallel_attr.parallel_desc_symbol
out_parallel_num = consumer_parallel_desc_symbol.parallel_num
boxing_parallel_desc_symbol = GetConcatSplitBoxingParallelDescSymbol(
builder,
produced_blob_object.parallel_desc_symbol,
consumer_parallel_desc_symbol,
max(len(physical_in_blob_objects), out_parallel_num),
)
physical_in_blob_objects = RefBlobObjectWithParallelDesc(
builder,
physical_in_blob_objects,
[boxing_parallel_desc_symbol] * len(physical_in_blob_objects),
)
physical_output_blob_objects = get_physical_out_blob_objects(
builder=builder,
produced_blob_object=produced_blob_object,
......@@ -520,7 +543,7 @@ def BuildNaiveCpuBoxing(
bn_in_op2blob_object = {}
for i in range(len(physical_in_blob_objects)):
bn_in_op2blob_object["in_%s" % i] = physical_in_blob_objects[i]
builder.BoxingStatelessCall(
builder.NoBoxingStatelessCall(
op_attribute,
parallel_conf=boxing_parallel_desc_symbol.parallel_conf,
bn_in_op2blob_object=bn_in_op2blob_object,
......@@ -585,7 +608,7 @@ def UnpackLogicalBoxingBlobObjectToPhysical(builder, produced_blob_object):
MatchCpuBroadcastOneToMany = (
boxing_hob.MasterMachineOnly
boxing_hob.SingleMachine
& (boxing_hob.producer_parallel_desc.device_tag == "cpu")
& (boxing_hob.consumer_parallel_desc.device_tag == "cpu")
& boxing_hob.ProducerDevicesContainedInConsumerDevices
......@@ -605,8 +628,7 @@ def CpuBroadcastOneToMany(builder, produced_blob_object, consumer_op_arg_paralle
MatchBroadcastManyToOne = (
boxing_hob.MasterMachineOnly
& (
(
boxing_hob.producer_parallel_desc.device_tag
== boxing_hob.consumer_parallel_desc.device_tag
)
......@@ -673,7 +695,7 @@ def _BuildCopyInstruction(builder, produced_blob_object, op_conf, to_device_tag)
assert to_device_tag != x_device_tag, (to_device_tag, x_device_tag)
if to_device_tag == "cpu" and x_device_tag == "gpu":
x_parallel_conf = produced_blob_object.parallel_desc_symbol.parallel_conf
builder.BoxingCudaD2HStatelessCall(
builder.NoBoxingCudaD2HStatelessCall(
op_attribute, x_parallel_conf, bn_in_op2blob_object=bn_in_op2blob_object
)
elif to_device_tag == "gpu" and x_device_tag == "cpu":
......@@ -682,7 +704,7 @@ def _BuildCopyInstruction(builder, produced_blob_object, op_conf, to_device_tag)
)
out_parallel_conf = out_parallel_desc_symbol.parallel_conf
with builder.CudaHostPinBlob(produced_blob_object):
builder.BoxingCudaH2DStatelessCall(
builder.NoBoxingCudaH2DStatelessCall(
op_attribute,
out_parallel_conf,
bn_in_op2blob_object=bn_in_op2blob_object,
......@@ -721,19 +743,19 @@ def BuildAssignInstruction(builder, ref_blob_object, value_blob_object, op_conf)
bn_in_op2blob_object = {"ref": ref_blob_object, "value": value_blob_object}
op_attribute = op_infer_util.Infer(op_conf, bn_in_op2blob_object)
if ref_device_tag == value_device_tag:
builder.BoxingStatelessCall(
builder.NoBoxingStatelessCall(
op_attribute,
parallel_conf=ref_parallel_conf,
bn_in_op2blob_object=bn_in_op2blob_object,
)
elif ref_device_tag == "cpu" and value_device_tag == "gpu":
value_parallel_conf = value_blob_object.parallel_desc_symbol.parallel_conf
builder.BoxingCudaD2HStatelessCall(
builder.NoBoxingCudaD2HStatelessCall(
op_attribute, value_parallel_conf, bn_in_op2blob_object=bn_in_op2blob_object
)
elif ref_device_tag == "gpu" and value_device_tag == "cpu":
with builder.CudaHostPinBlob(value_blob_object):
builder.BoxingCudaH2DStatelessCall(
builder.NoBoxingCudaH2DStatelessCall(
op_attribute,
ref_parallel_conf,
bn_in_op2blob_object=bn_in_op2blob_object,
......@@ -766,26 +788,6 @@ def _GetEagerNcclAllReduce(parallel_conf, ibn2blob_object):
return op_infer_util.Infer(op_conf, ibn2blob_object)
def _MakeNewBlobObjectLike(builder, blob_object, new_parallel_desc_symbol):
    """Allocate a fresh blob object shaped like ``blob_object``.

    Builds an ``input`` OperatorConf carrying the source blob's parallel and
    blob attributes, infers its op attribute, and runs it as a stateless call
    on ``new_parallel_desc_symbol``'s parallel conf. Returns the "out" blob
    object produced by that call.
    """
    input_op_conf = op_conf_pb.OperatorConf()
    input_op_conf.name = id_util.UniqueStr("Input")
    input_op_conf.device_tag = new_parallel_desc_symbol.device_tag
    input_op_conf.input_conf.out = "out"
    # Both the parallel attr and the blob attr are dumped into the same
    # interface blob conf of the input op.
    interface_blob_conf = input_op_conf.input_conf.blob_conf
    blob_object.op_arg_parallel_attr.DumpToToInterfaceBlobConf(interface_blob_conf)
    blob_object.op_arg_blob_attr.DumpToToInterfaceBlobConf(interface_blob_conf)
    input_op_conf.scope_symbol_id = oneflow.current_scope().symbol_id
    # No upstream inputs: infer against an empty OpNodeSignature.
    inferred_attribute = c_api_util.InferOpConf(
        input_op_conf, op_attribute_pb.OpNodeSignature()
    )
    outputs = {}
    builder.BoxingStatelessCall(
        inferred_attribute,
        new_parallel_desc_symbol.parallel_conf,
        bn_in_op2blob_object=outputs,
    )
    return outputs["out"]
NcclAllReduce = Sequential(
boxing_middle.BoxingToMiddle(
GpuNcclAllReduce,
......@@ -823,6 +825,20 @@ BoxingInterNodeOneToOne = Sequential(
OptionalBoxing(CopyH2D),
)
# Composite boxing strategy for one producer device feeding many consumer
# devices across nodes. Pipeline (each stage feeds the next):
BoxingInterNodeOneToMany = Sequential(
# Stage 1: move the producer blob to CPU if needed (CopyD2H is optional),
# keeping the producer's SBP parallel signature.
boxing_middle.BoxingToMiddle(
OptionalBoxing(CopyD2H),
boxing_middle.ReplaceProducerDeviceTag("cpu"),
boxing_middle.ProducerSbpParallel,
),
# Stage 2: perform the one-to-many inter-node transfer on CPU, targeting
# the consumer's SBP parallel signature.
boxing_middle.BoxingToMiddle(
InterNodeOneToMany,
boxing_middle.ReplaceConsumerDeviceTag("cpu"),
boxing_middle.ConsumerSbpParallel,
),
# Stage 3: copy back to the consumer's device if it is not CPU (optional).
OptionalBoxing(CopyH2D),
)
conditional_function_table = [
CopyH2D,
CopyD2H,
......@@ -830,6 +846,7 @@ conditional_function_table = [
# one to one
BoxingIntraNodeOneToOne,
BoxingInterNodeOneToOne,
BoxingInterNodeOneToMany,
# B -> B
BroadcastManyToOne,
Sequential(
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请先注册