diff --git a/paddle/fluid/distributed/ps/table/common_graph_table.cc b/paddle/fluid/distributed/ps/table/common_graph_table.cc index a77209928563bbd5ec16c46ec690063a888a30e9..a8977ad58a5caa5d357f64fdde1d6b3fb968791d 100644 --- a/paddle/fluid/distributed/ps/table/common_graph_table.cc +++ b/paddle/fluid/distributed/ps/table/common_graph_table.cc @@ -78,7 +78,7 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea( paddle::framework::GpuPsFeaInfo x; std::vector feature_ids; for (size_t j = 0; j < bags[i].size(); j++) { - // TODO use FEATURE_TABLE instead + // TODO(danleifeng): use FEATURE_TABLE instead Node *v = find_node(1, bags[i][j]); node_id = bags[i][j]; if (v == NULL) { @@ -109,7 +109,7 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea( })); } } - for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); + for (size_t i = 0; i < tasks.size(); i++) tasks[i].get(); paddle::framework::GpuPsCommGraphFea res; uint64_t tot_len = 0; for (int i = 0; i < task_pool_size_; i++) { @@ -120,7 +120,7 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea( res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), slot_num); unsigned int offset = 0, ind = 0; for (int i = 0; i < task_pool_size_; i++) { - for (int j = 0; j < (int)node_id_array[i].size(); j++) { + for (size_t j = 0; j < node_id_array[i].size(); j++) { res.node_list[ind] = node_id_array[i][j]; res.fea_info_list[ind] = node_fea_info_array[i][j]; res.fea_info_list[ind++].feature_offset += offset; @@ -177,7 +177,7 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph( })); } } - for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); + for (size_t i = 0; i < tasks.size(); i++) tasks[i].get(); int64_t tot_len = 0; for (int i = 0; i < task_pool_size_; i++) { @@ -188,7 +188,7 @@ paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph( res.init_on_cpu(tot_len, ids.size()); int64_t offset = 0, ind = 0; for (int i = 0; i < task_pool_size_; i++) { - for (int j = 0; j < (int)node_array[i].size(); j++) { + for (size_t j = 0; j < node_array[i].size(); j++) { res.node_list[ind] = node_array[i][j]; res.node_info_list[ind] = info_array[i][j]; res.node_info_list[ind++].neighbor_offset += offset; @@ -213,7 +213,7 @@ int32_t GraphTable::add_node_to_ssd( ch, sizeof(int) * 2 + sizeof(uint64_t), str) == 0) { - uint64_t *stored_data = ((uint64_t *)str.c_str()); + uint64_t *stored_data = ((uint64_t *)str.c_str()); // NOLINT int n = str.size() / sizeof(uint64_t); char *new_data = new char[n * sizeof(uint64_t) + len]; memcpy(new_data, stored_data, n * sizeof(uint64_t)); @@ -221,14 +221,14 @@ int32_t GraphTable::add_node_to_ssd( _db->put(src_id % shard_num % task_pool_size_, ch, sizeof(int) * 2 + sizeof(uint64_t), - (char *)new_data, + (char *)new_data, // NOLINT n * sizeof(uint64_t) + len); delete[] new_data; } else { _db->put(src_id % shard_num % task_pool_size_, ch, sizeof(int) * 2 + sizeof(uint64_t), - (char *)data, + (char *)data, // NOLINT len); } } @@ -254,7 +254,7 @@ char *GraphTable::random_sample_neighbor_from_ssd( ch, sizeof(int) * 2 + sizeof(uint64_t), str) == 0) { - uint64_t *data = ((uint64_t *)str.c_str()); + uint64_t *data = ((uint64_t *)str.c_str()); // NOLINT int n = str.size() / sizeof(uint64_t); std::unordered_map m; // std::vector res; @@ -281,7 +281,7 @@ char *GraphTable::random_sample_neighbor_from_ssd( // res.push_back(data[pos]); } for (int i = 0; i < actual_size; i += 8) { - VLOG(2) << "sampled an neighbor " << *(uint64_t *)&buff[i]; + VLOG(2) << "sampled an neighbor " << *(uint64_t *)&buff[i]; // NOLINT } return buff; } @@ -310,8 +310,8 @@ int64_t GraphTable::load_graph_to_memory_from_ssd(int idx, std::string str; if (_db->get(i, ch, sizeof(int) * 2 + sizeof(uint64_t), str) == 0) { count[i] += (int64_t)str.size(); - for (size_t j = 0; j < (int)str.size(); j += sizeof(uint64_t)) { - uint64_t id = *(uint64_t *)(str.c_str() + j); + for (size_t j = 0; j < str.size(); j += sizeof(uint64_t)) { + uint64_t id = *(uint64_t *)(str.c_str() + j); // NOLINT add_comm_edge(idx, v, id); } } @@ -321,7 +321,7 @@ int64_t GraphTable::load_graph_to_memory_from_ssd(int idx, } } - for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); + for (size_t i = 0; i < tasks.size(); i++) tasks[i].get(); int64_t tot = 0; for (auto x : count) tot += x; return tot; @@ -354,9 +354,9 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { iters.push_back(_db->get_iterator(i)); iters[i]->SeekToFirst(); } - int next = 0; + size_t next = 0; while (iters.size()) { - if (next >= (int)iters.size()) { + if (next >= iters.size()) { next = 0; } if (!iters[next]->Valid()) { @@ -364,15 +364,16 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { continue; } std::string key = iters[next]->key().ToString(); - int type_idx = *(int *)key.c_str(); - int temp_idx = *(int *)(key.c_str() + sizeof(int)); + int type_idx = *(int *)key.c_str(); // NOLINT + int temp_idx = *(int *)(key.c_str() + sizeof(int)); // NOLINT if (type_idx != 0 || temp_idx != idx) { iters[next]->Next(); next++; continue; } std::string value = iters[next]->value().ToString(); - std::uint64_t i_key = *(uint64_t *)(key.c_str() + sizeof(int) * 2); + std::uint64_t i_key = + *(uint64_t *)(key.c_str() + sizeof(int) * 2); // NOLINT for (int i = 0; i < part_len; i++) { if (memory_remaining[i] < (int64_t)value.size()) { score[i] = -100000.0; @@ -380,8 +381,8 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { score[i] = 0; } } - for (size_t j = 0; j < (int)value.size(); j += sizeof(uint64_t)) { - uint64_t v = *((uint64_t *)(value.c_str() + j)); + for (size_t j = 0; j < value.size(); j += sizeof(uint64_t)) { + uint64_t v = *((uint64_t *)(value.c_str() + j)); // NOLINT int index = -1; if (id_map.find(v) != id_map.end()) { index = id_map[v]; @@ -398,9 +399,9 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) { int index = 0; for (int i = 0; i < part_len; i++) { base = gb_size_by_discount - memory_remaining[i] + value.size(); - if (has_weight) + if (has_weight) { weight_base = weight_cost[i] + w * weight_param; - else { + } else { weight_base = 0; } score[i] -= a * y * std::pow(1.0 * base, y - 1) + weight_base; @@ -434,7 +435,7 @@ void GraphTable::export_partition_files(int idx, std::string file_path) { int part_len = partitions[idx].size(); if (part_len == 0) return; if (file_path == "") file_path = "."; - if (file_path[(int)file_path.size() - 1] != '/') { + if (file_path[file_path.size() - 1] != '/') { file_path += "/"; } std::vector> tasks; @@ -459,7 +460,7 @@ void GraphTable::export_partition_files(int idx, std::string file_path) { })); } - for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); + for (size_t i = 0; i < tasks.size(); i++) tasks[i].get(); } void GraphTable::clear_graph(int idx) { for (auto p : edge_shards[idx]) { @@ -472,7 +473,7 @@ void GraphTable::clear_graph(int idx) { } } int32_t GraphTable::load_next_partition(int idx) { - if (next_partition >= (int)partitions[idx].size()) { + if (next_partition >= static_cast(partitions[idx].size())) { VLOG(0) << "partition iteration is done"; return -1; } @@ -518,8 +519,8 @@ int32_t GraphTable::load_edges_to_ssd(const std::string &path, add_node_to_ssd(0, idx, src_id, - (char *)dist_data.data(), - (int)(dist_data.size() * sizeof(uint64_t))); + (char *)dist_data.data(), // NOLINT + static_cast(dist_data.size() * sizeof(uint64_t))); } } VLOG(0) << "total memory cost = " << total_memory_cost << " bytes"; @@ -537,14 +538,14 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) { std::vector &v = shards[i]->get_bucket(); for (size_t j = 0; j < v.size(); j++) { std::vector s; - for (size_t k = 0; k < (int)v[j]->get_neighbor_size(); k++) { + for (size_t k = 0; k < v[j]->get_neighbor_size(); k++) { s.push_back(v[j]->get_neighbor_id(k)); } cost += v[j]->get_neighbor_size() * sizeof(uint64_t); add_node_to_ssd(0, idx, v[j]->get_id(), - (char *)s.data(), + (char *)s.data(), // NOLINT s.size() * sizeof(uint64_t)); } return cost; @@ -901,7 +902,8 @@ void BasicBfsGraphSampler::init(size_t gpu_num, GraphTable *graph_table, std::vector GraphShard::get_batch(int start, int end, int step) { if (start < 0) start = 0; std::vector res; - for (int pos = start; pos < std::min(end, (int)bucket.size()); pos += step) { + for (int pos = start; pos < std::min(end, (int)bucket.size()); // NOLINT + pos += step) { res.push_back(bucket[pos]); } return res; @@ -990,7 +992,7 @@ void GraphShard::delete_node(uint64_t id) { if (iter == node_location.end()) return; int pos = iter->second; delete bucket[pos]; - if (pos != (int)bucket.size() - 1) { + if (pos != static_cast(bucket.size()) - 1) { bucket[pos] = bucket.back(); node_location[bucket.back()->get_id()] = pos; } @@ -1002,7 +1004,7 @@ GraphNode *GraphShard::add_graph_node(uint64_t id) { node_location[id] = bucket.size(); bucket.push_back(new GraphNode(id)); } - return (GraphNode *)bucket[node_location[id]]; + return (GraphNode *)bucket[node_location[id]]; // NOLINT } GraphNode *GraphShard::add_graph_node(Node *node) { @@ -1011,17 +1013,17 @@ GraphNode *GraphShard::add_graph_node(Node *node) { node_location[id] = bucket.size(); bucket.push_back(node); } - return (GraphNode *)bucket[node_location[id]]; + return (GraphNode *)bucket[node_location[id]]; // NOLINT } FeatureNode *GraphShard::add_feature_node(uint64_t id, bool is_overlap) { if (node_location.find(id) == node_location.end()) { node_location[id] = bucket.size(); bucket.push_back(new FeatureNode(id)); - return (FeatureNode *)bucket[node_location[id]]; + return (FeatureNode *)bucket[node_location[id]]; // NOLINT } if (is_overlap) { - return (FeatureNode *)bucket[node_location[id]]; + return (FeatureNode *)bucket[node_location[id]]; // NOLINT } return NULL; @@ -1037,14 +1039,14 @@ Node *GraphShard::find_node(uint64_t id) { } GraphTable::~GraphTable() { - for (int i = 0; i < (int)edge_shards.size(); i++) { + for (size_t i = 0; i < edge_shards.size(); i++) { for (auto p : edge_shards[i]) { delete p; } edge_shards[i].clear(); } - for (int i = 0; i < (int)feature_shards.size(); i++) { + for (size_t i = 0; i < feature_shards.size(); i++) { for (auto p : feature_shards[i]) { delete p; } @@ -1070,7 +1072,7 @@ int32_t GraphTable::Load(const std::string &path, const std::string ¶m) { std::string GraphTable::get_inverse_etype(std::string &etype) { auto etype_split = paddle::string::split_string(etype, "2"); std::string res; - if ((int)etype_split.size() == 3) { + if (etype_split.size() == 3) { res = etype_split[2] + "2" + etype_split[1] + "2" + etype_split[0]; } else { res = etype_split[1] + "2" + etype_split[0]; @@ -1099,7 +1101,8 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype, std::string etype_path = epath + "/" + etypes[i]; auto etype_path_list = paddle::framework::localfs_list(etype_path); std::string etype_path_str; - if (part_num > 0 && part_num < (int)etype_path_list.size()) { + if (part_num > 0 && + part_num < (int)etype_path_list.size()) { // NOLINT std::vector sub_etype_path_list( etype_path_list.begin(), etype_path_list.begin() + part_num); etype_path_str = @@ -1116,7 +1119,7 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype, } else { auto npath_list = paddle::framework::localfs_list(npath); std::string npath_str; - if (part_num > 0 && part_num < (int)npath_list.size()) { + if (part_num > 0 && part_num < (int)npath_list.size()) { // NOLINT std::vector sub_npath_list( npath_list.begin(), npath_list.begin() + part_num); npath_str = paddle::string::join_strings(sub_npath_list, delim); @@ -1140,7 +1143,7 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype, return 0; })); } - for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get(); + for (size_t i = 0; i < tasks.size(); i++) tasks[i].get(); return 0; } @@ -1154,13 +1157,14 @@ int32_t GraphTable::get_nodes_ids_by_ranges( res.clear(); auto &shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx]; std::vector> tasks; - for (size_t i = 0; i < shards.size() && index < (int)ranges.size(); i++) { + for (size_t i = 0; i < shards.size() && index < (int)ranges.size(); // NOLINT + i++) { end = total_size + shards[i]->get_size(); start = total_size; - while (start < end && index < (int)ranges.size()) { - if (ranges[index].second <= start) + while (start < end && index < static_cast(ranges.size())) { + if (ranges[index].second <= start) { index++; - else if (ranges[index].first >= end) { + } else if (ranges[index].first >= end) { break; } else { int first = std::max(ranges[index].first, start); @@ -1178,7 +1182,8 @@ int32_t GraphTable::get_nodes_ids_by_ranges( res.reserve(res.size() + num); for (auto &id : keys) { res.push_back(id); - std::swap(res[rand() % res.size()], res[(int)res.size() - 1]); + std::swap(res[rand() % res.size()], + res[(int)res.size() - 1]); // NOLINT } mutex.unlock(); @@ -1291,7 +1296,7 @@ std::pair GraphTable::parse_node_file( return {local_count, local_valid_count}; } -// TODO opt load all node_types in once reading +// // TODO(danleifeng): opt load all node_types in once reading int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) { auto paths = paddle::string::split_string(path, ";"); uint64_t count = 0; @@ -1308,7 +1313,7 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) { return parse_node_file(paths[i]); })); } - for (int i = 0; i < (int)tasks.size(); i++) { + for (size_t i = 0; i < tasks.size(); i++) { auto res = tasks[i].get(); count += res.first; valid_count += res.second; @@ -1434,13 +1439,13 @@ int32_t GraphTable::load_edges(const std::string &path, VLOG(0) << "Begin GraphTable::load_edges() edge_type[" << edge_type << "]"; if (FLAGS_graph_load_in_parallel) { std::vector>> tasks; - for (int i = 0; i < paths.size(); i++) { + for (size_t i = 0; i < paths.size(); i++) { tasks.push_back(load_node_edge_task_pool->enqueue( [&, i, idx, this]() -> std::pair { return parse_edge_file(paths[i], idx, reverse_edge); })); } - for (int j = 0; j < (int)tasks.size(); j++) { + for (size_t j = 0; j < tasks.size(); j++) { auto res = tasks[j].get(); count += res.first; valid_count += res.second; @@ -1543,7 +1548,7 @@ int32_t GraphTable::random_sample_nodes(int type_id, int &actual_size) { int total_size = 0; auto &shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx]; - for (int i = 0; i < (int)shards.size(); i++) { + for (size_t i = 0; i < shards.size(); i++) { total_size += shards[i]->get_size(); } if (sample_size > total_size) sample_size = total_size; @@ -1554,9 +1559,11 @@ int32_t GraphTable::random_sample_nodes(int type_id, int remain = sample_size, last_pos = -1, num; std::set separator_set; for (int i = 0; i < range_num - 1; i++) { - while (separator_set.find(num = rand() % (sample_size - 1)) != - separator_set.end()) - ; + unsigned int seed = time(0); + while (separator_set.find(num = rand_r(&seed) % (sample_size - 1)) != + separator_set.end()) { + continue; + } separator_set.insert(num); } for (auto p : separator_set) { @@ -1567,8 +1574,11 @@ int32_t GraphTable::random_sample_nodes(int type_id, remain = total_size - sample_size + range_num; separator_set.clear(); for (int i = 0; i < range_num; i++) { - while (separator_set.find(num = rand() % remain) != separator_set.end()) - ; + unsigned int seed = time(0); + while (separator_set.find(num = rand_r(&seed) % remain) != + separator_set.end()) { + continue; + } separator_set.insert(num); } int used = 0, index = 0; @@ -1580,12 +1590,13 @@ int32_t GraphTable::random_sample_nodes(int type_id, used += ranges_len[index++]; } std::vector> first_half, second_half; - int start_index = rand() % total_size; + unsigned int seed = time(0); + int start_index = rand_r(&seed) % total_size; for (size_t i = 0; i < ranges_len.size() && i < ranges_pos.size(); i++) { - if (ranges_pos[i] + ranges_len[i] - 1 + start_index < total_size) + if (ranges_pos[i] + ranges_len[i] - 1 + start_index < total_size) { first_half.push_back({ranges_pos[i] + start_index, ranges_pos[i] + ranges_len[i] + start_index}); - else if (ranges_pos[i] + start_index >= total_size) { + } else if (ranges_pos[i] + start_index >= total_size) { second_half.push_back( {ranges_pos[i] + start_index - total_size, ranges_pos[i] + ranges_len[i] + start_index - total_size}); @@ -1623,7 +1634,7 @@ int32_t GraphTable::random_sample_neighbors( id_list[index].emplace_back(idx, node_ids[idy], sample_size, need_weight); } - for (int i = 0; i < (int)seq_id.size(); i++) { + for (size_t i = 0; i < seq_id.size(); i++) { if (seq_id[i].size() == 0) continue; tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int { uint64_t node_id; @@ -1633,12 +1644,12 @@ int32_t GraphTable::random_sample_neighbors( response = scaled_lru->query(i, id_list[i].data(), id_list[i].size(), r); } - int index = 0; + size_t index = 0; std::vector sample_res; std::vector sample_keys; auto &rng = _shards_task_rng_pool[i]; for (size_t k = 0; k < id_list[i].size(); k++) { - if (index < (int)r.size() && + if (index < r.size() && r[index].first.node_key == id_list[i][k].node_key) { int idy = seq_id[i][k]; actual_sizes[idy] = r[index].second.actual_size; @@ -1722,7 +1733,7 @@ int32_t GraphTable::get_node_feat(int idx, if (node == nullptr) { return 0; } - for (int feat_idx = 0; feat_idx < (int)feature_names.size(); + for (size_t feat_idx = 0; feat_idx < feature_names.size(); ++feat_idx) { const std::string &feature_name = feature_names[feat_idx]; if (feat_id_map[idx].find(feature_name) != feat_id_map[idx].end()) { @@ -1755,7 +1766,7 @@ int32_t GraphTable::set_node_feat( size_t index = node_id % this->shard_num - this->shard_start; auto node = feature_shards[idx][index]->add_feature_node(node_id); node->set_feature_size(this->feat_name[idx].size()); - for (int feat_idx = 0; feat_idx < (int)feature_names.size(); + for (size_t feat_idx = 0; feat_idx < feature_names.size(); ++feat_idx) { const std::string &feature_name = feature_names[feat_idx]; if (feat_id_map[idx].find(feature_name) != feat_id_map[idx].end()) { @@ -1893,8 +1904,8 @@ int GraphTable::get_all_id(int type_id, MergeShardVector shard_merge(output, slice_num); auto &search_shards = type_id == 0 ? edge_shards : feature_shards; std::vector> tasks; - for (int idx = 0; idx < search_shards.size(); idx++) { - for (int j = 0; j < search_shards[idx].size(); j++) { + for (size_t idx = 0; idx < search_shards.size(); idx++) { + for (size_t j = 0; j < search_shards[idx].size(); j++) { tasks.push_back(_shards_task_pool[j % task_pool_size_]->enqueue( [&search_shards, idx, j, slice_num, &shard_merge]() -> size_t { std::vector> shard_keys; @@ -1917,8 +1928,8 @@ int GraphTable::get_all_neighbor_id( MergeShardVector shard_merge(output, slice_num); auto &search_shards = type_id == 0 ? edge_shards : feature_shards; std::vector> tasks; - for (int idx = 0; idx < search_shards.size(); idx++) { - for (int j = 0; j < search_shards[idx].size(); j++) { + for (size_t idx = 0; idx < search_shards.size(); idx++) { + for (size_t j = 0; j < search_shards[idx].size(); j++) { tasks.push_back(_shards_task_pool[j % task_pool_size_]->enqueue( [&search_shards, idx, j, slice_num, &shard_merge]() -> size_t { std::vector> shard_keys; @@ -1970,7 +1981,7 @@ int GraphTable::get_all_neighbor_id( auto &search_shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx]; std::vector> tasks; VLOG(3) << "begin task, task_pool_size_[" << task_pool_size_ << "]"; - for (int i = 0; i < search_shards.size(); i++) { + for (size_t i = 0; i < search_shards.size(); i++) { tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue( [&search_shards, i, slice_num, &shard_merge]() -> size_t { std::vector> shard_keys; @@ -1996,7 +2007,7 @@ int GraphTable::get_all_feature_ids( MergeShardVector shard_merge(output, slice_num); auto &search_shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx]; std::vector> tasks; - for (int i = 0; i < search_shards.size(); i++) { + for (size_t i = 0; i < search_shards.size(); i++) { tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue( [&search_shards, i, slice_num, &shard_merge]() -> size_t { std::vector> shard_keys; @@ -2139,7 +2150,8 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) { if (use_cache) { cache_size_limit = graph.cache_size_limit(); cache_ttl = graph.cache_ttl(); - make_neighbor_sample_cache((size_t)cache_size_limit, (size_t)cache_ttl); + make_neighbor_sample_cache((size_t)cache_size_limit, // NOLINT + (size_t)cache_ttl); // NOLINT } _shards_task_pool.resize(task_pool_size_); for (size_t i = 0; i < _shards_task_pool.size(); ++i) { @@ -2205,14 +2217,14 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) { #ifdef PADDLE_WITH_HETERPS partitions.resize(id_to_edge.size()); #endif - for (int k = 0; k < (int)edge_shards.size(); k++) { + for (size_t k = 0; k < edge_shards.size(); k++) { for (size_t i = 0; i < shard_num_per_server; i++) { edge_shards[k].push_back(new GraphShard()); } } node_weight[1].resize(id_to_feature.size()); feature_shards.resize(id_to_feature.size()); - for (int k = 0; k < (int)feature_shards.size(); k++) { + for (size_t k = 0; k < feature_shards.size(); k++) { for (size_t i = 0; i < shard_num_per_server; i++) { feature_shards[k].push_back(new GraphShard()); } diff --git a/paddle/fluid/distributed/ps/table/memory_dense_table.cc b/paddle/fluid/distributed/ps/table/memory_dense_table.cc index 9bad2113d170fcb3ccf156145461eec3dccd2c90..31978b4f584a381d63a95844a927186f0e36986b 100644 --- a/paddle/fluid/distributed/ps/table/memory_dense_table.cc +++ b/paddle/fluid/distributed/ps/table/memory_dense_table.cc @@ -21,8 +21,8 @@ namespace distributed { int FLAGS_pslib_table_save_max_retry_dense = 3; -void MemoryDenseTable::CreateInitializer(const std::string& attr, - const std::string& name) { +void MemoryDenseTable::CreateInitializer(const std::string &attr, + const std::string &name) { auto slices = string::split_string(attr, "&"); if (slices[0] == "gaussian_random") { @@ -60,14 +60,14 @@ int32_t MemoryDenseTable::InitializeValue() { values_.resize(size); total_dim_ = 0; for (int x = 0; x < size; ++x) { - auto& varname = common.params()[x]; - auto& dim = common.dims()[x]; + auto &varname = common.params()[x]; + auto &dim = common.dims()[x]; if (varname == "Param") { param_dim_ = dim; param_idx_ = x; } - auto& initializer = common.initializers()[x]; + auto &initializer = common.initializers()[x]; total_dim_ += dim; CreateInitializer(initializer, varname); @@ -81,7 +81,7 @@ int32_t MemoryDenseTable::InitializeValue() { fixed_len_params_dim_ = 0; for (int x = 0; x < size; ++x) { - auto& dim = common.dims()[x]; + auto &dim = common.dims()[x]; if (static_cast(dim) != param_dim_) { fixed_len_params_dim_ += dim; } else { @@ -124,19 +124,19 @@ int32_t MemoryDenseTable::InitializeOptimizer() { return 0; } -int32_t MemoryDenseTable::SetGlobalLR(float* lr) { +int32_t MemoryDenseTable::SetGlobalLR(float *lr) { _global_lr = lr; optimizer_->SetGlobalLR(_global_lr); return 0; } -int32_t MemoryDenseTable::Pull(TableContext& context) { +int32_t MemoryDenseTable::Pull(TableContext &context) { CHECK(context.value_type == Dense); - float* pull_values = context.pull_context.values; + float *pull_values = context.pull_context.values; return PullDense(pull_values, context.num); } -int32_t MemoryDenseTable::Push(TableContext& context) { +int32_t MemoryDenseTable::Push(TableContext &context) { CHECK(context.value_type == Dense); if (context.push_context.values != nullptr) { if (!context.push_context.is_param) { @@ -148,13 +148,13 @@ int32_t MemoryDenseTable::Push(TableContext& context) { return 0; } -int32_t MemoryDenseTable::PullDense(float* pull_values, size_t num) { +int32_t MemoryDenseTable::PullDense(float *pull_values, size_t num) { std::copy( values_[param_idx_].begin(), values_[param_idx_].end(), pull_values); return 0; } -int32_t MemoryDenseTable::PushDenseParam(const float* values, size_t num) { +int32_t MemoryDenseTable::PushDenseParam(const float *values, size_t num) { PADDLE_ENFORCE_GE( num, param_dim_, @@ -171,7 +171,7 @@ int32_t MemoryDenseTable::Pour() { return 0; } -int32_t MemoryDenseTable::PushDense(const float* values, size_t num) { +int32_t MemoryDenseTable::PushDense(const float *values, size_t num) { if (sync) { std::future task = _shards_task_pool[0]->enqueue([this, &values]() -> int { @@ -185,7 +185,7 @@ int32_t MemoryDenseTable::PushDense(const float* values, size_t num) { return 0; } -int32_t MemoryDenseTable::_PushDense(const float* values, size_t num) { +int32_t MemoryDenseTable::_PushDense(const float *values, size_t num) { PADDLE_ENFORCE_GE( num, param_dim_, @@ -212,8 +212,8 @@ int32_t MemoryDenseTable::_PushDense(const float* values, size_t num) { return 0; } -int32_t MemoryDenseTable::Load(const std::string& path, - const std::string& param) { +int32_t MemoryDenseTable::Load(const std::string &path, + const std::string ¶m) { if (param_dim_ <= 0) { return 0; } @@ -249,7 +249,7 @@ int32_t MemoryDenseTable::Load(const std::string& path, try { int dim_idx = 0; float data_buffer[5]; - float* data_buff_ptr = data_buffer; + float *data_buff_ptr = data_buffer; std::string line_data; auto common = _config.common(); @@ -319,8 +319,8 @@ int32_t MemoryDenseTable::Load(const std::string& path, return 0; } -int32_t MemoryDenseTable::Save(const std::string& path, - const std::string& param) { +int32_t MemoryDenseTable::Save(const std::string &path, + const std::string ¶m) { int save_param = atoi(param.c_str()); uint32_t feasign_size; VLOG(0) << "MemoryDenseTable::save path " << path; @@ -353,7 +353,7 @@ int32_t MemoryDenseTable::Save(const std::string& path, os.clear(); os.str(""); os << values_[param_col_ids_[0]][y] << " 0"; - for (int x = 2; x < param_col_ids_.size(); ++x) { + for (size_t x = 2; x < param_col_ids_.size(); ++x) { os << " "; os << values_[param_col_ids_[x]][y]; } @@ -365,7 +365,7 @@ int32_t MemoryDenseTable::Save(const std::string& path, os.clear(); os.str(""); os << values_[param_col_ids_[0]][y]; - for (int x = 1; x < param_col_ids_.size(); ++x) { + for (size_t x = 1; x < param_col_ids_.size(); ++x) { os << " "; os << values_[param_col_ids_[x]][y]; } @@ -383,7 +383,7 @@ int32_t MemoryDenseTable::Save(const std::string& path, auto write_channel = _afs_client.open_w(channel_config, 1024 * 1024 * 40, &err_no); - for (auto& t : result_buffer_param) { + for (auto &t : result_buffer_param) { if (0 != write_channel->write_line(t)) { ++retry_num; is_write_failed = true; diff --git a/paddle/fluid/distributed/ps/table/memory_sparse_table.cc b/paddle/fluid/distributed/ps/table/memory_sparse_table.cc index da78849d40cde067c6f271fa5f8348f8356c4ab0..f33112353963ab21b205756a7f01bc1775ddd567 100644 --- a/paddle/fluid/distributed/ps/table/memory_sparse_table.cc +++ b/paddle/fluid/distributed/ps/table/memory_sparse_table.cc @@ -41,12 +41,12 @@ namespace paddle { namespace distributed { int32_t MemorySparseTable::Initialize() { - auto& profiler = CostProfiler::instance(); + auto &profiler = CostProfiler::instance(); profiler.register_profiler("pserver_sparse_update_all"); profiler.register_profiler("pserver_sparse_select_all"); InitializeValue(); _shards_task_pool.resize(_task_pool_size); - for (int i = 0; i < _shards_task_pool.size(); ++i) { + for (size_t i = 0; i < _shards_task_pool.size(); ++i) { _shards_task_pool[i].reset(new ::ThreadPool(1)); } VLOG(0) << "initalize MemorySparseTable succ"; @@ -102,8 +102,8 @@ int32_t MemorySparseTable::InitializeValue() { return 0; } -int32_t MemorySparseTable::Load(const std::string& path, - const std::string& param) { +int32_t MemorySparseTable::Load(const std::string &path, + const std::string ¶m) { std::string table_path = TableDir(path); auto file_list = _afs_client.list(table_path); @@ -157,13 +157,13 @@ int32_t MemorySparseTable::Load(const std::string& path, err_no = 0; std::string line_data; auto read_channel = _afs_client.open_r(channel_config, 0, &err_no); - char* end = NULL; - auto& shard = _local_shards[i]; + char *end = NULL; + auto &shard = _local_shards[i]; try { while (read_channel->read_line(line_data) == 0 && line_data.size() > 1) { uint64_t key = std::strtoul(line_data.data(), &end, 10); - auto& value = shard[key]; + auto &value = shard[key]; value.resize(feature_value_size); int parse_size = _value_accesor->ParseFromString(++end, value.data()); value.resize(parse_size); @@ -200,7 +200,7 @@ int32_t MemorySparseTable::Load(const std::string& path, return 0; } -int32_t MemorySparseTable::LoadPatch(const std::vector& file_list, +int32_t MemorySparseTable::LoadPatch(const std::vector &file_list, int load_param) { if (!_config.enable_revert()) { LOG(INFO) << "MemorySparseTable should be enabled revert."; @@ -213,7 +213,7 @@ int32_t MemorySparseTable::LoadPatch(const std::vector& file_list, int o_start_idx = _shard_idx * _avg_local_shard_num; int o_end_idx = o_start_idx + _real_local_shard_num; - if (start_idx >= file_list.size()) { + if (start_idx >= static_cast(file_list.size())) { return 0; } size_t feature_value_size = @@ -224,7 +224,7 @@ int32_t MemorySparseTable::LoadPatch(const std::vector& file_list, omp_set_num_threads(thread_num); #pragma omp parallel for schedule(dynamic) - for (size_t i = start_idx; i < end_idx; ++i) { + for (int i = start_idx; i < end_idx; ++i) { FsChannelConfig channel_config; channel_config.path = file_list[i]; channel_config.converter = _value_accesor->Converter(load_param).converter; @@ -239,11 +239,11 @@ int32_t MemorySparseTable::LoadPatch(const std::vector& file_list, err_no = 0; std::string line_data; auto read_channel = _afs_client.open_r(channel_config, 0, &err_no); - char* end = NULL; + char *end = NULL; int m_local_shard_id = i % _m_avg_local_shard_num; std::unordered_set global_shard_idx; std::string global_shard_idx_str; - for (size_t j = o_start_idx; j < o_end_idx; ++j) { + for (int j = o_start_idx; j < o_end_idx; ++j) { if ((j % _avg_local_shard_num) % _m_real_local_shard_num == m_local_shard_id) { global_shard_idx.insert(j); @@ -267,9 +267,9 @@ int32_t MemorySparseTable::LoadPatch(const std::vector& file_list, continue; } size_t local_shard_idx = *index_iter % _avg_local_shard_num; - auto& shard = _local_shards[local_shard_idx]; + auto &shard = _local_shards[local_shard_idx]; - auto& value = shard[key]; + auto &value = shard[key]; value.resize(feature_value_size); int parse_size = _value_accesor->ParseFromString(++end, value.data()); value.resize(parse_size); @@ -300,7 +300,7 @@ int32_t MemorySparseTable::LoadPatch(const std::vector& file_list, } void MemorySparseTable::Revert() { - for (size_t i = 0; i < _real_local_shard_num; ++i) { + for (int i = 0; i < _real_local_shard_num; ++i) { _local_shards_new[i].clear(); } } @@ -309,8 +309,8 @@ void MemorySparseTable::CheckSavePrePatchDone() { _save_patch_model_thread.join(); } -int32_t MemorySparseTable::Save(const std::string& dirname, - const std::string& param) { +int32_t MemorySparseTable::Save(const std::string &dirname, + const std::string ¶m) { if (_real_local_shard_num == 0) { _local_show_threshold = -1; return 0; @@ -368,7 +368,7 @@ int32_t MemorySparseTable::Save(const std::string& dirname, int feasign_size = 0; int retry_num = 0; int err_no = 0; - auto& shard = _local_shards[i]; + auto &shard = _local_shards[i]; do { err_no = 0; feasign_size = 0; @@ -426,7 +426,7 @@ int32_t MemorySparseTable::Save(const std::string& dirname, return 0; } -int32_t MemorySparseTable::SavePatch(const std::string& path, int save_param) { +int32_t MemorySparseTable::SavePatch(const std::string &path, int save_param) { if (!_config.enable_revert()) { LOG(INFO) << "MemorySparseTable should be enabled revert."; return 0; @@ -441,7 +441,7 @@ int32_t MemorySparseTable::SavePatch(const std::string& path, int save_param) { omp_set_num_threads(thread_num); #pragma omp parallel for schedule(dynamic) - for (size_t i = 0; i < _m_real_local_shard_num; ++i) { + for (int i = 0; i < _m_real_local_shard_num; ++i) { FsChannelConfig channel_config; channel_config.path = paddle::string::format_string("%s/part-%03d-%05d", table_path.c_str(), @@ -463,9 +463,9 @@ int32_t MemorySparseTable::SavePatch(const std::string& path, int save_param) { auto write_channel = _afs_client.open_w(channel_config, 1024 * 1024 * 40, &err_no); - for (size_t j = 0; j < _real_local_shard_num; ++j) { + for (int j = 0; j < _real_local_shard_num; ++j) { if (j % _m_real_local_shard_num == i) { - auto& shard = _local_shards_patch_model[j]; + auto &shard = _local_shards_patch_model[j]; for (auto it = shard.begin(); it != shard.end(); ++it) { if (_value_accesor->Save(it.value().data(), save_param)) { std::string format_value = _value_accesor->ParseToString( @@ -515,14 +515,14 @@ int32_t MemorySparseTable::SavePatch(const std::string& path, int save_param) { } int64_t MemorySparseTable::CacheShuffle( - const std::string& path, - const std::string& param, + const std::string &path, + const std::string ¶m, double cache_threshold, std::function( - int msg_type, int to_pserver_id, std::string& msg)> send_msg_func, - paddle::framework::Channel>& - shuffled_channel, - const std::vector& table_ptrs) { + int msg_type, int to_pserver_id, std::string &msg)> send_msg_func, + paddle::framework::Channel> + &shuffled_channel, + const std::vector &table_ptrs) { LOG(INFO) << "cache shuffle with cache threshold: " << cache_threshold; int save_param = atoi(param.c_str()); // batch_model:0 xbox:1 if (!_config.enable_sparse_table_cache() || cache_threshold < 0) { @@ -546,22 +546,22 @@ int64_t MemorySparseTable::CacheShuffle( int feasign_size = 0; std::vector>> tmp_channels; - for (size_t i = 0; i < _real_local_shard_num; ++i) { + for (int i = 0; i < _real_local_shard_num; ++i) { tmp_channels.push_back( paddle::framework::MakeChannel>()); } omp_set_num_threads(thread_num); #pragma omp parallel for schedule(dynamic) - for (size_t i = 0; i < _real_local_shard_num; ++i) { - paddle::framework::ChannelWriter>& writer = + for (int i = 0; i < _real_local_shard_num; ++i) { + paddle::framework::ChannelWriter> &writer = writers[i]; writer.Reset(tmp_channels[i].get()); for (size_t idx = 0; idx < table_ptrs.size(); idx++) { - Table* table_ptr = table_ptrs[idx]; + Table *table_ptr = table_ptrs[idx]; auto value_accesor = table_ptr->ValueAccesor(); - shard_type* shard_ptr = static_cast(table_ptr->GetShard(i)); + shard_type *shard_ptr = static_cast(table_ptr->GetShard(i)); for (auto it = shard_ptr->begin(); it != shard_ptr->end(); ++it) { if (value_accesor->SaveCache( @@ -581,14 +581,14 @@ int64_t MemorySparseTable::CacheShuffle( // size: " << feasign_size << " and start sparse cache data shuffle real local // shard num: " << _real_local_shard_num; std::vector> local_datas; - for (size_t idx_shard = 0; idx_shard < _real_local_shard_num; ++idx_shard) { - paddle::framework::ChannelWriter>& writer = + for (int idx_shard = 0; idx_shard < _real_local_shard_num; ++idx_shard) { + paddle::framework::ChannelWriter> &writer = writers[idx_shard]; auto channel = writer.channel(); - std::vector>& data = datas[idx_shard]; + std::vector> &data = datas[idx_shard]; std::vector ars(shuffle_node_num); while (channel->Read(data)) { - for (auto& t : data) { + for (auto &t : data) { auto pserver_id = paddle::distributed::local_random_engine()() % shuffle_node_num; if (pserver_id != _shard_idx) { @@ -604,9 +604,9 @@ int64_t MemorySparseTable::CacheShuffle( send_index[i] = i; } std::random_shuffle(send_index.begin(), send_index.end()); - for (auto index = 0u; index < shuffle_node_num; ++index) { + for (int index = 0; index < shuffle_node_num; ++index) { int i = send_index[index]; - if (i == _shard_idx) { + if (i == static_cast(_shard_idx)) { continue; } if (ars[i].Length() == 0) { @@ -617,7 +617,7 @@ int64_t MemorySparseTable::CacheShuffle( total_status.push_back(std::move(ret)); send_data_size[i] += ars[i].Length(); } - for (auto& t : total_status) { + for (auto &t : total_status) { t.wait(); } ars.clear(); @@ -630,10 +630,10 @@ int64_t MemorySparseTable::CacheShuffle( } int32_t MemorySparseTable::SaveCache( - const std::string& path, - const std::string& param, - paddle::framework::Channel>& - shuffled_channel) { + const std::string &path, + const std::string ¶m, + paddle::framework::Channel> + &shuffled_channel) { if (_shard_idx >= _config.sparse_table_cache_file_num()) { return 0; } @@ -656,7 +656,7 @@ int32_t MemorySparseTable::SaveCache( bool is_write_failed = false; shuffled_channel->Close(); while (shuffled_channel->Read(data)) { - for (auto& t : data) { + for (auto &t : data) { ++feasign_size; if (0 != write_channel->write_line(paddle::string::format_string( "%lu %s", t.first, t.second.c_str()))) { @@ -695,7 +695,7 @@ int64_t MemorySparseTable::LocalMFSize() { tasks[shard_id] = _shards_task_pool[shard_id % _shards_task_pool.size()]->enqueue( [this, shard_id, &size_arr]() -> int { - auto& local_shard = _local_shards[shard_id]; + auto &local_shard = _local_shards[shard_id]; for (auto it = local_shard.begin(); it != local_shard.end(); ++it) { if (_value_accesor->HasMF(it.value().size())) { @@ -720,20 +720,20 @@ std::pair MemorySparseTable::PrintTableStat() { return {feasign_size, mf_size}; } -int32_t MemorySparseTable::Pull(TableContext& context) { +int32_t MemorySparseTable::Pull(TableContext &context) { CHECK(context.value_type == Sparse); if (context.use_ptr) { - char** pull_values = context.pull_context.ptr_values; - const uint64_t* keys = context.pull_context.keys; + char **pull_values = context.pull_context.ptr_values; + const uint64_t *keys = context.pull_context.keys; return PullSparsePtr(pull_values, keys, context.num); } else { - float* pull_values = context.pull_context.values; - const PullSparseValue& pull_value = context.pull_context.pull_value; + float *pull_values = context.pull_context.values; + const PullSparseValue &pull_value = context.pull_context.pull_value; return PullSparse(pull_values, pull_value); } } -int32_t MemorySparseTable::Push(TableContext& context) { +int32_t MemorySparseTable::Push(TableContext &context) { CHECK(context.value_type == Sparse); if (!context.use_ptr) { return PushSparse( @@ -745,8 +745,8 @@ int32_t MemorySparseTable::Push(TableContext& context) { } } -int32_t MemorySparseTable::PullSparse(float* pull_values, - const PullSparseValue& pull_value) { +int32_t MemorySparseTable::PullSparse(float *pull_values, + const PullSparseValue &pull_value) { CostTimer timer("pserver_sparse_select_all"); std::vector> tasks(_real_local_shard_num); @@ -776,11 +776,11 @@ int32_t MemorySparseTable::PullSparse(float* pull_values, pull_values, mf_value_size, select_value_size]() -> int { - auto& local_shard = _local_shards[shard_id]; + auto &local_shard = _local_shards[shard_id]; float data_buffer[value_size]; // NOLINT - float* data_buffer_ptr = data_buffer; + float *data_buffer_ptr = data_buffer; - auto& keys = task_keys[shard_id]; + auto &keys = task_keys[shard_id]; for (size_t i = 0; i < keys.size(); i++) { uint64_t key = keys[i].first; auto itr = local_shard.find(key); @@ -790,9 +790,9 @@ int32_t MemorySparseTable::PullSparse(float* pull_values, if (FLAGS_pserver_create_value_when_push) { memset(data_buffer, 0, sizeof(float) * data_size); } else { - auto& feature_value = local_shard[key]; + auto &feature_value = local_shard[key]; feature_value.resize(data_size); - float* data_ptr = feature_value.data(); + float *data_ptr = feature_value.data(); _value_accesor->Create(&data_buffer_ptr, 1); memcpy( data_ptr, data_buffer_ptr, data_size * sizeof(float)); @@ -807,9 +807,9 @@ int32_t MemorySparseTable::PullSparse(float* pull_values, data_buffer[mf_idx] = 0.0; } auto offset = keys[i].second; - float* select_data = pull_values + select_value_size * offset; + float *select_data = pull_values + select_value_size * offset; _value_accesor->Select( - &select_data, (const float**)&data_buffer_ptr, 1); + &select_data, (const float **)&data_buffer_ptr, 1); } return 0; @@ -822,8 +822,8 @@ int32_t MemorySparseTable::PullSparse(float* pull_values, return 0; } -int32_t MemorySparseTable::PullSparsePtr(char** pull_values, - const uint64_t* keys, +int32_t MemorySparseTable::PullSparsePtr(char **pull_values, + const uint64_t *keys, size_t num) { CostTimer timer("pscore_sparse_select_all"); size_t value_size = _value_accesor->GetAccessorInfo().size / sizeof(float); @@ -847,20 +847,20 @@ int32_t MemorySparseTable::PullSparsePtr(char** pull_values, pull_values, value_size, mf_value_size]() -> int { - auto& keys = task_keys[shard_id]; - auto& local_shard = _local_shards[shard_id]; + auto &keys = task_keys[shard_id]; + auto &local_shard = _local_shards[shard_id]; float data_buffer[value_size]; // NOLINT - float* data_buffer_ptr = data_buffer; + float *data_buffer_ptr = data_buffer; for (size_t i = 0; i < keys.size(); ++i) { uint64_t key = keys[i].first; auto itr = local_shard.find(key); size_t data_size = value_size - mf_value_size; - FixedFeatureValue* ret = NULL; + FixedFeatureValue *ret = NULL; if (itr == local_shard.end()) { // ++missed_keys; - auto& feature_value = local_shard[key]; + auto &feature_value = local_shard[key]; feature_value.resize(data_size); - float* data_ptr = feature_value.data(); + float *data_ptr = feature_value.data(); _value_accesor->Create(&data_buffer_ptr, 1); memcpy(data_ptr, data_buffer_ptr, data_size * sizeof(float)); ret = &feature_value; @@ -868,7 +868,7 @@ int32_t MemorySparseTable::PullSparsePtr(char** pull_values, ret = itr.value_ptr(); } int pull_data_idx = keys[i].second; - pull_values[pull_data_idx] = reinterpret_cast(ret); + pull_values[pull_data_idx] = reinterpret_cast(ret); } return 0; }); @@ -879,8 +879,8 @@ int32_t MemorySparseTable::PullSparsePtr(char** pull_values, return 0; } -int32_t MemorySparseTable::PushSparse(const uint64_t* keys, - const float* values, +int32_t MemorySparseTable::PushSparse(const uint64_t *keys, + const float *values, size_t num) { CostTimer timer("pserver_sparse_update_all"); std::vector> tasks(_real_local_shard_num); @@ -907,15 +907,15 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, update_value_col, values, &task_keys]() -> int { - auto& keys = task_keys[shard_id]; - auto& local_shard = _local_shards[shard_id]; - auto& local_shard_new = _local_shards_new[shard_id]; + auto &keys = task_keys[shard_id]; + auto &local_shard = _local_shards[shard_id]; + auto &local_shard_new = _local_shards_new[shard_id]; float data_buffer[value_col]; // NOLINT - float* data_buffer_ptr = data_buffer; + float *data_buffer_ptr = data_buffer; for (size_t i = 0; i < keys.size(); ++i) { uint64_t key = keys[i].first; uint64_t push_data_idx = keys[i].second; - const float* update_data = + const float *update_data = values + push_data_idx * update_value_col; auto itr = local_shard.find(key); if (itr == local_shard.end()) { @@ -924,7 +924,7 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, continue; } auto value_size = value_col - mf_value_col; - auto& feature_value = local_shard[key]; + auto &feature_value = local_shard[key]; feature_value.resize(value_size); _value_accesor->Create(&data_buffer_ptr, 1); memcpy(feature_value.data(), @@ -933,8 +933,8 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, itr = local_shard.find(key); } - auto& feature_value = itr.value(); - float* value_data = feature_value.data(); + auto &feature_value = itr.value(); + float *value_data = feature_value.data(); size_t value_size = feature_value.size(); if (value_size == value_col) { // 已拓展到最大size, 则就地update @@ -952,7 +952,7 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, memcpy(value_data, data_buffer_ptr, value_size * sizeof(float)); } if (_config.enable_revert()) { - FixedFeatureValue* feature_value_new = &(local_shard_new[key]); + FixedFeatureValue *feature_value_new = &(local_shard_new[key]); auto new_size = feature_value.size(); feature_value_new->resize(new_size); memcpy(feature_value_new->data(), @@ -970,8 +970,8 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, return 0; } -int32_t MemorySparseTable::PushSparse(const uint64_t* keys, - const float** values, +int32_t MemorySparseTable::PushSparse(const uint64_t *keys, + const float **values, size_t num) { std::vector> tasks(_real_local_shard_num); std::vector>> task_keys( @@ -996,14 +996,14 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, update_value_col, values, &task_keys]() -> int { - auto& keys = task_keys[shard_id]; - auto& local_shard = _local_shards[shard_id]; + auto &keys = task_keys[shard_id]; + auto &local_shard = _local_shards[shard_id]; float data_buffer[value_col]; // NOLINT - float* data_buffer_ptr = data_buffer; + float *data_buffer_ptr = data_buffer; for (size_t i = 0; i < keys.size(); ++i) { uint64_t key = keys[i].first; uint64_t push_data_idx = keys[i].second; - const float* update_data = values[push_data_idx]; + const float *update_data = values[push_data_idx]; auto itr = local_shard.find(key); if (itr == local_shard.end()) { if (FLAGS_pserver_enable_create_feasign_randomly && @@ -1011,7 +1011,7 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, continue; } auto value_size = value_col - mf_value_col; - auto& feature_value = local_shard[key]; + auto &feature_value = local_shard[key]; feature_value.resize(value_size); _value_accesor->Create(&data_buffer_ptr, 1); memcpy(feature_value.data(), @@ -1019,8 +1019,8 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, value_size * sizeof(float)); itr = local_shard.find(key); } - auto& feature_value = itr.value(); - float* value_data = feature_value.data(); + auto &feature_value = itr.value(); + float *value_data = feature_value.data(); size_t value_size = feature_value.size(); if (value_size == value_col) { // 已拓展到最大size, 则就地update _value_accesor->Update(&value_data, &update_data, 1); @@ -1048,12 +1048,12 @@ int32_t MemorySparseTable::PushSparse(const uint64_t* keys, int32_t MemorySparseTable::Flush() { return 0; } -int32_t MemorySparseTable::Shrink(const std::string& param) { +int32_t MemorySparseTable::Shrink(const std::string ¶m) { VLOG(0) << "MemorySparseTable::Shrink"; // TODO(zhaocaibei123): implement with multi-thread for (int shard_id = 0; shard_id < _real_local_shard_num; ++shard_id) { // Shrink - auto& shard = _local_shards[shard_id]; + auto &shard = _local_shards[shard_id]; for (auto it = shard.begin(); it != shard.end();) { if (_value_accesor->Shrink(it.value().data())) { it = shard.erase(it); diff --git a/paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc b/paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc index 014d6e450ab4ac6ba5cdc323d8ec0a05dad6815a..6ab4506d29e4c661bccfa33d87ad867bdf336418 100644 --- a/paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc +++ b/paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc @@ -23,7 +23,7 @@ DEFINE_bool(enable_show_scale_gradient, true, "enable show scale gradient"); namespace paddle { namespace distributed { -void SparseNaiveSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, +void SparseNaiveSGDRule::LoadConfig(const SparseCommonSGDRuleParameter ¶m, size_t emb_dim) { _embedding_dim = emb_dim; auto naive_param = param.naive(); @@ -41,9 +41,9 @@ void SparseNaiveSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, } } -void SparseNaiveSGDRule::UpdateValueWork(float* w, - float* sgd, - const float* push_value, +void SparseNaiveSGDRule::UpdateValueWork(float *w, + float *sgd, + const float *push_value, float scale) { for (size_t i = 0; i < _embedding_dim; ++i) { w[i] -= learning_rate_ * push_value[i]; @@ -51,8 +51,8 @@ void SparseNaiveSGDRule::UpdateValueWork(float* w, } } -void SparseNaiveSGDRule::InitValueWork(float* value, - float* sgd, +void SparseNaiveSGDRule::InitValueWork(float *value, + float *sgd, bool zero_init) { if (zero_init) { for (size_t i = 0; i < _embedding_dim; ++i) { @@ -68,7 +68,7 @@ void SparseNaiveSGDRule::InitValueWork(float* value, } } } -void SparseAdaGradSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, +void SparseAdaGradSGDRule::LoadConfig(const SparseCommonSGDRuleParameter ¶m, size_t emb_dim) { _embedding_dim = emb_dim; auto adagrad_param = param.adagrad(); @@ -88,11 +88,11 @@ void SparseAdaGradSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, } } -void SparseAdaGradSGDRule::UpdateValueWork(float* w, - float* sgd, - const float* grad, +void SparseAdaGradSGDRule::UpdateValueWork(float *w, + float *sgd, + const float *grad, float scale) { - float& g2sum = sgd[G2SumIndex()]; + float &g2sum = sgd[G2SumIndex()]; double add_g2sum = 0; for (size_t i = 0; i < _embedding_dim; i++) { @@ -106,8 +106,8 @@ void SparseAdaGradSGDRule::UpdateValueWork(float* w, g2sum += add_g2sum / _embedding_dim; } -void SparseAdaGradSGDRule::InitValueWork(float* value, - float* sgd, +void SparseAdaGradSGDRule::InitValueWork(float *value, + float *sgd, bool zero_init) { for (size_t i = 0; i < _embedding_dim; ++i) { if (zero_init) { @@ -125,7 +125,7 @@ void SparseAdaGradSGDRule::InitValueWork(float* value, sgd[G2SumIndex()] = 0; } -void StdAdaGradSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, +void StdAdaGradSGDRule::LoadConfig(const SparseCommonSGDRuleParameter ¶m, size_t emb_dim) { _embedding_dim = emb_dim; auto adagrad_param = param.adagrad(); @@ -145,12 +145,12 @@ void StdAdaGradSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, } } -void StdAdaGradSGDRule::UpdateValueWork(float* w, - float* sgd, - const float* grad, +void StdAdaGradSGDRule::UpdateValueWork(float *w, + float *sgd, + const float *grad, float scale) { for (size_t i = 0; i < _embedding_dim; i++) { - float& g2sum = sgd[G2SumIndex() + i]; + float &g2sum = sgd[G2SumIndex() + i]; double scaled_grad = grad[i] / scale; w[i] -= learning_rate_ * scaled_grad * sqrt(_initial_g2sum / (_initial_g2sum + g2sum)); @@ -159,8 +159,8 @@ void StdAdaGradSGDRule::UpdateValueWork(float* w, } } -void StdAdaGradSGDRule::InitValueWork(float* value, - float* sgd, +void StdAdaGradSGDRule::InitValueWork(float *value, + float *sgd, bool zero_init) { for (size_t i = 0; i < _embedding_dim; ++i) { if (zero_init) { @@ -178,7 +178,7 @@ void StdAdaGradSGDRule::InitValueWork(float* value, } } -void SparseAdamSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, +void SparseAdamSGDRule::LoadConfig(const SparseCommonSGDRuleParameter ¶m, size_t emb_dim) { _embedding_dim = emb_dim; auto adam_param = param.adam(); @@ -199,15 +199,15 @@ void SparseAdamSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param, } } -void SparseAdamSGDRule::UpdateValueWork(float* w, - float* sgd, - const float* grad, +void SparseAdamSGDRule::UpdateValueWork(float *w, + float *sgd, + const float *grad, float scale) { - float* gsum = sgd + GSumIndex(); - float* g2sum = sgd + G2SumIndex(); - float* beta1_pow = sgd + Beta1PowIndex(); - float* beta2_pow = sgd + Beta2PowIndex(); - const float* g = grad; + float *gsum = sgd + GSumIndex(); + float *g2sum = sgd + G2SumIndex(); + float *beta1_pow = sgd + Beta1PowIndex(); + float *beta2_pow = sgd + Beta2PowIndex(); + const float *g = grad; float lr = learning_rate_; float beta1_pow_ = *beta1_pow; @@ -227,8 +227,8 @@ void SparseAdamSGDRule::UpdateValueWork(float* w, (*beta2_pow) *= _beta2_decay_rate; } -void SparseAdamSGDRule::InitValueWork(float* value, - float* sgd, +void SparseAdamSGDRule::InitValueWork(float *value, + float *sgd, bool zero_init) { for (size_t i = 0; i < _embedding_dim; ++i) { if (zero_init) { @@ -253,7 +253,7 @@ void SparseAdamSGDRule::InitValueWork(float* value, } void SparseSharedAdamSGDRule::LoadConfig( - const SparseCommonSGDRuleParameter& param, size_t emb_dim) { + const SparseCommonSGDRuleParameter ¶m, size_t emb_dim) { _embedding_dim = emb_dim; auto adam_param = param.adam(); learning_rate_ = adam_param.learning_rate(); @@ -273,15 +273,15 @@ void SparseSharedAdamSGDRule::LoadConfig( } } -void SparseSharedAdamSGDRule::UpdateValueWork(float* w, - float* sgd, - const float* grad, +void SparseSharedAdamSGDRule::UpdateValueWork(float *w, + float *sgd, + const float *grad, float scale) { - float* gsum = sgd + GSumIndex(); - float* g2sum = sgd + G2SumIndex(); - float* beta1_pow = sgd + Beta1PowIndex(); - float* beta2_pow = sgd + Beta2PowIndex(); - const float* g = grad; + float *gsum = sgd + GSumIndex(); + float *g2sum = sgd + G2SumIndex(); + float *beta1_pow = sgd + Beta1PowIndex(); + float *beta2_pow = sgd + Beta2PowIndex(); + const float *g = grad; float lr = learning_rate_; float beta1_pow_ = *beta1_pow; @@ -292,7 +292,7 @@ void SparseSharedAdamSGDRule::UpdateValueWork(float* w, lr *= sqrt(1 - beta2_pow_) / (1 - beta1_pow_); double sum_gsum = 0.0; double sum_g2sum = 0.0; - for (int i = 0; i < _embedding_dim; i++) { + for (size_t i = 0; i < _embedding_dim; i++) { // Calculation double new_gsum = _beta1_decay_rate * gsum_ + (1 - _beta1_decay_rate) * g[i]; @@ -310,10 +310,10 @@ void SparseSharedAdamSGDRule::UpdateValueWork(float* w, (*beta2_pow) *= _beta2_decay_rate; } -void SparseSharedAdamSGDRule::InitValueWork(float* value, - float* sgd, +void SparseSharedAdamSGDRule::InitValueWork(float *value, + float *sgd, bool zero_init) { - for (int i = 0; i < _embedding_dim; ++i) { + for (size_t i = 0; i < _embedding_dim; ++i) { if (zero_init) { value[i] = 0.0; BoundValue(value[i]); @@ -327,7 +327,7 @@ void SparseSharedAdamSGDRule::InitValueWork(float* value, } } // init rule gsum and g2sum - for (int i = GSumIndex(); i < Beta1PowIndex(); i++) { + for (size_t i = GSumIndex(); i < Beta1PowIndex(); i++) { sgd[i] = 0.0; } // init beta1_pow and beta2_pow