diff --git a/paddle/fluid/framework/data_feed.cc b/paddle/fluid/framework/data_feed.cc index 939ca07d6fbf0561afebc3647314476862d104bc..0d66aeacbd22fe11d4d030366f8f8c952b0b903a 100644 --- a/paddle/fluid/framework/data_feed.cc +++ b/paddle/fluid/framework/data_feed.cc @@ -349,7 +349,7 @@ void InMemoryDataFeed::GlobalShuffle() { for (int64_t i = interval.first; i < interval.second; ++i) { // if get ins id, can also use hash // std::string ins_id = memory_data_[i].ins_id; - int64_t random_num = fleet_ptr->LocalRandomEngine()(); + int64_t random_num = rand_r(&rand_seed); int64_t node_id = random_num % trainer_num_; send_vec[node_id].push_back(&((*memory_data_)[i])); if (i % fleet_send_batch_size_ == 0 && i != 0) { diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index 2bc31e6c9b49deb2c01a84ce6491b31a37537b77..8ea09b65ddd569e8ca8e24ba3b2416666d0eec92 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -232,6 +232,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed { int thread_id_; int thread_num_; int trainer_num_; + uint32_t rand_seed; std::vector* memory_data_; std::mutex* mutex_for_update_memory_data_; // when read ins, we put ins from one channel to the other, diff --git a/paddle/fluid/framework/data_set.cc b/paddle/fluid/framework/data_set.cc index 774010e5e60647ab5fdf537a0e5c55f86efb8709..2fc30422b9309a5b07cf35051d6cccee3a3b667d 100644 --- a/paddle/fluid/framework/data_set.cc +++ b/paddle/fluid/framework/data_set.cc @@ -250,7 +250,7 @@ int DatasetImpl::ReceiveFromClient(int msg_type, int client_id, VLOG(3) << "ReceiveFromClient msg_type=" << msg_type << ", client_id=" << client_id << ", msg length=" << msg.length(); auto fleet_ptr = FleetWrapper::GetInstance(); - int64_t index = fleet_ptr->LocalRandomEngine()() % thread_num_; + int64_t index = rand_r(&rand_seed) % thread_num_; VLOG(3) << "ramdom index=" << index; readers_[index]->PutInsToChannel(msg); return 0; diff --git a/paddle/fluid/framework/data_set.h b/paddle/fluid/framework/data_set.h index e60ada1d5bd96768d9c0669a3f5e72fa73f32273..6fd3fcad28fa045326032200b7f26a18862454f4 100644 --- a/paddle/fluid/framework/data_set.h +++ b/paddle/fluid/framework/data_set.h @@ -136,6 +136,7 @@ class DatasetImpl : public Dataset { std::mutex mutex_for_pick_file_; std::string fs_name_; std::string fs_ugi_; + unsigned int rand_seed; }; // use std::vector as data type diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 59532562434fb56bd370f4c91189a1f97abb8309..6af8ba951895659120a40edc19b9218b515fb743 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -210,52 +210,20 @@ void FleetWrapper::PushDenseParamSync( const ProgramDesc& program, const uint64_t table_id, const std::vector& var_names) { #ifdef PADDLE_WITH_PSLIB - paddle::framework::Scope scope; - auto& block = program.Block(0); - for (auto& var : block.AllVars()) { - if (var->Persistable()) { - auto* ptr = scope.Var(var->Name()); - InitializeVariable(ptr, var->GetType()); - } else { - auto* ptr = scope.Var(var->Name()); - InitializeVariable(ptr, var->GetType()); - } - } auto place = platform::CPUPlace(); std::vector regions; for (auto& t : var_names) { Variable* var = scope.FindVar(t); - CHECK(var != nullptr) << "var[" << t << "] not found"; LoDTensor* tensor = var->GetMutable(); - std::vector dim; - for (auto& var : block.AllVars()) { - if (var->Name() == t) { - dim = var->GetShape(); - break; - } - } - int cnt = 1; - for (auto& i : dim) { - cnt *= i; - } - DDim d(std::vector{cnt}.data(), 1); - float* g = tensor->mutable_data(d, place); - CHECK(g != nullptr) << "var[" << t << "] value not initialized"; - float init_range = 0.2; - int rown = tensor->dims()[0]; - init_range /= sqrt(rown); - std::normal_distribution ndistr(0.0, 1.0); - for (auto i = 0u; i < tensor->numel(); ++i) { - g[i] = ndistr(LocalRandomEngine()) * init_range; - } + float* g = tensor->mutable_data(place); paddle::ps::Region reg(g, tensor->numel()); regions.emplace_back(std::move(reg)); - auto push_status = pslib_ptr_->_worker_ptr->push_dense_param( - regions.data(), regions.size(), table_id); - push_status.wait(); - auto status = push_status.get(); - CHECK(status == 0) << "push dense param failed, status[" << status << "]"; } + auto push_status = pslib_ptr_->_worker_ptr->push_dense_param( + regions.data(), regions.size(), table_id); + push_status.wait(); + auto status = push_status.get(); + CHECK(status == 0) << "push dense param failed, status[" << status << "]"; #endif } @@ -372,22 +340,6 @@ std::future FleetWrapper::SendClientToClientMsg( return std::future(); } -std::default_random_engine& FleetWrapper::LocalRandomEngine() { - struct engine_wrapper_t { - std::default_random_engine engine; - engine_wrapper_t() { - struct timespec tp; - clock_gettime(CLOCK_REALTIME, &tp); - double cur_time = tp.tv_sec + tp.tv_nsec * 1e-9; - static std::atomic x(0); - std::seed_seq sseq = {x++, x++, x++, (uint64_t)(cur_time * 1000)}; - engine.seed(sseq); - } - }; - thread_local engine_wrapper_t r; - return r.engine; -} - template void FleetWrapper::Serialize(const std::vector& t, std::string* str) { #ifdef PADDLE_WITH_PSLIB diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index 7a60686c24073395df833562781c7cc09b9d476c..40ed3c55113378976a1e0ed96d98aacdf70e79db 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -127,7 +127,6 @@ class FleetWrapper { std::future SendClientToClientMsg(int msg_type, int to_client_id, const std::string& msg); - std::default_random_engine& LocalRandomEngine(); template void Serialize(const std::vector& t, std::string* str); template diff --git a/paddle/fluid/string/string_helper.cc b/paddle/fluid/string/string_helper.cc index 1030eca36d6cc0c66285d0213ab634a903cdd9ec..27708b8eebd2131ebadcc310fd3521ad5ab824f3 100644 --- a/paddle/fluid/string/string_helper.cc +++ b/paddle/fluid/string/string_helper.cc @@ -79,7 +79,7 @@ inline int str_to_float(const char* str, float* v) { // A line buffer is maintained. It // doesn't need to know the maximum possible length of a line. char* LineFileReader::getdelim(FILE* f, char delim) { -#ifndef __WIN32 +#ifndef _WIN32 int32_t ret = ::getdelim(&_buffer, &_buf_size, delim, f); if (ret >= 0) {