diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 64d181f4083dfcd43a59cad1cca21ec63df4d85f..f196fc9862d2374583d50820a6c3b63c866bf048 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -127,8 +127,8 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, boost::get(tensor.place()), reinterpret_cast(tensor.data()), copy_size, gpu_dev_ctx.stream()); + ctx.Wait(); destroy_callback = [](void* backing) { - std::cout << "destroy payload" << std::endl; platform::CPUPlace cpu; memory::Free(cpu, backing); }; @@ -137,12 +137,6 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, payload = tensor.data(); } payload_size = tensor.memory_size(); - - std::string tmp(reinterpret_cast(payload), payload_size); - for (int i = 0; i < tmp.size(); ++i) { - printf("%02X ", tmp.data()[i]); - } - printf("\n"); e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size); } break; case framework::proto::VarType_Type_SELECTED_ROWS: { @@ -167,14 +161,9 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, reinterpret_cast(tensor->data()), copy_size, gpu_dev_ctx.stream()); ctx.Wait(); - float* ttt = reinterpret_cast(payload); - for (int i = 0; i < copy_size / 4; i++) { - std::cout << "copied to cpu: " << ttt[i] << std::endl; - } destroy_callback = [](void* backing) { - std::cout << "destroy..." << std::endl; - // platform::CPUPlace cpu; - // memory::Free(cpu, backing); + platform::CPUPlace cpu; + memory::Free(cpu, backing); }; #endif } else { @@ -270,6 +259,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, tensor_data, cpu, reinterpret_cast(meta.serialized().data()), meta.serialized().size(), gpu_dev_ctx.stream()); + ctx.Wait(); #endif } else { memcpy(tensor_data, @@ -292,6 +282,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, tensor_data, cpu, reinterpret_cast(meta.serialized().data()), meta.serialized().size(), gpu_dev_ctx.stream()); + ctx.Wait(); #endif } else { memcpy(tensor_data, diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.h b/paddle/fluid/operators/detail/sendrecvop_utils.h index 65704db5ae2604c8e462ffc2828085ecb2893e43..5208091e54b4da2bb0265f84827ce23b57e954dc 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.h +++ b/paddle/fluid/operators/detail/sendrecvop_utils.h @@ -35,12 +35,6 @@ namespace detail { typedef void (*DestroyCallback)(void*); -inline int64_t GetTimestamp() { - return std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); -} - void SerializeToMessage(const std::string& name, const framework::Variable* var, const platform::DeviceContext& ctx, sendrecv::VariableMessage* msg); diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 8054c89ecfe7ba8273564c9d480ae6f20c5b4286..2f06e5a686b996858d21930a1afa2861efca4a9b 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -42,7 +42,7 @@ void RunSerdeTestTensor(platform::Place place) { int tensor_numel = 4 * 8 * 4 * 2; platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); - float* orig_tensor_data = tensor->mutable_data(place); + tensor->mutable_data(place); math::set_constant(ctx, tensor, 31.9); ::grpc::ByteBuffer msg; @@ -71,16 +71,9 @@ void RunSerdeTestTensor(platform::Place place) { const float* tensor_data = reinterpret_cast(varmsg.serialized().data()); - for (int i = 0; i < varmsg.serialized().size(); ++i) { - printf("%02X ", varmsg.serialized().data()[i]); - } - printf("\n"); for (int i = 0; i < tensor_numel; ++i) { - std::cout << "#####tensor data: " << tensor_data[i] << std::endl; - EXPECT_EQ(tensor_data[i], orig_tensor_data[i]); - std::cout << "test end 1 " << std::endl; + EXPECT_FLOAT_EQ(tensor_data[i], 31.9); } - std::cout << "tensor data end " << std::endl; // deserialize zero-copy framework::Variable var2; @@ -101,8 +94,7 @@ void RunSerdeTestTensor(platform::Place place) { EXPECT_EQ(varmsg.lod(0).lod_data(0), 1); EXPECT_EQ(varmsg.lod(0).lod_data(1), 3); EXPECT_EQ(varmsg.lod(0).lod_data(2), 8); - for (int i = 0; i < tensor_numel; ++i) - EXPECT_EQ(tensor_data2[i], orig_tensor_data[i]); + for (int i = 0; i < tensor_numel; ++i) EXPECT_FLOAT_EQ(tensor_data2[i], 31.9); } void RunSerdeTestSelectedRows(platform::Place place) { @@ -114,10 +106,9 @@ void RunSerdeTestSelectedRows(platform::Place place) { auto* slr = var.GetMutable(); auto* tensor = slr->mutable_value(); auto* rows = slr->mutable_rows(); - tensor->Resize(framework::make_ddim({2, 10})); + tensor->mutable_data(place); int tensor_numel = 2 * 10; - float* orig_tensor_data = tensor->mutable_data(place); math::set_constant(ctx, tensor, 32.7); rows->push_back(3); rows->push_back(10); @@ -144,7 +135,7 @@ void RunSerdeTestSelectedRows(platform::Place place) { const int64_t* rows_data = reinterpret_cast(varmsg.rows().data()); for (int i = 0; i < tensor_numel; ++i) { - EXPECT_EQ(tensor_data[i], orig_tensor_data[i]); + EXPECT_FLOAT_EQ(tensor_data[i], 32.7); } EXPECT_EQ(rows_data[0], 3); EXPECT_EQ(rows_data[1], 10); @@ -168,21 +159,21 @@ void RunSerdeTestSelectedRows(platform::Place place) { const int64_t* rows_data2 = rows2->data(); for (int i = 0; i < tensor_numel; ++i) { - EXPECT_EQ(tensor_data2[i], orig_tensor_data[i]); + EXPECT_FLOAT_EQ(tensor_data2[i], 32.7); } EXPECT_EQ(rows_data2[0], 3); EXPECT_EQ(rows_data2[1], 10); } -// TEST(SelectedRows, CPU) { -// platform::CPUPlace place; -// RunSerdeTestSelectedRows(place); -// } +TEST(SelectedRows, CPU) { + platform::CPUPlace place; + RunSerdeTestSelectedRows(place); +} -// TEST(SelectedRows, GPU) { -// platform::CUDAPlace place; -// RunSerdeTestSelectedRows(place); -// } +TEST(SelectedRows, GPU) { + platform::CUDAPlace place; + RunSerdeTestSelectedRows(place); +} TEST(Tensor, CPU) { platform::CPUPlace place;