Commit 45c3c5b9 authored by cheng cheng, committed by Li Xinqi

Dev cc comm net empty (#2554)

* copy_comm_net: pass through when the regst's dynamic data is empty

* refine names and remove the useless back-insert member from Blob; add unittest scripts

* stricter shape check, plus logging

* Regst::GetSoleBlob()
Parent 426e8aa8
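In short: the producer now tags an outgoing regst message when its sole blob is dynamically empty (every axis of the dynamic shape is 0), and CopyCommNetActor on the consumer side then skips the remote read and materializes an empty tensor locally. A minimal sketch of that decision logic, using hypothetical simplified stand-ins for the OneFlow types rather than the real API:

#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for oneflow::Shape, for illustration only.
struct Shape {
  std::vector<int64_t> dims;
};

// Mirrors the commit's IsSoleBlobAndDynamicEmpty() shape check: the blob is
// treated as dynamically empty only if every axis is 0.
bool IsDynamicEmpty(const Shape& shape) {
  for (int64_t d : shape.dims) {
    if (d != 0) { return false; }
  }
  return true;
}

// Consumer-side dispatch, shaped like CopyCommNetActor::Act(): write an empty
// tensor locally instead of issuing the asynchronous comm-net read.
void ActOnRegstMsg(bool has_sole_empty_tensor) {
  if (has_sole_empty_tensor) {
    std::cout << "PASS: build empty tensor locally, skip comm-net read\n";
  } else {
    std::cout << "READ: issue async comm-net read\n";
  }
}

int main() {
  ActOnRegstMsg(IsDynamicEmpty(Shape{{0, 0}}));   // like the (0, 0) inputs in the new unit test
  ActOnRegstMsg(IsDynamicEmpty(Shape{{10, 2}}));  // normal blob: read as before
  return 0;
}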
@@ -4,6 +4,24 @@
namespace oneflow {
namespace {
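// Returns true only when the regst holds exactly one blob, that blob's sole
// tensor list slice holds a single tensor, the body is empty, and every axis
// of the dynamic shape is 0.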
bool IsSoleBlobAndDynamicEmpty(Regst* regst) {
if (regst == nullptr) { return false; }
if (regst->GetBlobSize() != 1) { return false; }
Blob* sole_blob = regst->GetMutSoleBlob();
if (sole_blob->num_of_tensor_list_slices() != 1) { return false; }
if (sole_blob->total_num_of_tensors() != 1) { return false; }
if (!sole_blob->IsBodyEmpty()) { return false; }
const auto& shape = sole_blob->shape();
for (int i = 0; i < shape.NumAxes(); ++i) {
if (shape.At(i) != 0) { return false; }
}
return true;
}
} // namespace
ActorMsg ActorMsg::BuildRegstMsgToConsumer(int64_t producer, int64_t consumer,
Regst* regst_raw_ptr) {
ActorMsg msg;
@@ -18,6 +36,8 @@ ActorMsg ActorMsg::BuildRegstMsgToConsumer(int64_t producer, int64_t consumer,
msg.regst_wrapper_.comm_net_token = regst_raw_ptr->comm_net_token();
}
msg.regst_wrapper_.regst_status = regst_raw_ptr->status();
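// Evaluated on the producer side, where the regst memory is local and safe to inspect.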
msg.regst_wrapper_.has_sole_empty_tensor_in_sole_tensor_list =
IsSoleBlobAndDynamicEmpty(regst_raw_ptr);
return msg;
}
@@ -29,6 +49,8 @@ ActorMsg ActorMsg::BuildRegstMsgToProducer(int64_t consumer, int64_t producer,
msg.msg_type_ = ActorMsgType::kRegstMsg;
msg.regst_wrapper_.regst = regst_raw_ptr;
msg.regst_wrapper_.comm_net_token = nullptr;
// Do NOT dereference the regst pointer here in the multi-node case: the address belongs to another machine.
msg.regst_wrapper_.has_sole_empty_tensor_in_sole_tensor_list = false;
return msg;
}
@@ -89,6 +111,11 @@ void* ActorMsg::comm_net_token() const {
return regst_wrapper_.comm_net_token;
}
bool ActorMsg::has_sole_empty_tensor_in_sole_tensor_list() const {
CHECK_EQ(msg_type_, ActorMsgType::kRegstMsg);
return regst_wrapper_.has_sole_empty_tensor_in_sole_tensor_list;
}
int64_t ActorMsg::eord_regst_desc_id() const {
CHECK_EQ(msg_type_, ActorMsgType::kEordMsg);
return eord_regst_desc_id_;
......
@@ -39,6 +39,7 @@ class ActorMsg final {
int64_t piece_id() const;
int64_t act_id() const;
void* comm_net_token() const;
bool has_sole_empty_tensor_in_sole_tensor_list() const;
int64_t eord_regst_desc_id() const;
// Serialize
@@ -56,6 +57,7 @@ class ActorMsg final {
Regst* regst;
void* comm_net_token;
RegstStatus regst_status;
bool has_sole_empty_tensor_in_sole_tensor_list;
};
int64_t src_actor_id_;
......
@@ -55,6 +55,8 @@ bool CopyCommNetActor::NormalTryProcessReadableMsgFromOtherMachine(const ActorMs
regst_ctx.regst_raw_ptr = msg.regst();
regst_ctx.producer = msg.src_actor_id();
regst_ctx.act_id = msg.act_id();
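// Carry the producer's empty-tensor flag into the per-piece context so Act() can branch on it.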
regst_ctx.has_sole_empty_tensor_in_sole_tensor_list =
msg.has_sole_empty_tensor_in_sole_tensor_list();
CHECK(piece_id2regst_ctx_.emplace(msg.piece_id(), regst_ctx).second);
return true;
}
@@ -66,9 +68,23 @@ void CopyCommNetActor::Act() {
int64_t src_actor_id = readable_it->second.producer;
int64_t src_machine_id = Global<IDMgr>::Get()->MachineId4ActorId(src_actor_id);
// writeable
void* writeable_token = GetNaiveCurWriteable("copy_out")->comm_net_token();
// Async
Global<CommNet>::Get()->Read(actor_read_id_, src_machine_id, readable_token, writeable_token);
Regst* writeable_regst = GetNaiveCurWriteable("copy_out");
if (readable_it->second.has_sole_empty_tensor_in_sole_tensor_list) {
// Skip the comm-net read: the regst's dynamic body is empty, so build an empty tensor locally.
Blob* data_blob = writeable_regst->GetMutSoleBlob();
TensorBackInserter back_inserter(data_blob);
back_inserter.ReserveOneEmptyTensorList();
FullyMutTensorView* tensor_view = back_inserter.add_tensor();
Shape empty_shape = data_blob->static_shape();
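// Zero every axis of the static shape so the tensor written locally is dynamically empty.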
for (int i = 0; i < empty_shape.NumAxes(); ++i) { empty_shape.Set(i, 0); }
tensor_view->set_shape(empty_shape);
LOG(INFO) << "cclog: PASS";
} else {
void* writeable_token = writeable_regst->comm_net_token();
// Async
Global<CommNet>::Get()->Read(actor_read_id_, src_machine_id, readable_token, writeable_token);
LOG(INFO) << "cclog: READ";
}
}
void CopyCommNetActor::VirtualAsyncSendNaiveProducedRegstMsgToConsumer() {
......
@@ -18,6 +18,7 @@ class CopyCommNetActor final : public Actor {
Regst* regst_raw_ptr;
int64_t producer;
int64_t act_id;
bool has_sole_empty_tensor_in_sole_tensor_list;
};
void VirtualActorInit(const TaskProto&) override;
......
@@ -14,6 +14,7 @@ class ReluKernel final : public KernelIf<device_type> {
~ReluKernel() = default;
private:
bool IsStateless() const override { return true; }
void ForwardDataContent(const KernelCtx&,
std::function<Blob*(const std::string&)>) const override;
};
......
@@ -67,7 +67,6 @@ void Blob::Init(const MemoryCase& mem_case, const RtBlobDesc* blob_desc, char* h
new TensorView(this, header_field<FieldKey::kTensorShapeList>(), dptr<char>()));
begin_mut_tensor_.reset(new DataOnlyMutTensorView(
this, mut_header_field<FieldKey::kTensorShapeList>(), mut_dptr<char>()));
tensor_back_inserter_.reset(new TensorBackInserter(this));
int64_t* shape_ptr = mut_header_field<FieldKey::kTensorShapeList>();
shape_view_.reset(new ShapeView(shape_ptr, static_shape().NumAxes()));
if (blob_desc->is_dynamic()) {
......
@@ -54,7 +54,6 @@ class Blob final {
bool IsEndTensor(const DataOnlyMutTensorView& tensor) const;
friend class TensorBackInserter;
const TensorBackInserter& tensor_back_inserter() { return *tensor_back_inserter_; }
// tensor list slice
size_t num_of_tensor_list_slices() const;
@@ -128,7 +127,6 @@
std::unique_ptr<PodPtr> header_ptr_;
std::unique_ptr<TensorView> begin_tensor_;
std::unique_ptr<DataOnlyMutTensorView> begin_mut_tensor_;
std::unique_ptr<TensorBackInserter> tensor_back_inserter_;
// TODO(); remove this ugly code
int32_t record_num_;
};
......
@@ -38,4 +38,14 @@ void Regst::set_regst_desc(const RtRegstDesc* regst_desc) {
status_.regst_desc_id = regst_desc_->regst_desc_id();
}
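// Sole-blob accessors: valid only when the regst holds exactly one blob.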
Blob* Regst::GetMutSoleBlob() {
CHECK_EQ(GetBlobSize(), 1);
return lbi2blob_.begin()->second.get();
}
const Blob* Regst::GetSoleBlob() const {
CHECK_EQ(GetBlobSize(), 1);
return lbi2blob_.begin()->second.get();
}
} // namespace oneflow
@@ -34,6 +34,9 @@ class Regst final {
const std::vector<int64_t>& consumers_actor_id() const;
const RtRegstDesc* regst_desc() const { return regst_desc_; }
Blob* GetBlobByLbi(const LogicalBlobId& lbi);
const Blob* GetSoleBlob() const;
Blob* GetMutSoleBlob();
int64_t GetBlobSize() const { return lbi2blob_.size(); }
const HashMap<LogicalBlobId, std::unique_ptr<Blob>>& lbi2blob() const { return lbi2blob_; }
Blob* packed_blob() { return packed_blob_.get(); }
bool IsMaxCol() const { return col_id() == max_col_id(); }
......
import oneflow as flow
import numpy as np

def ccrelu(x, name):
    return flow.user_op_builder(name)\
        .Op("ccrelu")\
        .Input("in", [x])\
        .Output("out")\
        .Build().RemoteBlobList()[0]

@flow.unittest.num_nodes_required(2)
def test_multi_node_comm_net(test_case):
    func_config = flow.FunctionConfig()
    func_config.default_distribute_strategy(flow.distribute.consistent_strategy())
    func_config.default_data_type(flow.float)
    flow.config.gpu_device_num(1)

    @flow.function(func_config)
    def ReluJob(x=flow.FixedTensorDef((10, 2))):
        with flow.fixed_placement("gpu", "0:0"):
            out0 = ccrelu(x, "my_op_0_0")
        with flow.fixed_placement("gpu", "1:0"):
            out1 = ccrelu(out0, "my_op_1_0")
        with flow.fixed_placement("gpu", "0:0"):
            out2 = ccrelu(out1, "my_op_print")
        return out2

    index = [-2, -1, 0, 1, 2]
    data = []
    for i in index:
        data.append(np.ones((10, 2), dtype=np.float32) * i)
    for i in range(5):
        ret = ReluJob(data[i]).get().ndarray()
        print(ret)
        if index[i] > 0:
            test_case.assertTrue(np.array_equal(ret, np.ones((10, 2), dtype=np.float32) * index[i]))
        else:
            test_case.assertTrue(np.array_equal(ret, np.zeros((10, 2), dtype=np.float32)))
@flow.unittest.num_nodes_required(2)
def test_multi_node_comm_net_dynamic(test_case):
    func_config = flow.FunctionConfig()
    func_config.default_distribute_strategy(flow.distribute.mirrored_strategy())
    func_config.default_placement_scope(flow.fixed_placement("gpu", "0:0"))
    func_config.default_data_type(flow.float)
    flow.config.machine_num(2)
    flow.config.gpu_device_num(1)

    @flow.function(func_config)
    def ReluJob(x=flow.MirroredTensorDef((10, 2))):
        with flow.fixed_placement("gpu", "0:0"):
            out0 = flow.keras.activations.relu(x)
        with flow.fixed_placement("gpu", "1:0"):
            out1 = flow.keras.activations.relu(out0)
        with flow.fixed_placement("gpu", "0:0"):
            out2 = flow.keras.activations.relu(out1)
        return out2

    index = [-2, -1, 0, 1, 2]
    data = []
    for i in index:
        data.append(np.ones((5, 2), dtype=np.float32) * i)
    for i in range(5):
        ret = ReluJob([data[i]]).get().ndarray_list()[0]
        print(ret)
        if index[i] > 0:
            test_case.assertTrue(np.array_equal(ret, np.ones((5, 2), dtype=np.float32) * index[i]))
        else:
            test_case.assertTrue(np.array_equal(ret, np.zeros((5, 2), dtype=np.float32)))
@flow.unittest.num_nodes_required(2)
def test_multi_node_comm_net_dynamic_empty(test_case):
    func_config = flow.FunctionConfig()
    func_config.default_distribute_strategy(flow.distribute.mirrored_strategy())
    func_config.default_placement_scope(flow.fixed_placement("cpu", "0:0"))
    func_config.default_data_type(flow.float)
    flow.config.machine_num(2)
    flow.config.gpu_device_num(1)

    @flow.function(func_config)
    def ReluJob(x=flow.MirroredTensorDef((10, 2))):
        with flow.fixed_placement("cpu", "0:0"):
            out0 = flow.keras.activations.relu(x)
        with flow.fixed_placement("cpu", "1:0"):
            out1 = flow.keras.activations.relu(out0)
        with flow.fixed_placement("cpu", "0:0"):
            out2 = flow.keras.activations.relu(out1)
        return out2

    index = [-2, -1, 0, 1, 2]
    data = []
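    # (0, 0)-shaped inputs: the tensor crossing machines is dynamically empty,
    # exercising the new copy_comm_net skip path.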
    for i in index:
        data.append(np.ones((0, 0), dtype=np.float32) * i)
    for i in range(5):
        ret = ReluJob([data[i]]).get().ndarray_list()[0]
        print(ret)
        if index[i] > 0:
            test_case.assertTrue(np.array_equal(ret, np.ones((0, 0), dtype=np.float32) * index[i]))
        else:
            test_case.assertTrue(np.array_equal(ret, np.zeros((0, 0), dtype=np.float32)))