提交 9dd34e41 编写于 作者: 武毅 提交者: gongweibao

update unpushed commits for zerocopy grpc (#8900)

上级 b825c792
...@@ -127,8 +127,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -127,8 +127,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
boost::get<platform::CUDAPlace>(tensor.place()), boost::get<platform::CUDAPlace>(tensor.place()),
reinterpret_cast<const void*>(tensor.data<void>()), reinterpret_cast<const void*>(tensor.data<void>()),
copy_size, gpu_dev_ctx.stream()); copy_size, gpu_dev_ctx.stream());
ctx.Wait();
destroy_callback = [](void* backing) { destroy_callback = [](void* backing) {
std::cout << "destroy payload" << std::endl;
platform::CPUPlace cpu; platform::CPUPlace cpu;
memory::Free(cpu, backing); memory::Free(cpu, backing);
}; };
...@@ -137,12 +137,6 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -137,12 +137,6 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
payload = tensor.data<void>(); payload = tensor.data<void>();
} }
payload_size = tensor.memory_size(); payload_size = tensor.memory_size();
std::string tmp(reinterpret_cast<char*>(payload), payload_size);
for (int i = 0; i < tmp.size(); ++i) {
printf("%02X ", tmp.data()[i]);
}
printf("\n");
e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size);
} break; } break;
case framework::proto::VarType_Type_SELECTED_ROWS: { case framework::proto::VarType_Type_SELECTED_ROWS: {
...@@ -167,14 +161,9 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -167,14 +161,9 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
reinterpret_cast<const void*>(tensor->data<void>()), reinterpret_cast<const void*>(tensor->data<void>()),
copy_size, gpu_dev_ctx.stream()); copy_size, gpu_dev_ctx.stream());
ctx.Wait(); ctx.Wait();
float* ttt = reinterpret_cast<float*>(payload);
for (int i = 0; i < copy_size / 4; i++) {
std::cout << "copied to cpu: " << ttt[i] << std::endl;
}
destroy_callback = [](void* backing) { destroy_callback = [](void* backing) {
std::cout << "destroy..." << std::endl; platform::CPUPlace cpu;
// platform::CPUPlace cpu; memory::Free(cpu, backing);
// memory::Free(cpu, backing);
}; };
#endif #endif
} else { } else {
...@@ -270,6 +259,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, ...@@ -270,6 +259,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
tensor_data, cpu, tensor_data, cpu,
reinterpret_cast<const void*>(meta.serialized().data()), reinterpret_cast<const void*>(meta.serialized().data()),
meta.serialized().size(), gpu_dev_ctx.stream()); meta.serialized().size(), gpu_dev_ctx.stream());
ctx.Wait();
#endif #endif
} else { } else {
memcpy(tensor_data, memcpy(tensor_data,
...@@ -292,6 +282,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, ...@@ -292,6 +282,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
tensor_data, cpu, tensor_data, cpu,
reinterpret_cast<const void*>(meta.serialized().data()), reinterpret_cast<const void*>(meta.serialized().data()),
meta.serialized().size(), gpu_dev_ctx.stream()); meta.serialized().size(), gpu_dev_ctx.stream());
ctx.Wait();
#endif #endif
} else { } else {
memcpy(tensor_data, memcpy(tensor_data,
......
...@@ -35,12 +35,6 @@ namespace detail { ...@@ -35,12 +35,6 @@ namespace detail {
typedef void (*DestroyCallback)(void*); typedef void (*DestroyCallback)(void*);
// Returns the current system_clock time expressed as a count of whole
// milliseconds elapsed since that clock's epoch.
inline int64_t GetTimestamp() {
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;
  using std::chrono::system_clock;

  // Take the time point first, then narrow its duration to milliseconds.
  const auto since_epoch = system_clock::now().time_since_epoch();
  const auto as_millis = duration_cast<milliseconds>(since_epoch);
  return as_millis.count();
}
void SerializeToMessage(const std::string& name, const framework::Variable* var, void SerializeToMessage(const std::string& name, const framework::Variable* var,
const platform::DeviceContext& ctx, const platform::DeviceContext& ctx,
sendrecv::VariableMessage* msg); sendrecv::VariableMessage* msg);
......
...@@ -42,7 +42,7 @@ void RunSerdeTestTensor(platform::Place place) { ...@@ -42,7 +42,7 @@ void RunSerdeTestTensor(platform::Place place) {
int tensor_numel = 4 * 8 * 4 * 2; int tensor_numel = 4 * 8 * 4 * 2;
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place); auto& ctx = *pool.Get(place);
float* orig_tensor_data = tensor->mutable_data<float>(place); tensor->mutable_data<float>(place);
math::set_constant(ctx, tensor, 31.9); math::set_constant(ctx, tensor, 31.9);
::grpc::ByteBuffer msg; ::grpc::ByteBuffer msg;
...@@ -71,16 +71,9 @@ void RunSerdeTestTensor(platform::Place place) { ...@@ -71,16 +71,9 @@ void RunSerdeTestTensor(platform::Place place) {
const float* tensor_data = const float* tensor_data =
reinterpret_cast<const float*>(varmsg.serialized().data()); reinterpret_cast<const float*>(varmsg.serialized().data());
for (int i = 0; i < varmsg.serialized().size(); ++i) {
printf("%02X ", varmsg.serialized().data()[i]);
}
printf("\n");
for (int i = 0; i < tensor_numel; ++i) { for (int i = 0; i < tensor_numel; ++i) {
std::cout << "#####tensor data: " << tensor_data[i] << std::endl; EXPECT_FLOAT_EQ(tensor_data[i], 31.9);
EXPECT_EQ(tensor_data[i], orig_tensor_data[i]);
std::cout << "test end 1 " << std::endl;
} }
std::cout << "tensor data end " << std::endl;
// deserialize zero-copy // deserialize zero-copy
framework::Variable var2; framework::Variable var2;
...@@ -101,8 +94,7 @@ void RunSerdeTestTensor(platform::Place place) { ...@@ -101,8 +94,7 @@ void RunSerdeTestTensor(platform::Place place) {
EXPECT_EQ(varmsg.lod(0).lod_data(0), 1); EXPECT_EQ(varmsg.lod(0).lod_data(0), 1);
EXPECT_EQ(varmsg.lod(0).lod_data(1), 3); EXPECT_EQ(varmsg.lod(0).lod_data(1), 3);
EXPECT_EQ(varmsg.lod(0).lod_data(2), 8); EXPECT_EQ(varmsg.lod(0).lod_data(2), 8);
for (int i = 0; i < tensor_numel; ++i) for (int i = 0; i < tensor_numel; ++i) EXPECT_FLOAT_EQ(tensor_data2[i], 31.9);
EXPECT_EQ(tensor_data2[i], orig_tensor_data[i]);
} }
void RunSerdeTestSelectedRows(platform::Place place) { void RunSerdeTestSelectedRows(platform::Place place) {
...@@ -114,10 +106,9 @@ void RunSerdeTestSelectedRows(platform::Place place) { ...@@ -114,10 +106,9 @@ void RunSerdeTestSelectedRows(platform::Place place) {
auto* slr = var.GetMutable<framework::SelectedRows>(); auto* slr = var.GetMutable<framework::SelectedRows>();
auto* tensor = slr->mutable_value(); auto* tensor = slr->mutable_value();
auto* rows = slr->mutable_rows(); auto* rows = slr->mutable_rows();
tensor->Resize(framework::make_ddim({2, 10})); tensor->Resize(framework::make_ddim({2, 10}));
tensor->mutable_data<float>(place);
int tensor_numel = 2 * 10; int tensor_numel = 2 * 10;
float* orig_tensor_data = tensor->mutable_data<float>(place);
math::set_constant(ctx, tensor, 32.7); math::set_constant(ctx, tensor, 32.7);
rows->push_back(3); rows->push_back(3);
rows->push_back(10); rows->push_back(10);
...@@ -144,7 +135,7 @@ void RunSerdeTestSelectedRows(platform::Place place) { ...@@ -144,7 +135,7 @@ void RunSerdeTestSelectedRows(platform::Place place) {
const int64_t* rows_data = const int64_t* rows_data =
reinterpret_cast<const int64_t*>(varmsg.rows().data()); reinterpret_cast<const int64_t*>(varmsg.rows().data());
for (int i = 0; i < tensor_numel; ++i) { for (int i = 0; i < tensor_numel; ++i) {
EXPECT_EQ(tensor_data[i], orig_tensor_data[i]); EXPECT_FLOAT_EQ(tensor_data[i], 32.7);
} }
EXPECT_EQ(rows_data[0], 3); EXPECT_EQ(rows_data[0], 3);
EXPECT_EQ(rows_data[1], 10); EXPECT_EQ(rows_data[1], 10);
...@@ -168,21 +159,21 @@ void RunSerdeTestSelectedRows(platform::Place place) { ...@@ -168,21 +159,21 @@ void RunSerdeTestSelectedRows(platform::Place place) {
const int64_t* rows_data2 = rows2->data(); const int64_t* rows_data2 = rows2->data();
for (int i = 0; i < tensor_numel; ++i) { for (int i = 0; i < tensor_numel; ++i) {
EXPECT_EQ(tensor_data2[i], orig_tensor_data[i]); EXPECT_FLOAT_EQ(tensor_data2[i], 32.7);
} }
EXPECT_EQ(rows_data2[0], 3); EXPECT_EQ(rows_data2[0], 3);
EXPECT_EQ(rows_data2[1], 10); EXPECT_EQ(rows_data2[1], 10);
} }
// TEST(SelectedRows, CPU) { TEST(SelectedRows, CPU) {
// platform::CPUPlace place; platform::CPUPlace place;
// RunSerdeTestSelectedRows(place); RunSerdeTestSelectedRows(place);
// } }
// TEST(SelectedRows, GPU) { TEST(SelectedRows, GPU) {
// platform::CUDAPlace place; platform::CUDAPlace place;
// RunSerdeTestSelectedRows(place); RunSerdeTestSelectedRows(place);
// } }
TEST(Tensor, CPU) { TEST(Tensor, CPU) {
platform::CPUPlace place; platform::CPUPlace place;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册