diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp index 2228ccb952b1a91a5e34f990ae4c186570b91f5d..8ee5033d976284b149a2a8bde4e64deea636311f 100644 --- a/core/general-server/op/general_dist_kv_infer_op.cpp +++ b/core/general-server/op/general_dist_kv_infer_op.cpp @@ -70,10 +70,13 @@ int GeneralDistKVInferOp::inference() { << ") Failed mutable depended argument, op:" << pre_name; return -1; } - + Timer timeline; + timeline.Start(); const TensorVector *in = &input_blob->tensor_vector; TensorVector *out = &output_blob->tensor_vector; std::vector keys; + std::vector unique_keys; + std::unordered_map key_map; std::vector values; int sparse_count = 0; // sparse inputs counts, sparse would seek cube int dense_count = 0; // dense inputs counts, dense would directly call paddle infer @@ -94,7 +97,8 @@ int GeneralDistKVInferOp::inference() { dataptr_size_pairs.push_back(std::make_pair(data_ptr, elem_num)); } keys.resize(key_len); - VLOG(3) << "(logid=" << log_id << ") cube number of keys to look up: " << key_len; + unique_keys.resize(key_len); + int key_idx = 0; for (size_t i = 0; i < dataptr_size_pairs.size(); ++i) { std::copy(dataptr_size_pairs[i].first, @@ -102,20 +106,35 @@ int GeneralDistKVInferOp::inference() { keys.begin() + key_idx); key_idx += dataptr_size_pairs[i].second; } + + int unique_keys_count = 0; + for (size_t i = 0; i < keys.size(); ++i) { + if (key_map.find(keys[i]) == key_map.end()) { + key_map[keys[i]] = nullptr; + unique_keys[unique_keys_count++] = keys[i]; + } + } + unique_keys.resize(unique_keys_count); + VLOG(1) << "(logid=" << log_id << ") cube number of keys to look up: " << key_len << " uniq keys: "<< unique_keys_count; rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance(); std::vector table_names = cube->get_table_names(); if (table_names.size() == 0) { LOG(ERROR) << "cube init error or cube config not given."; return -1; } - // gather keys and seek cube servers, put results in values - int ret = cube->seek(table_names[0], keys, &values); - VLOG(3) << "(logid=" << log_id << ") cube seek status: " << ret; + + int64_t seek_start = timeline.TimeStampUS(); + int ret = cube->seek(table_names[0], unique_keys, &values); + int64_t seek_end = timeline.TimeStampUS(); + VLOG(2) << "(logid=" << log_id << ") cube seek status: " << ret << " seek_time: " << seek_end - seek_start; + for (size_t i = 0; i < unique_keys.size(); ++i) { + key_map[unique_keys[i]] = &values[i]; + } if (values.size() != keys.size() || values[0].buff.size() == 0) { LOG(ERROR) << "cube value return null"; } - // EMBEDDING_SIZE means the length of sparse vector, user can define length here. - size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float); + //size_t EMBEDDING_SIZE = values[0].buff.size() / sizeof(float); + size_t EMBEDDING_SIZE = (values[0].buff.size() - 10) / sizeof(float); TensorVector sparse_out; sparse_out.resize(sparse_count); TensorVector dense_out; @@ -127,7 +146,9 @@ int GeneralDistKVInferOp::inference() { baidu::paddle_serving::predictor::Resource &resource = baidu::paddle_serving::predictor::Resource::instance(); std::shared_ptr model_config = resource.get_general_model_config().front(); - //copy data to tnsor + int cube_key_found = 0; + int cube_key_miss = 0; + for (size_t i = 0; i < in->size(); ++i) { if (in->at(i).dtype != paddle::PaddleDType::INT64) { dense_out[dense_idx] = in->at(i); @@ -150,20 +171,39 @@ int GeneralDistKVInferOp::inference() { float *dst_ptr = static_cast(sparse_out[sparse_idx].data.data()); for (int x = 0; x < sparse_out[sparse_idx].lod[0].back(); ++x) { float *data_ptr = dst_ptr + x * EMBEDDING_SIZE; - memcpy(data_ptr, - values[cube_val_idx].buff.data(), - values[cube_val_idx].buff.size()); - cube_val_idx++; + uint64_t cur_key = keys[cube_val_idx]; + rec::mcube::CubeValue* cur_val = key_map[cur_key]; + if (cur_val->buff.size() == 0) { + memset(data_ptr, (float)0.0, sizeof(float) * EMBEDDING_SIZE); + VLOG(3) << "(logid=" << log_id << ") cube key not found: " << keys[cube_val_idx]; + ++cube_key_miss; + ++cube_val_idx; + continue; + } + VLOG(2) << "(logid=" << log_id << ") key: " << keys[cube_val_idx] << " , cube value len:" << cur_val->buff.size(); + memcpy(data_ptr, cur_val->buff.data(), cur_val->buff.size()); + //VLOG(3) << keys[cube_val_idx] << ":" << data_ptr[0] << ", " << data_ptr[1] << ", " <_batch_size; output_blob->_batch_size = batch_size; - Timer timeline; int64_t start = timeline.TimeStampUS(); timeline.Start(); // call paddle inference here @@ -173,7 +213,12 @@ int GeneralDistKVInferOp::inference() { return -1; } int64_t end = timeline.TimeStampUS(); - + if (cube_fail) { + float *out_ptr = static_cast(out->at(0).data.data()); + out_ptr[0] = 0.0; + } + timeline.Pause(); + VLOG(2) << "dist kv, pure paddle infer time: " << timeline.ElapsedUS(); CopyBlobInfo(input_blob, output_blob); AddBlobInfo(output_blob, start); AddBlobInfo(output_blob, end);