diff --git a/paddle/fluid/distributed/ps/service/graph_brpc_client.cc b/paddle/fluid/distributed/ps/service/graph_brpc_client.cc index 770a38893a476bae95a2b65939203693b5fddbe5..ae4d12b5b629cb219bce5ac1dabc4f5295797061 100644 --- a/paddle/fluid/distributed/ps/service/graph_brpc_client.cc +++ b/paddle/fluid/distributed/ps/service/graph_brpc_client.cc @@ -83,7 +83,7 @@ std::future GraphBrpcClient::get_node_feat( request_call_num, [&, node_id_buckets, query_idx_buckets, request_call_num](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); size_t fail_num = 0; for (size_t request_idx = 0; request_idx < request_call_num; ++request_idx) { @@ -97,7 +97,8 @@ std::future GraphBrpcClient::get_node_feat( size_t bytes_size = io_buffer_itr.bytes_left(); std::unique_ptr buffer_wrapper(new char[bytes_size]); char *buffer = buffer_wrapper.get(); - io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); + io_buffer_itr.copy_and_forward(reinterpret_cast(buffer), + bytes_size); for (size_t feat_idx = 0; feat_idx < feature_names.size(); ++feat_idx) { @@ -105,7 +106,7 @@ std::future GraphBrpcClient::get_node_feat( node_idx < query_idx_buckets.at(request_idx).size(); ++node_idx) { int query_idx = query_idx_buckets.at(request_idx).at(node_idx); - size_t feat_len = *(size_t *)(buffer); + size_t feat_len = *reinterpret_cast(buffer); buffer += sizeof(size_t); auto feature = std::string(buffer, feat_len); res[feat_idx][query_idx] = feature; @@ -132,10 +133,12 @@ std::future GraphBrpcClient::get_node_feat( closure->request(request_idx)->set_client_id(_client_id); size_t node_num = node_id_buckets[request_idx].size(); - closure->request(request_idx)->add_params((char *)&idx_, sizeof(int)); closure->request(request_idx) - ->add_params((char *)node_id_buckets[request_idx].data(), - sizeof(int64_t) * node_num); + ->add_params(reinterpret_cast(&idx_), sizeof(int)); + closure->request(request_idx) + ->add_params( + reinterpret_cast(node_id_buckets[request_idx].data()), + sizeof(int64_t) * node_num); std::string joint_feature_name = paddle::string::join_strings(feature_names, '\t'); closure->request(request_idx) @@ -158,7 +161,7 @@ std::future GraphBrpcClient::clear_nodes(uint32_t table_id, DownpourBrpcClosure *closure = new DownpourBrpcClosure( server_size, [&, server_size = this->server_size](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); size_t fail_num = 0; for (size_t request_idx = 0; request_idx < server_size; ++request_idx) { if (closure->check_response(request_idx, PS_GRAPH_CLEAR) != 0) { @@ -177,8 +180,10 @@ std::future GraphBrpcClient::clear_nodes(uint32_t table_id, closure->request(server_index)->set_cmd_id(PS_GRAPH_CLEAR); closure->request(server_index)->set_table_id(table_id); closure->request(server_index)->set_client_id(_client_id); - closure->request(server_index)->add_params((char *)&type_id, sizeof(int)); - closure->request(server_index)->add_params((char *)&idx_, sizeof(int)); + closure->request(server_index) + ->add_params(reinterpret_cast(&type_id), sizeof(int)); + closure->request(server_index) + ->add_params(reinterpret_cast(&idx_), sizeof(int)); GraphPsService_Stub rpc_stub = getServiceStub(GetCmdChannel(server_index)); closure->cntl(server_index)->set_log_id(butil::gettimeofday_ms()); rpc_stub.service(closure->cntl(server_index), @@ -217,7 +222,7 @@ std::future GraphBrpcClient::add_graph_node( DownpourBrpcClosure *closure = new DownpourBrpcClosure( request_call_num, [&, request_call_num](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); size_t fail_num = 0; for (size_t request_idx = 0; request_idx < request_call_num; ++request_idx) { @@ -239,16 +244,18 @@ std::future GraphBrpcClient::add_graph_node( closure->request(request_idx)->set_table_id(table_id); closure->request(request_idx)->set_client_id(_client_id); size_t node_num = request_bucket[request_idx].size(); - closure->request(request_idx)->add_params((char *)&idx_, sizeof(int)); closure->request(request_idx) - ->add_params((char *)request_bucket[request_idx].data(), - sizeof(int64_t) * node_num); + ->add_params(reinterpret_cast(&idx_), sizeof(int)); + closure->request(request_idx) + ->add_params( + reinterpret_cast(request_bucket[request_idx].data()), + sizeof(int64_t) * node_num); if (add_weight) { bool weighted[is_weighted_bucket[request_idx].size() + 1]; for (size_t j = 0; j < is_weighted_bucket[request_idx].size(); j++) weighted[j] = is_weighted_bucket[request_idx][j]; closure->request(request_idx) - ->add_params((char *)weighted, + ->add_params(reinterpret_cast(weighted), sizeof(bool) * is_weighted_bucket[request_idx].size()); } // PsService_Stub rpc_stub(GetCmdChannel(server_index)); @@ -280,7 +287,7 @@ std::future GraphBrpcClient::remove_graph_node( DownpourBrpcClosure *closure = new DownpourBrpcClosure( request_call_num, [&, request_call_num](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); size_t fail_num = 0; for (size_t request_idx = 0; request_idx < request_call_num; ++request_idx) { @@ -303,10 +310,12 @@ std::future GraphBrpcClient::remove_graph_node( closure->request(request_idx)->set_client_id(_client_id); size_t node_num = request_bucket[request_idx].size(); - closure->request(request_idx)->add_params((char *)&idx_, sizeof(int)); closure->request(request_idx) - ->add_params((char *)request_bucket[request_idx].data(), - sizeof(int64_t) * node_num); + ->add_params(reinterpret_cast(&idx_), sizeof(int)); + closure->request(request_idx) + ->add_params( + reinterpret_cast(request_bucket[request_idx].data()), + sizeof(int64_t) * node_num); // PsService_Stub rpc_stub(GetCmdChannel(server_index)); GraphPsService_Stub rpc_stub = getServiceStub(GetCmdChannel(server_index)); closure->cntl(request_idx)->set_log_id(butil::gettimeofday_ms()); @@ -335,7 +344,7 @@ std::future GraphBrpcClient::batch_sample_neighbors( } DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); if (closure->check_response(0, PS_GRAPH_SAMPLE_NODES_FROM_ONE_SERVER) != 0) { ret = -1; @@ -345,10 +354,11 @@ std::future GraphBrpcClient::batch_sample_neighbors( size_t bytes_size = io_buffer_itr.bytes_left(); std::unique_ptr buffer_wrapper(new char[bytes_size]); char *buffer = buffer_wrapper.get(); - io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); + io_buffer_itr.copy_and_forward(reinterpret_cast(buffer), + bytes_size); - size_t node_num = *(size_t *)buffer; - int *actual_sizes = (int *)(buffer + sizeof(size_t)); + size_t node_num = *reinterpret_cast(buffer); + int *actual_sizes = reinterpret_cast(buffer + sizeof(size_t)); char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * node_num; int offset = 0; @@ -357,11 +367,11 @@ std::future GraphBrpcClient::batch_sample_neighbors( int start = 0; while (start < actual_size) { res[node_idx].emplace_back( - *(int64_t *)(node_buffer + offset + start)); + *reinterpret_cast(node_buffer + offset + start)); start += GraphNode::id_size; if (need_weight) { res_weight[node_idx].emplace_back( - *(float *)(node_buffer + offset + start)); + *reinterpret_cast(node_buffer + offset + start)); start += GraphNode::weight_size; } } @@ -373,16 +383,19 @@ std::future GraphBrpcClient::batch_sample_neighbors( auto promise = std::make_shared>(); closure->add_promise(promise); std::future fut = promise->get_future(); - ; + closure->request(0)->set_cmd_id(PS_GRAPH_SAMPLE_NODES_FROM_ONE_SERVER); closure->request(0)->set_table_id(table_id); closure->request(0)->set_client_id(_client_id); - closure->request(0)->add_params((char *)&idx_, sizeof(int)); - closure->request(0)->add_params((char *)node_ids.data(), + closure->request(0)->add_params(reinterpret_cast(&idx_), + sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(node_ids.data()), sizeof(int64_t) * node_ids.size()); - closure->request(0)->add_params((char *)&sample_size, sizeof(int)); - closure->request(0)->add_params((char *)&need_weight, sizeof(bool)); - ; + closure->request(0)->add_params(reinterpret_cast(&sample_size), + sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&need_weight), + sizeof(bool)); + // PsService_Stub rpc_stub(GetCmdChannel(server_index)); GraphPsService_Stub rpc_stub = getServiceStub(GetCmdChannel(server_index)); closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); @@ -420,7 +433,7 @@ std::future GraphBrpcClient::batch_sample_neighbors( request_call_num, [&, node_id_buckets, query_idx_buckets, request_call_num](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); size_t fail_num = 0; for (size_t request_idx = 0; request_idx < request_call_num; ++request_idx) { @@ -434,10 +447,12 @@ std::future GraphBrpcClient::batch_sample_neighbors( size_t bytes_size = io_buffer_itr.bytes_left(); std::unique_ptr buffer_wrapper(new char[bytes_size]); char *buffer = buffer_wrapper.get(); - io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); + io_buffer_itr.copy_and_forward(reinterpret_cast(buffer), + bytes_size); - size_t node_num = *(size_t *)buffer; - int *actual_sizes = (int *)(buffer + sizeof(size_t)); + size_t node_num = *reinterpret_cast(buffer); + int *actual_sizes = + reinterpret_cast(buffer + sizeof(size_t)); char *node_buffer = buffer + sizeof(size_t) + sizeof(int) * node_num; @@ -448,11 +463,11 @@ std::future GraphBrpcClient::batch_sample_neighbors( int start = 0; while (start < actual_size) { res[query_idx].emplace_back( - *(int64_t *)(node_buffer + offset + start)); + *reinterpret_cast(node_buffer + offset + start)); start += GraphNode::id_size; if (need_weight) { res_weight[query_idx].emplace_back( - *(float *)(node_buffer + offset + start)); + *reinterpret_cast(node_buffer + offset + start)); start += GraphNode::weight_size; } } @@ -477,14 +492,16 @@ std::future GraphBrpcClient::batch_sample_neighbors( closure->request(request_idx)->set_client_id(_client_id); size_t node_num = node_id_buckets[request_idx].size(); - closure->request(request_idx)->add_params((char *)&idx_, sizeof(int)); closure->request(request_idx) - ->add_params((char *)node_id_buckets[request_idx].data(), - sizeof(int64_t) * node_num); + ->add_params(reinterpret_cast(&idx_), sizeof(int)); closure->request(request_idx) - ->add_params((char *)&sample_size, sizeof(int)); + ->add_params( + reinterpret_cast(node_id_buckets[request_idx].data()), + sizeof(int64_t) * node_num); closure->request(request_idx) - ->add_params((char *)&need_weight, sizeof(bool)); + ->add_params(reinterpret_cast(&sample_size), sizeof(int)); + closure->request(request_idx) + ->add_params(reinterpret_cast(&need_weight), sizeof(bool)); // PsService_Stub rpc_stub(GetCmdChannel(server_index)); GraphPsService_Stub rpc_stub = getServiceStub(GetCmdChannel(server_index)); closure->cntl(request_idx)->set_log_id(butil::gettimeofday_ms()); @@ -505,7 +522,7 @@ std::future GraphBrpcClient::random_sample_nodes( std::vector &ids) { DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); if (closure->check_response(0, PS_GRAPH_SAMPLE_NODES) != 0) { ret = -1; } else { @@ -515,7 +532,7 @@ std::future GraphBrpcClient::random_sample_nodes( char *buffer = new char[bytes_size]; size_t index = 0; while (index < bytes_size) { - ids.push_back(*(int64_t *)(buffer + index)); + ids.push_back(*reinterpret_cast(buffer + index)); index += GraphNode::id_size; } delete[] buffer; @@ -525,14 +542,16 @@ std::future GraphBrpcClient::random_sample_nodes( auto promise = std::make_shared>(); closure->add_promise(promise); std::future fut = promise->get_future(); - ; + closure->request(0)->set_cmd_id(PS_GRAPH_SAMPLE_NODES); closure->request(0)->set_table_id(table_id); closure->request(0)->set_client_id(_client_id); - closure->request(0)->add_params((char *)&type_id, sizeof(int)); - closure->request(0)->add_params((char *)&idx_, sizeof(int)); - closure->request(0)->add_params((char *)&sample_size, sizeof(int)); - ; + closure->request(0)->add_params(reinterpret_cast(&type_id), + sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&idx_), sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&sample_size), + sizeof(int)); + // PsService_Stub rpc_stub(GetCmdChannel(server_index)); GraphPsService_Stub rpc_stub = getServiceStub(GetCmdChannel(server_index)); closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); @@ -552,7 +571,7 @@ std::future GraphBrpcClient::pull_graph_list( std::vector &res) { DownpourBrpcClosure *closure = new DownpourBrpcClosure(1, [&](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); if (closure->check_response(0, PS_PULL_GRAPH_LIST) != 0) { ret = -1; } else { @@ -560,7 +579,8 @@ std::future GraphBrpcClient::pull_graph_list( butil::IOBufBytesIterator io_buffer_itr(res_io_buffer); size_t bytes_size = io_buffer_itr.bytes_left(); char *buffer = new char[bytes_size]; - io_buffer_itr.copy_and_forward((void *)(buffer), bytes_size); + io_buffer_itr.copy_and_forward(reinterpret_cast(buffer), + bytes_size); size_t index = 0; while (index < bytes_size) { FeatureNode node; @@ -578,11 +598,13 @@ std::future GraphBrpcClient::pull_graph_list( closure->request(0)->set_cmd_id(PS_PULL_GRAPH_LIST); closure->request(0)->set_table_id(table_id); closure->request(0)->set_client_id(_client_id); - closure->request(0)->add_params((char *)&type_id, sizeof(int)); - closure->request(0)->add_params((char *)&idx_, sizeof(int)); - closure->request(0)->add_params((char *)&start, sizeof(int)); - closure->request(0)->add_params((char *)&size, sizeof(int)); - closure->request(0)->add_params((char *)&step, sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&type_id), + sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&idx_), sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&start), + sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&size), sizeof(int)); + closure->request(0)->add_params(reinterpret_cast(&step), sizeof(int)); // PsService_Stub rpc_stub(GetCmdChannel(server_index)); GraphPsService_Stub rpc_stub = getServiceStub(GetCmdChannel(server_index)); closure->cntl(0)->set_log_id(butil::gettimeofday_ms()); @@ -629,7 +651,7 @@ std::future GraphBrpcClient::set_node_feat( request_call_num, [&, node_id_buckets, query_idx_buckets, request_call_num](void *done) { int ret = 0; - auto *closure = (DownpourBrpcClosure *)done; + auto *closure = reinterpret_cast(done); size_t fail_num = 0; for (size_t request_idx = 0; request_idx < request_call_num; ++request_idx) { @@ -655,10 +677,12 @@ std::future GraphBrpcClient::set_node_feat( closure->request(request_idx)->set_client_id(_client_id); size_t node_num = node_id_buckets[request_idx].size(); - closure->request(request_idx)->add_params((char *)&idx_, sizeof(int)); closure->request(request_idx) - ->add_params((char *)node_id_buckets[request_idx].data(), - sizeof(int64_t) * node_num); + ->add_params(reinterpret_cast(&idx_), sizeof(int)); + closure->request(request_idx) + ->add_params( + reinterpret_cast(node_id_buckets[request_idx].data()), + sizeof(int64_t) * node_num); std::string joint_feature_name = paddle::string::join_strings(feature_names, '\t'); closure->request(request_idx) @@ -670,7 +694,7 @@ std::future GraphBrpcClient::set_node_feat( for (size_t node_idx = 0; node_idx < node_num; ++node_idx) { size_t feat_len = features_idx_buckets[request_idx][feat_idx][node_idx].size(); - set_feature.append((char *)&feat_len, sizeof(size_t)); + set_feature.append(reinterpret_cast(&feat_len), sizeof(size_t)); set_feature.append( features_idx_buckets[request_idx][feat_idx][node_idx].data(), feat_len);