From e8d5868e9720ee7b5bd8c9ff1cf2e6e7b7f1998a Mon Sep 17 00:00:00 2001 From: Thunderbrook <52529258+Thunderbrook@users.noreply.github.com> Date: Wed, 12 Aug 2020 11:54:17 +0800 Subject: [PATCH] cherry-pick fix dataset py3; fixlogger; add timeout of gloo; modify datanorm op (#26046) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix dataset py3 (#25012) * fix dataset py3 error * test=develop * fix logger (#24682) * fix logger of FetchHandler,which may print log twice * test=develop * add timeout and http store in communication (#23436) * add timeout and http store in communication, add revert and confirm in fleet * test=develop * modify datanorm op test=develop (#23030) Co-authored-by: xujiaqi01 <173596896@qq.com> Co-authored-by: yaoxuefeng --- paddle/fluid/framework/fleet/fleet_wrapper.cc | 22 ++ paddle/fluid/framework/fleet/fleet_wrapper.h | 4 + paddle/fluid/framework/fleet/gloo_wrapper.cc | 221 ++++++++++++---- paddle/fluid/framework/fleet/gloo_wrapper.h | 83 +++++- paddle/fluid/framework/fleet/test_fleet.cc | 19 +- paddle/fluid/framework/unused_var_check.cc | 4 +- paddle/fluid/operators/data_norm_op.cc | 247 +++++++++++++++++- paddle/fluid/pybind/fleet_wrapper_py.cc | 2 + paddle/fluid/pybind/gloo_wrapper_py.cc | 9 + paddle/fluid/string/string_helper.cc | 13 + paddle/fluid/string/string_helper.h | 3 + .../fluid/incubate/fleet/base/role_maker.py | 102 ++++++-- .../fleet/parameter_server/pslib/__init__.py | 18 ++ .../fluid/incubate/fleet/utils/http_server.py | 187 +++++++++++++ python/paddle/fluid/layers/nn.py | 48 +++- .../tests/unittests/test_data_norm_op.py | 214 +++++++++++++-- .../tests/unittests/test_fleet_nocvm_1.py | 2 + .../tests/unittests/test_fleet_rolemaker_2.py | 2 +- .../tests/unittests/test_fleet_rolemaker_3.py | 84 ++++++ .../tests/unittests/test_fleet_rolemaker_4.py | 196 ++++++++++++++ python/paddle/fluid/trainer_desc.py | 2 + python/paddle/fluid/trainer_factory.py | 6 +- 22 files changed, 1366 insertions(+), 122 deletions(-) create mode 100644 python/paddle/fluid/incubate/fleet/utils/http_server.py create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py create mode 100644 python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index 4d632d73761..135fc407943 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -1166,6 +1166,28 @@ int32_t FleetWrapper::CopyTable(const uint64_t src_table_id, #endif } +void FleetWrapper::Confirm() { +#ifdef PADDLE_WITH_PSLIB + // FIXME(xujiaqi01): will later support confirm + // auto ret = pslib_ptr_->_worker_ptr->confirm(); + // ret.wait(); + VLOG(0) << "disable FleetWrapper::Confirm temporarily"; +#else + VLOG(0) << "FleetWrapper::Confirm does nothing when no pslib"; +#endif +} + +void FleetWrapper::Revert() { +#ifdef PADDLE_WITH_PSLIB + // FIXME(xujiaqi01): will later support revert + // auto ret = pslib_ptr_->_worker_ptr->revert(); + // ret.wait(); + VLOG(0) << "disable FleetWrapper::Revert temporarily"; +#else + VLOG(0) << "FleetWrapper::Revert does nothing when no pslib"; +#endif +} + int32_t FleetWrapper::CopyTableByFeasign( const uint64_t src_table_id, const uint64_t dest_table_id, const std::vector& feasign_list) { diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index 933b0a8bd85..4c0564f87d4 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -268,6 +268,10 @@ class FleetWrapper { // send client to client message std::future SendClientToClientMsg(int msg_type, int to_client_id, const std::string& msg); + // confirm all the updated params in the current pass + void Confirm(); + // revert all the updated params in the current pass + void Revert(); // FleetWrapper singleton static std::shared_ptr GetInstance() { if (NULL == s_instance_) { diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.cc b/paddle/fluid/framework/fleet/gloo_wrapper.cc index c599432ff19..45512d6adc4 100644 --- a/paddle/fluid/framework/fleet/gloo_wrapper.cc +++ b/paddle/fluid/framework/fleet/gloo_wrapper.cc @@ -10,16 +10,18 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/framework/fleet/gloo_wrapper.h" +#include // NOLINT #include #include "paddle/fluid/framework/io/fs.h" #include "paddle/fluid/platform/errors.h" +#include "paddle/fluid/string/string_helper.h" namespace gloo { namespace rendezvous { HdfsStore::HdfsStore(const std::string& path) { path_ = path; - wait_sleep_ms_ = 3000; + wait_sleep_ms_ = 10000; wait_timeout_ = std::chrono::seconds(999999999); retry_times_ = 100; } @@ -35,49 +37,86 @@ void HdfsStore::set(const std::string& key, const std::vector& data) { } int err_no = 0; for (int i = 1; i <= retry_times_; ++i) { + err_no = 0; std::shared_ptr fp = paddle::framework::fs_open_write(tmp, &err_no, ""); - if (err_no != 0) { - VLOG(0) << "fs_open_write failed, retry times " << i << " err no " - << err_no; - fp.reset(); - sleep(wait_sleep_ms_ / 1000); - continue; - } size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get()); if (write_count != data.size()) { VLOG(0) << "fwrite_unlocked failed, retry times " << i << " write_count " << write_count << " data.size() " << data.size(); - fp.reset(); - sleep(2); - continue; + err_no = -1; } fp.reset(); - break; + if (err_no != 0) { + VLOG(0) << "fs_open_write failed, retry times " << i << " err no " + << err_no; + sleep(wait_sleep_ms_ / 1000); + paddle::framework::fs_remove(tmp); + if (i == retry_times_) { + VLOG(0) << "fs_open_write failed, retry times reaches limit"; + PADDLE_THROW(platform::errors::PreconditionNotMet( + "fs_open_write failed, retry times reaches" + " limit ", + retry_times_)); + } + } else { + break; + } } paddle::framework::fs_mv(tmp, path); #endif } +#ifdef PADDLE_WITH_GLOO +int retry_do_func(std::function func, uint32_t max_try_time, + uint32_t retry_interval_ms) { + for (uint32_t i = 0; i < max_try_time; ++i) { + if (func() == 0) { + return 0; + } +#ifdef _LINUX + usleep(retry_interval_ms * 1000); +#endif + } + return -1; +} +#endif + std::vector HdfsStore::get(const std::string& key) { auto path = ObjectPath(key); std::vector result; #ifdef PADDLE_WITH_GLOO // block until key is set wait({key}); - bool is_exists = paddle::framework::fs_exists(path); + int ret = retry_do_func( + [&path]() { return paddle::framework::fs_exists(path) ? 0 : -1; }, 5, + wait_sleep_ms_); + bool is_exists = (ret == 0); PADDLE_ENFORCE_EQ(is_exists, true, paddle::platform::errors::NotFound( "HdfsStore::get, path not exists: " + path)); - int err_no = 0; - std::shared_ptr fp = paddle::framework::fs_open_read(path, &err_no, ""); - char buffer = '\0'; - size_t read_count = 0; - while (fread(&buffer, 1, 1, fp.get()) == 1) { - ++read_count; - result.push_back(buffer); - } - VLOG(3) << "HdfsStore::get read_count " << read_count; + + int read_status = retry_do_func( + [&path, &result]() { + result.clear(); + int err_no = 0; + { + std::shared_ptr fp = + paddle::framework::fs_open_read(path, &err_no, ""); + char buffer = '\0'; + size_t read_count = 0; + while (fread(&buffer, 1, 1, fp.get()) == 1) { + ++read_count; + result.push_back(buffer); + } + VLOG(3) << "HdfsStore::get read_count " << read_count; + } + return err_no; + }, + 5, wait_sleep_ms_); + PADDLE_ENFORCE_EQ(read_status, 0, + paddle::platform::errors::Fatal( + "HdfsStore::get, path read faied: " + path)); #endif return result; } @@ -92,22 +131,33 @@ void HdfsStore::wait(const std::vector& keys, const std::chrono::milliseconds&) { // NOLINT #ifdef PADDLE_WITH_GLOO auto start = std::chrono::steady_clock::now(); - while (!Check(keys)) { + std::vector check_key_status(keys.size(), false); + while (!Check(keys, &check_key_status)) { auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start); if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) { - PADDLE_ENFORCE_EQ(0, 1, paddle::platform::errors::ExecutionTimeout( - "HdfsStore::wait, Wait timeout for key(s): " + - ::gloo::MakeString(keys))); + int32_t last_check_rank = -1; + for (size_t i = 0; i < check_key_status.size(); ++i) { + if (!check_key_status[i]) { + last_check_rank = i; + break; + } + } + PADDLE_THROW(platform::errors::ExecutionTimeout( + "TIMEOUT self_rank = %d pair_rank = %d", self_rank_, + last_check_rank)); } std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_)); } #endif } +void HdfsStore::SetTimeoutSeconds(int timeout_seconds) { + wait_timeout_ = std::chrono::seconds(timeout_seconds); +} + std::string HdfsStore::EncodeName(const std::string& name) { - thread_local std::hash hash_func; - return std::to_string(hash_func(name)); + return ::paddle::string::erase_spaces(name); } std::string HdfsStore::TmpPath(const std::string& name) { @@ -118,50 +168,124 @@ std::string HdfsStore::ObjectPath(const std::string& name) { return path_ + "/" + EncodeName(name); } -bool HdfsStore::Check(const std::vector& keys) { +bool HdfsStore::Check(const std::vector& keys, + std::vector* keys_check_status) { #ifdef PADDLE_WITH_GLOO + bool ret = true; std::vector paths; for (const auto& key : keys) { paths.push_back(ObjectPath(key)); } - for (const auto& path : paths) { + for (size_t i = 0; i < paths.size(); ++i) { + if ((*keys_check_status)[i]) { + continue; + } + const auto& path = paths[i]; bool is_exists = paddle::framework::fs_exists(path); VLOG(3) << "HdfsStore::Check " << is_exists << " path " << path; if (!is_exists) { - return false; + ret = false; } + (*keys_check_status)[i] = is_exists; } + return ret; +#else + VLOG(0) << "HdfsStore::Check does nothing when no gloo"; #endif return true; } +#ifdef PADDLE_WITH_GLOO +void ParallelConnectContext::connectFullMesh( + Store& store, std::shared_ptr& dev) { + std::vector allBytes; + // Create pairs + auto transportContext = dev->createContext(rank, size); + transportContext->setTimeout(getTimeout()); + for (int i = 0; i < size; i++) { + if (i == rank) { + continue; + } + auto& pair = transportContext->createPair(i); + auto addrBytes = pair->address().bytes(); + allBytes.insert(allBytes.end(), addrBytes.begin(), addrBytes.end()); + } + std::ostringstream storeKey; + storeKey << rank; + store.set(storeKey.str(), allBytes); + + std::vector> connect_threads(thread_num_); + // Connect every pair + for (uint32_t i = 0; i < connect_threads.size(); ++i) { + connect_threads[i].reset(new std::thread( + [&store, &transportContext, this](size_t thread_idx, + size_t thread_num) -> void { + for (int i = thread_idx; i < size; i += thread_num) { + if (i == rank) { + continue; + } + // Wait for address of other side of this pair to become available + std::string key = std::to_string(i); + store.wait({key}, getTimeout()); + // Connect to other side of this pair + auto allAddrs = store.get(key); + auto addr = extractAddress(allAddrs, i); + transportContext->getPair(i)->connect(addr); + } + }, + i, connect_threads.size())); + } + for (uint32_t i = 0; i < connect_threads.size(); ++i) { + connect_threads[i]->join(); + } + device_ = dev; + transportContext_ = std::move(transportContext); +} +#endif } // namespace rendezvous } // namespace gloo namespace paddle { namespace framework { -void GlooWrapper::Init(int rank, int size, const std::string& path, - const std::string& fs_name, const std::string& fs_ugi, - const std::string& iface, const std::string& prefix) { +void GlooWrapper::Init() { if (is_initialized_) { return; } - rank_ = rank; - size_ = size; - std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs"); - cmd += " -D fs.default.name=" + fs_name; - cmd += " -D hadoop.job.ugi=" + fs_ugi; - paddle::framework::hdfs_set_command(cmd); #ifdef PADDLE_WITH_GLOO gloo::transport::tcp::attr attr; - attr.iface = iface; - auto file_store = gloo::rendezvous::HdfsStore(path); - auto prefix_store = gloo::rendezvous::PrefixStore(prefix, file_store); + attr.iface = iface_; + std::shared_ptr file_store = nullptr; + std::shared_ptr http_store = nullptr; + auto context = + std::make_shared(rank_, size_); + context->setTimeout(run_timeout_); auto dev = gloo::transport::tcp::CreateDevice(attr); - auto context = std::make_shared(rank, size); - context->setTimeout(file_store.wait_timeout_); - context->connectFullMesh(prefix_store, dev); + switch (store_type_) { + case GlooStoreType::HDFS: { + std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs"); + cmd += " -D fs.default.name=" + hdfs_name_; + cmd += " -D hadoop.job.ugi=" + hdfs_ugi_; + paddle::framework::hdfs_set_command(cmd); + file_store = std::make_shared(hdfs_path_); + file_store->SetTimeoutSeconds(init_timeout_.count()); + auto prefix_store = + std::make_shared(prefix_, *file_store); + context->connectFullMesh(*prefix_store, dev); + break; + } + case GlooStoreType::HTTP: { + http_store = std::make_shared( + http_ip_, http_port_, prefix_ + "_" + http_scope_, rank_); + http_store->SetTimeoutSeconds(init_timeout_.count()); + context->connectFullMesh(*http_store, dev); + http_store->Finalize(); + break; + } + default: + LOG(ERROR) << "unknown store type " << store_type_; + exit(-1); + } context_ = std::move(context); #endif is_initialized_ = true; @@ -170,6 +294,9 @@ void GlooWrapper::Init(int rank, int size, const std::string& path, template std::vector GlooWrapper::AllReduce( std::vector& sendbuf, // NOLINT const std::string& mode); +template std::vector GlooWrapper::AllReduce( + std::vector& sendbuf, // NOLINT + const std::string& mode); template std::vector GlooWrapper::AllReduce( std::vector& sendbuf, // NOLINT const std::string& mode); @@ -180,6 +307,8 @@ template std::vector GlooWrapper::AllGather( int64_t& input); // NOLINT template std::vector GlooWrapper::AllGather( uint64_t& input); // NOLINT +template std::vector GlooWrapper::AllGather( + float& input); // NOLINT template std::vector GlooWrapper::AllGather( double& input); // NOLINT diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.h b/paddle/fluid/framework/fleet/gloo_wrapper.h index 528c91be6d4..3f932ee226c 100644 --- a/paddle/fluid/framework/fleet/gloo_wrapper.h +++ b/paddle/fluid/framework/fleet/gloo_wrapper.h @@ -31,6 +31,7 @@ limitations under the License. */ #include #include #include +#include #include #include #include @@ -59,44 +60,87 @@ class HdfsStore { virtual void wait(const std::vector& keys, const std::chrono::milliseconds& timeout); + virtual void SetTimeoutSeconds(int timeout_seconds); + std::string EncodeName(const std::string& name); std::string TmpPath(const std::string& name); std::string ObjectPath(const std::string& name); - bool Check(const std::vector& keys); + bool Check(const std::vector& keys, + std::vector* keys_check_status); + + void SetRank(int rank) { self_rank_ = rank; } std::string path_; int wait_sleep_ms_; std::chrono::seconds wait_timeout_; int retry_times_; + int self_rank_; }; +#ifdef PADDLE_WITH_GLOO +class ParallelConnectContext : public gloo::rendezvous::Context { + public: + ParallelConnectContext(int rank, int size, int base = 2) + : gloo::rendezvous::Context(rank, size, base) {} + virtual ~ParallelConnectContext() {} + // in gloo::rendezvous::Context wait&get one by one, + // slowly in case big size, especialy in HdfsStore + void connectFullMesh(Store& store, // NOLINT + std::shared_ptr& dev); // NOLINT + + protected: + int thread_num_ = 6; +}; +#endif } // namespace rendezvous } // namespace gloo namespace paddle { namespace framework { +enum GlooStoreType { HDFS, HTTP }; + class GlooWrapper { public: GlooWrapper() {} virtual ~GlooWrapper() {} - void Init(int rank, int size, const std::string& path, - const std::string& fs_name, const std::string& fs_ugi, - const std::string& iface, const std::string& prefix); + void Init(); - int Rank() { - CHECK_EQ(is_initialized_, true); - return rank_; + void SetTimeoutSeconds(int init_seconds, int run_seconds) { + init_timeout_ = std::chrono::seconds(init_seconds); + run_timeout_ = std::chrono::seconds(run_seconds); } - int Size() { - CHECK_EQ(is_initialized_, true); - return size_; + int Rank() { return rank_; } + + int Size() { return size_; } + + void SetRank(int rank) { rank_ = rank; } + + void SetSize(int size) { size_ = size; } + + void SetIface(const std::string& iface) { iface_ = iface; } + + void SetPrefix(const std::string& prefix) { prefix_ = prefix; } + + void SetHdfsStore(const std::string& path, const std::string& fs_name, + const std::string& fs_ugi) { + store_type_ = GlooStoreType::HDFS; + hdfs_path_ = path; + hdfs_name_ = fs_name; + hdfs_ugi_ = fs_ugi; + } + + void SetHttpStore(const std::string& ip, int port, const std::string& scope) { + store_type_ = GlooStoreType::HTTP; + http_ip_ = ip; + http_port_ = port; + http_scope_ = scope; } void Barrier() { @@ -104,6 +148,8 @@ class GlooWrapper { #ifdef PADDLE_WITH_GLOO gloo::BarrierOptions opts(context_); gloo::barrier(opts); +#else + LOG(WARNING) << "Barrier does nothing when WITH_GLOO=OFF"; #endif } @@ -134,6 +180,8 @@ class GlooWrapper { "AllReduce mode not known: " + mode)); } gloo::allreduce(opts); +#else + LOG(WARNING) << "AllReduce does nothing when WITH_GLOO=OFF"; #endif return recvbuf; } @@ -147,6 +195,8 @@ class GlooWrapper { opts.setInput(&input, 1); opts.setOutput(ret.data(), size_); gloo::allgather(opts); +#else + LOG(WARNING) << "AllGather does nothing when WITH_GLOO=OFF"; #endif return std::move(ret); } @@ -158,6 +208,19 @@ class GlooWrapper { #endif int rank_ = 0; int size_ = 0; + std::chrono::seconds init_timeout_ = std::chrono::seconds(9999999); + std::chrono::seconds run_timeout_ = std::chrono::seconds(9999999); + std::string iface_ = "lo"; + std::string prefix_; + GlooStoreType store_type_ = GlooStoreType::HDFS; + // configs for hdfs store + std::string hdfs_path_; + std::string hdfs_name_; + std::string hdfs_ugi_; + std::string http_ip_; + // configs for http store + int http_port_; + std::string http_scope_; }; } // namespace framework diff --git a/paddle/fluid/framework/fleet/test_fleet.cc b/paddle/fluid/framework/fleet/test_fleet.cc index bf9928789ca..dac95dd268e 100644 --- a/paddle/fluid/framework/fleet/test_fleet.cc +++ b/paddle/fluid/framework/fleet/test_fleet.cc @@ -17,6 +17,7 @@ #include "paddle/fluid/framework/fleet/fleet_wrapper.h" #include "paddle/fluid/framework/fleet/gloo_wrapper.h" #include "paddle/fluid/framework/io/fs.h" +#include "paddle/fluid/string/string_helper.h" #if defined _WIN32 || defined __APPLE__ #else @@ -37,14 +38,23 @@ TEST(TEST_GLOO, store_1) { } store.wait(std::vector{"test"}); store.wait(std::vector{"test"}, std::chrono::milliseconds(0)); + store.SetTimeoutSeconds(100000); store.EncodeName("1"); store.TmpPath("1"); store.ObjectPath("1"); - store.Check(std::vector{"test"}); + std::vector status(1, false); + store.Check(std::vector{"test"}, &status); auto gw = paddle::framework::GlooWrapper(); - gw.Init(0, 1, "", "", "", "", ""); - gw.Init(0, 1, "", "", "", "", ""); + gw.SetTimeoutSeconds(1000, 1000); + gw.SetRank(0); + gw.SetSize(1); + gw.SetPrefix(""); + gw.SetIface("lo"); + gw.SetHdfsStore("", "", ""); + gw.Init(); + gw.SetHttpStore("", 8099, ""); + gw.Init(); gw.Rank(); gw.Size(); gw.Barrier(); @@ -63,5 +73,8 @@ TEST(TEST_FLEET, fleet_1) { fleet->RunServer("", 0); fleet->SaveModelOneTable(0, "", 0); fleet->SaveModelOneTablePrefix(0, "", 0, ""); + fleet->Confirm(); + fleet->Revert(); + paddle::string::erase_spaces("1 2"); #endif } diff --git a/paddle/fluid/framework/unused_var_check.cc b/paddle/fluid/framework/unused_var_check.cc index 5eb80113852..7a81bc15b89 100644 --- a/paddle/fluid/framework/unused_var_check.cc +++ b/paddle/fluid/framework/unused_var_check.cc @@ -53,7 +53,9 @@ const std::unordered_set op_has_unsed_vars_white_list = { "precision_recall", // 1 "fusion_seqpool_cvm_concat", // 2 "fused_batch_norm_act", // 2 - "fused_batch_norm_act_grad" // 2 + "fused_batch_norm_act_grad", // 2 + "data_norm", // 0 + "data_norm_grad", // 0 }; namespace paddle { diff --git a/paddle/fluid/operators/data_norm_op.cc b/paddle/fluid/operators/data_norm_op.cc index 88438c9e979..394feba78ed 100644 --- a/paddle/fluid/operators/data_norm_op.cc +++ b/paddle/fluid/operators/data_norm_op.cc @@ -51,6 +51,17 @@ class DataNormOp : public framework::OperatorWithKernel { PADDLE_ENFORCE(ctx->HasOutput("Means"), ""); PADDLE_ENFORCE(ctx->HasOutput("Scales"), ""); PADDLE_ENFORCE(ctx->HasOutput("Y"), ""); + bool enable_scale_and_shift = + ctx->Attrs().Get("enable_scale_and_shift"); + if (enable_scale_and_shift) { + PADDLE_ENFORCE_EQ( + ctx->HasInput("scale_w"), true, + platform::errors::InvalidArgument( + "Input(scale_w) of DataNormOp should not be null.")); + PADDLE_ENFORCE_EQ(ctx->HasInput("bias"), true, + platform::errors::InvalidArgument( + "Input(bias) of DataNormOp should not be null.")); + } const auto x_dims = ctx->GetInputDim("X"); const DataLayout data_layout = framework::StringToDataLayout( @@ -72,6 +83,45 @@ class DataNormOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_EQ(ctx->GetInputDim("BatchSquareSum")[0], C); } + if (enable_scale_and_shift) { + auto scale_dim = ctx->GetInputDim("scale_w"); + auto bias_dim = ctx->GetInputDim("bias"); + + PADDLE_ENFORCE_EQ( + scale_dim.size(), 1UL, + platform::errors::InvalidArgument("the dimensionof scale" + "must equal to 1. But received: " + "the shape of scale is [%s], " + "the dimensionof scale is [%d]", + scale_dim, scale_dim.size())); + PADDLE_ENFORCE_EQ( + bias_dim.size(), 1UL, + platform::errors::InvalidArgument("the dimension of bias" + "must equal to 1. But received: " + "the shape of bias is [%s]," + "the dimension of bias is [%d]", + bias_dim, bias_dim.size())); + + bool check = true; + if ((!ctx->IsRuntime()) && (framework::product(scale_dim) <= 0 || + framework::product(bias_dim) <= 0)) { + check = false; + } + + if (check) { + PADDLE_ENFORCE_EQ(scale_dim[0], C, + platform::errors::InvalidArgument( + "the shape of scale must equal to [%d]" + "But received: the shape of scale is [%d]", + C, scale_dim[0])); + PADDLE_ENFORCE_EQ(bias_dim[0], C, + platform::errors::InvalidArgument( + "the shape of bias must equal to [%d]" + "But received: the shape of bias is [%d]", + C, bias_dim[0])); + } + } + ctx->SetOutputDim("Y", x_dims); ctx->SetOutputDim("Means", {C}); ctx->SetOutputDim("Scales", {C}); @@ -99,6 +149,17 @@ class DataNormOp : public framework::OperatorWithKernel { ctx, "BatchSquareSum"), "BatchSquareSum input should be of float type"); + bool enable_scale_and_shift = ctx.Attr("enable_scale_and_shift"); + if (enable_scale_and_shift) { + PADDLE_ENFORCE_EQ(dn_param_type, + OperatorWithKernel::IndicateVarDataType(ctx, "scale_w"), + platform::errors::InvalidArgument( + "scale_w input should be of float type")); + PADDLE_ENFORCE_EQ(dn_param_type, + OperatorWithKernel::IndicateVarDataType(ctx, "bias"), + platform::errors::InvalidArgument( + "bias input should be of float type")); + } // TODO(pzelazko-intel): enable MKLDNN layout when it's ready framework::LibraryType library = framework::LibraryType::kPlain; framework::DataLayout layout = framework::DataLayout::kAnyLayout; @@ -133,6 +194,19 @@ class DataNormOpMaker : public framework::OpProtoAndCheckerMaker { "summary_decay_rate", "(float, default 0.9999999) The decay rate when update the summary") .SetDefault(0.9999999); + AddAttr( + "enable_scale_and_shift", + "(bool, default false) Set to true to enable scale and shift such as " + "batch_norm op") + .SetDefault(false); + AddInput("scale_w", + "scale_w is a 1-dimensional tensor of size C " + "that is applied to the output") + .AsDispensable(); + AddInput("bias", + "bias is a 1-dimensional tensor of size C " + "that is applied to the output") + .AsDispensable(); AddAttr("data_layout", "").SetDefault("NCHW"); AddAttr("sync_stats", "(bool, default false) only used in multi-GPU") .SetDefault(false); @@ -194,7 +268,6 @@ class DataNormKernel // alloc memory T *y_data = y->mutable_data(ctx.GetPlace()); - Eigen::Array inv_std(C); ConstEigenVectorArrayMap b_size_arr( ctx.Input("BatchSize")->data(), C); ConstEigenVectorArrayMap b_sum_arr( @@ -210,6 +283,7 @@ class DataNormKernel const T *means_data = mean_out->data(); const T *x_data = x->data(); + const T *scales_data = scales->data(); const int slot_dim = ctx.Attr("slot_dim"); T min_precision = 1e-7f; @@ -218,7 +292,8 @@ class DataNormKernel case DataLayout::kNHWC: { // if slot_dim is set and batch size is larger than zero, we choose // to check if show number is zero, if so, skip normalization. - if (slot_dim > 0 && N > 0) { + if (slot_dim > 0 && N > 0 && + (!ctx.Attr("enable_scale_and_shift"))) { const int item_size = x->numel() / N; // location of show number in one embedding int offset = 0; @@ -239,10 +314,56 @@ class DataNormKernel offset += item_size; } } else { - EigenArrayMap(y_data, C, N) = - (ConstEigenArrayMap(x->data(), C, N).colwise() - means_arr) - .colwise() * - scales_arr; + if (!ctx.Attr("enable_scale_and_shift") && slot_dim <= 0) { + EigenArrayMap(y_data, C, N) = + (ConstEigenArrayMap(x->data(), C, N).colwise() - + means_arr) + .colwise() * + scales_arr; + } else if (ctx.Attr("enable_scale_and_shift") && + slot_dim <= 0) { + const auto *scale_w = ctx.Input("scale_w"); + const auto *bias = ctx.Input("bias"); + ConstEigenVectorArrayMap scale_w_arr(scale_w->data(), C); + ConstEigenVectorArrayMap bias_arr(bias->data(), C); + + Eigen::Array new_scale = + scales_arr * scale_w_arr; + Eigen::Array new_bias = + bias_arr - means_arr * scales_arr * scale_w_arr; + EigenArrayMap(y_data, C, N) = + (ConstEigenArrayMap(x->data(), C, N).colwise() * + new_scale) + .colwise() + + new_bias; + + } else { + const int item_size = x->numel() / N; + const auto *scale_w = ctx.Input("scale_w"); + const auto *bias = ctx.Input("bias"); + const T *scale_w_data = scale_w->data(); + const T *bias_data = bias->data(); + // location of show number in one embedding + int offset = 0; + for (int k = 0; k < N; ++k) { + for (int i = 0; i < item_size; i += slot_dim) { + if (x_data[offset + i] > -min_precision && + x_data[offset + i] < min_precision) { + // show = 0 + memset(y_data + offset + i, 0, sizeof(T) * slot_dim); + } else { + for (int j = i; j < i + slot_dim; ++j) { + y_data[offset + j] = ((x_data[offset + j] - means_data[j]) * + scales_data[j]) * + scale_w_data[j] + + bias_data[j]; + } + } + } // end for i + + offset += item_size; + } // end for k + } } break; } @@ -274,7 +395,8 @@ class DataNormGradOp : public framework::OperatorWithKernel { "Output(BatchSquareSum) of DataNormGradOp should not be null.")); PADDLE_ENFORCE(ctx->HasInput("Means"), ""); PADDLE_ENFORCE(ctx->HasInput("Scales"), ""); - + bool enable_scale_and_shift = + ctx->Attrs().Get("enable_scale_and_shift"); // check output PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("BatchSize")), ""); PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("BatchSum")), ""); @@ -294,6 +416,22 @@ class DataNormGradOp : public framework::OperatorWithKernel { ctx->SetOutputDim(framework::GradVarName("BatchSize"), {C}); ctx->SetOutputDim(framework::GradVarName("BatchSum"), {C}); ctx->SetOutputDim(framework::GradVarName("BatchSquareSum"), {C}); + if (enable_scale_and_shift) { + const bool has_scale_grad = + ctx->HasOutput(framework::GradVarName("scale_w")); + const bool has_bias_grad = ctx->HasOutput(framework::GradVarName("bias")); + + PADDLE_ENFORCE_EQ((has_scale_grad == has_bias_grad), true, + platform::errors::InvalidArgument( + "Output(Scale@GRAD) and Output(Bias@GRAD)" + "must be null or not be null at same time. " + "But now, has Scale@Grad=[%d], has Bias@GRAD=[%d]", + has_scale_grad, has_bias_grad)); + if (has_scale_grad) { + ctx->SetOutputDim(framework::GradVarName("scale_w"), {C}); + ctx->SetOutputDim(framework::GradVarName("bias"), {C}); + } + } } protected: @@ -353,18 +491,23 @@ class DataNormGradKernel const int C = (data_layout == DataLayout::kNCHW ? x_dims[1] : x_dims[x_dims.size() - 1]); - // init output Tensor *d_x = nullptr; if (ctx.HasOutput(framework::GradVarName("X"))) { d_x = ctx.Output(framework::GradVarName("X")); } + auto *d_batch_size = ctx.Output(framework::GradVarName("BatchSize")); auto *d_batch_sum = ctx.Output(framework::GradVarName("BatchSum")); auto *d_batch_square_sum = ctx.Output(framework::GradVarName("BatchSquareSum")); + const T *mean_data = means->data(); + const T *inv_var_data = scales->data(); + ConstEigenVectorArrayMap mean_arr(mean_data, C); + ConstEigenVectorArrayMap inv_var_arr(inv_var_data, C); + T *d_batch_size_data = d_batch_size->mutable_data(ctx.GetPlace()); T *d_batch_sum_data = d_batch_sum->mutable_data(ctx.GetPlace()); T *d_batch_square_sum_data = @@ -372,7 +515,6 @@ class DataNormGradKernel EigenVectorArrayMap d_batch_size_arr(d_batch_size_data, C); EigenVectorArrayMap d_batch_sum_arr(d_batch_sum_data, C); EigenVectorArrayMap d_batch_square_sum_arr(d_batch_square_sum_data, C); - d_batch_size_arr.setZero(); d_batch_sum_arr.setZero(); d_batch_square_sum_arr.setZero(); @@ -392,8 +534,86 @@ class DataNormGradKernel if (d_x != nullptr) { EigenArrayMap d_x_arr(d_x->mutable_data(ctx.GetPlace()), C, N); d_x_arr.setZero(); - for (int nc = 0; nc < N; ++nc) { - d_x_arr.col(nc) = d_y_arr.col(nc) * scales_arr; + if (!ctx.Attr("enable_scale_and_shift")) { + for (int nc = 0; nc < N; ++nc) { + d_x_arr.col(nc) = d_y_arr.col(nc) * scales_arr; + } + } else { + const auto *scale_w = ctx.Input("scale_w"); + auto *d_scale = + ctx.Output(framework::GradVarName("scale_w")); + auto *d_bias = ctx.Output(framework::GradVarName("bias")); + ConstEigenVectorArrayMap scale_arr(scale_w->data(), C); + T *d_bias_data = nullptr; + T *d_scale_data = nullptr; + + d_scale->mutable_data(ctx.GetPlace()); + d_bias->mutable_data(ctx.GetPlace()); + d_bias_data = d_bias->mutable_data(ctx.GetPlace()); + d_scale_data = d_scale->mutable_data(ctx.GetPlace()); + + EigenVectorArrayMap d_bias_arr(d_bias_data, C); + EigenVectorArrayMap d_scale_arr(d_scale_data, C); + Tensor dy_sum; + dy_sum.Resize({C}); + dy_sum.mutable_data(ctx.GetPlace()); + EigenVectorArrayMap dy_sum_arr( + dy_sum.mutable_data(ctx.GetPlace()), C); + Tensor dy_mul_x_sub_mean_mul_invstd_sum; + dy_mul_x_sub_mean_mul_invstd_sum.Resize({C}); + dy_mul_x_sub_mean_mul_invstd_sum.mutable_data(ctx.GetPlace()); + EigenVectorArrayMap dy_mul_x_sub_mean_mul_invstd_sum_arr( + dy_mul_x_sub_mean_mul_invstd_sum.mutable_data( + ctx.GetPlace()), + C); + + dy_sum_arr.setZero(); + dy_mul_x_sub_mean_mul_invstd_sum_arr.setZero(); + + if (slot_dim <= 0) { + for (int n = 0; n < N; ++n) { + dy_sum_arr += d_y_arr.col(n); + dy_mul_x_sub_mean_mul_invstd_sum_arr += + ((x_arr.col(n) - mean_arr) * inv_var_arr * d_y_arr.col(n)); + } + if (d_scale && d_bias) { + d_bias_arr = dy_sum_arr; + d_scale_arr = dy_mul_x_sub_mean_mul_invstd_sum_arr; + } + for (int nc = 0; nc < N; ++nc) { + d_x_arr.col(nc) = d_y_arr.col(nc) * scales_arr * scale_arr; + } + } else { + int offset = 0; + const int item_size = x->numel() / N; + T *d_x_data = d_x->mutable_data(ctx.GetPlace()); + T *d_scale_data = d_scale->mutable_data(ctx.GetPlace()); + T *d_bias_data = d_bias->mutable_data(ctx.GetPlace()); + const T *dy_data = d_y->data(); + const T *scales_data = scales->data(); + const T *scale_w_data = scale_w->data(); + const T *x_data = x->data(); + for (int i = 0; i < item_size; i++) { + d_bias_data[i] = 0; + d_scale_data[i] = 0; + } + for (int k = 0; k < N; ++k) { + for (int i = 0; i < item_size; i += slot_dim) { + if (!(x_data[offset + i] > -min_precision && + x_data[offset + i] < min_precision)) { + // show != 0 + for (int j = i; j < i + slot_dim; ++j) { + d_x_data[offset + j] = dy_data[offset + j] * + scales_data[j] * scale_w_data[j]; + d_bias_data[j] += dy_data[offset + j]; + d_scale_data[j] += (x_data[offset + j] - mean_data[j]) * + inv_var_data[j] * dy_data[offset + j]; + } + } + } + offset += item_size; + } + } } } @@ -466,6 +686,8 @@ class DataNormGradMaker : public framework::SingleGradOpMaker { op->SetInput("X", this->Input("X")); op->SetInput(framework::GradVarName("Y"), this->OutputGrad("Y")); + op->SetInput("scale_w", this->Input("scale_w")); + op->SetInput("bias", this->Input("bias")); op->SetOutput("BatchSize", this->Input("BatchSize")); op->SetOutput("BatchSum", this->Input("BatchSum")); op->SetOutput("BatchSquareSum", this->Input("BatchSquareSum")); @@ -481,6 +703,9 @@ class DataNormGradMaker : public framework::SingleGradOpMaker { this->InputGrad("BatchSum")); op->SetOutput(framework::GradVarName("BatchSquareSum"), this->InputGrad("BatchSquareSum")); + op->SetOutput(framework::GradVarName("scale_w"), + this->InputGrad("scale_w")); + op->SetOutput(framework::GradVarName("bias"), this->InputGrad("bias")); } }; diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc index 3ae4eef4491..4b72b09addd 100644 --- a/paddle/fluid/pybind/fleet_wrapper_py.cc +++ b/paddle/fluid/pybind/fleet_wrapper_py.cc @@ -78,6 +78,8 @@ void BindFleetWrapper(py::module* m) { &framework::FleetWrapper::SetClient2ClientConfig) .def("set_pull_local_thread_num", &framework::FleetWrapper::SetPullLocalThreadNum) + .def("confirm", &framework::FleetWrapper::Confirm) + .def("revert", &framework::FleetWrapper::Revert) .def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable) .def("save_model_one_table_with_prefix", &framework::FleetWrapper::SaveModelOneTablePrefix) diff --git a/paddle/fluid/pybind/gloo_wrapper_py.cc b/paddle/fluid/pybind/gloo_wrapper_py.cc index 80260ae2f29..e570333d091 100644 --- a/paddle/fluid/pybind/gloo_wrapper_py.cc +++ b/paddle/fluid/pybind/gloo_wrapper_py.cc @@ -37,11 +37,20 @@ void BindGlooWrapper(py::module* m) { .def("rank", &framework::GlooWrapper::Rank) .def("size", &framework::GlooWrapper::Size) .def("barrier", &framework::GlooWrapper::Barrier) + .def("set_timeout_seconds", &framework::GlooWrapper::SetTimeoutSeconds) + .def("set_rank", &framework::GlooWrapper::SetRank) + .def("set_size", &framework::GlooWrapper::SetSize) + .def("set_iface", &framework::GlooWrapper::SetIface) + .def("set_prefix", &framework::GlooWrapper::SetPrefix) + .def("set_hdfs_store", &framework::GlooWrapper::SetHdfsStore) + .def("set_http_store", &framework::GlooWrapper::SetHttpStore) .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_reduce", &framework::GlooWrapper::AllReduce) + .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_reduce", &framework::GlooWrapper::AllReduce) .def("all_gather", &framework::GlooWrapper::AllGather) .def("all_gather", &framework::GlooWrapper::AllGather) + .def("all_gather", &framework::GlooWrapper::AllGather) .def("all_gather", &framework::GlooWrapper::AllGather); } // end BindGlooWrapper } // end namespace pybind diff --git a/paddle/fluid/string/string_helper.cc b/paddle/fluid/string/string_helper.cc index 27708b8eebd..712db90d2f4 100644 --- a/paddle/fluid/string/string_helper.cc +++ b/paddle/fluid/string/string_helper.cc @@ -61,6 +61,19 @@ std::string trim_spaces(const std::string& str) { return std::string(p, len); } +std::string erase_spaces(const std::string& str) { + std::string result; + result.reserve(str.size()); + const char* p = str.c_str(); + while (*p != 0) { + if (!isspace(*p)) { + result.append(p, 1); + } + ++p; + } + return result; +} + inline int str_to_float(const char* str, float* v) { const char* head = str; char* cursor = NULL; diff --git a/paddle/fluid/string/string_helper.h b/paddle/fluid/string/string_helper.h index cc09088c7ee..8bf379a6b34 100644 --- a/paddle/fluid/string/string_helper.h +++ b/paddle/fluid/string/string_helper.h @@ -62,6 +62,9 @@ std::string format_string(const std::string& fmt, ARGS&&... args) { // remove leading and tailing spaces std::string trim_spaces(const std::string& str); +// erase all spaces in str +std::string erase_spaces(const std::string& str); + int str_to_float(const char* str, float* v); // split string by delim diff --git a/python/paddle/fluid/incubate/fleet/base/role_maker.py b/python/paddle/fluid/incubate/fleet/base/role_maker.py index bada19abcc3..c8005dce44f 100644 --- a/python/paddle/fluid/incubate/fleet/base/role_maker.py +++ b/python/paddle/fluid/incubate/fleet/base/role_maker.py @@ -14,6 +14,7 @@ """Defination of Role Makers.""" from __future__ import print_function +from multiprocessing import Process, Manager import paddle.fluid as fluid import os import time @@ -556,7 +557,21 @@ class GeneralRoleMaker(RoleMakerBase): self._role_is_generated = False self._hdfs_name = kwargs.get("hdfs_name", "") self._hdfs_ugi = kwargs.get("hdfs_ugi", "") - self._hdfs_path = kwargs.get("path", "") + self._hdfs_path = kwargs.get("path", "").rstrip("/") + self._init_timeout_seconds = kwargs.get("init_timeout_seconds", 3600) + self._run_timeout_seconds = kwargs.get("run_timeout_seconds", 9999999) + ip_port = kwargs.get("http_ip_port", "") + self._http_ip_port = [] + self._http_server = None + # if ip_port is not empty, it will use http instead of hdfs + if ip_port != "": + self._http_ip_port = ip_port.split(":") + # it's for communication between processes + self._manager = Manager() + # global dict to store status + self._http_server_d = self._manager.dict() + # set running status of http server + self._http_server_d["running"] = False self._iface = self.__get_default_iface() # this environment variable can be empty self._prefix = os.getenv("SYS_JOB_ID", "") @@ -572,17 +587,41 @@ class GeneralRoleMaker(RoleMakerBase): trainers_num = len(worker_endpoints) if training_role not in ["TRAINER", "PSERVER"]: raise ValueError("TRAINING_ROLE must be PSERVER or TRAINER") + if training_role == "TRAINER": role = Role.WORKER current_id = int(os.environ["PADDLE_TRAINER_ID"]) + if current_id == 0 and len(self._http_ip_port) != 0: + size_d = { + "trainer": len(worker_endpoints), + "pserver": len(eplist), + "all": len(worker_endpoints) + len(eplist) + } + # child process for http server + self._http_server = Process( + target=self.__start_kv_server, + args=(self._http_server_d, size_d)) + self._http_server.daemon = True + # set running status to True + self._http_server_d["running"] = True + # start child process + self._http_server.start() self._node_type = 1 self._cur_endpoint = worker_endpoints[current_id] gloo = fluid.core.Gloo() - gloo.init(current_id, - len(worker_endpoints), - self._hdfs_path.rstrip("/") + "/trainer", - self._hdfs_name, self._hdfs_ugi, self._iface, - self._prefix) + gloo.set_rank(current_id) + gloo.set_size(len(worker_endpoints)) + gloo.set_prefix(self._prefix) + gloo.set_iface(self._iface) + gloo.set_timeout_seconds(self._init_timeout_seconds, + self._run_timeout_seconds) + if len(self._http_ip_port) != 0: + gloo.set_http_store(self._http_ip_port[0], + int(self._http_ip_port[1]), "trainer") + else: + gloo.set_hdfs_store(self._hdfs_path + "/trainer", + self._hdfs_name, self._hdfs_ugi) + gloo.init() self._node_type_comm = gloo elif training_role == "PSERVER": role = Role.SERVER @@ -598,20 +637,36 @@ class GeneralRoleMaker(RoleMakerBase): self._node_type = 0 self._cur_endpoint = cur_endpoint gloo = fluid.core.Gloo() - gloo.init(current_id, - len(eplist), - self._hdfs_path.rstrip("/") + "/pserver", - self._hdfs_name, self._hdfs_ugi, self._iface, - self._prefix) + gloo.set_rank(current_id) + gloo.set_size(len(eplist)) + gloo.set_prefix(self._prefix) + gloo.set_iface(self._iface) + gloo.set_timeout_seconds(self._init_timeout_seconds, + self._run_timeout_seconds) + if len(self._http_ip_port) != 0: + gloo.set_http_store(self._http_ip_port[0], + int(self._http_ip_port[1]), "pserver") + else: + gloo.set_hdfs_store(self._hdfs_path + "/pserver", + self._hdfs_name, self._hdfs_ugi) + gloo.init() self._node_type_comm = gloo gloo = fluid.core.Gloo() all_list = worker_endpoints + eplist - gloo.init( - all_list.index(self._cur_endpoint), - len(all_list), - self._hdfs_path.rstrip("/") + "/all", self._hdfs_name, - self._hdfs_ugi, self._iface, self._prefix) + gloo.set_rank(all_list.index(self._cur_endpoint)) + gloo.set_size(len(all_list)) + gloo.set_prefix(self._prefix) + gloo.set_iface(self._iface) + gloo.set_timeout_seconds(self._init_timeout_seconds, + self._run_timeout_seconds) + if len(self._http_ip_port) != 0: + gloo.set_http_store(self._http_ip_port[0], + int(self._http_ip_port[1]), "all") + else: + gloo.set_hdfs_store(self._hdfs_path + "/all", self._hdfs_name, + self._hdfs_ugi) + gloo.init() self._all_comm = gloo self._trainers_num = trainers_num self._server_endpoints = eplist @@ -620,6 +675,11 @@ class GeneralRoleMaker(RoleMakerBase): self._rank = all_list.index(self._cur_endpoint) self._size = len(all_list) self._worker_endpoints = worker_endpoints + if self._http_server is not None: + # set running status to False + self._http_server_d["running"] = False + # wait until child process exits + self._http_server.join() self._role_is_generated = True def all_gather(self, input): @@ -872,6 +932,16 @@ class GeneralRoleMaker(RoleMakerBase): return intf_name return "lo" + def __start_kv_server(self, http_server_d, size_d): + from paddle.fluid.incubate.fleet.utils.http_server import KVServer + http_server = KVServer(int(self._http_ip_port[1]), size_d) + http_server.start() + wait_seconds = 5 + while http_server_d.get("running", + False) and not http_server.shoud_stop(): + time.sleep(wait_seconds) + http_server.stop() + class UserDefinedRoleMaker(RoleMakerBase): """ diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py index 210640f64c1..c1ec749ac1f 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py @@ -567,6 +567,24 @@ class PSLib(Fleet): model_proto_file, table_var_names, load_combine) self._role_maker._barrier_worker() + def confirm(self): + """ + confirm all the updated params in current pass + """ + self._role_maker._barrier_worker() + if self._role_maker.is_first_worker(): + self._fleet_ptr.confirm() + self._role_maker._barrier_worker() + + def revert(self): + """ + revert all the updated params in current pass + """ + self._role_maker._barrier_worker() + if self._role_maker.is_first_worker(): + self._fleet_ptr.revert() + self._role_maker._barrier_worker() + def load_model(self, model_dir=None, **kwargs): """ load pslib model, there are at least 4 modes, these modes are the same diff --git a/python/paddle/fluid/incubate/fleet/utils/http_server.py b/python/paddle/fluid/incubate/fleet/utils/http_server.py new file mode 100644 index 00000000000..3573f417f35 --- /dev/null +++ b/python/paddle/fluid/incubate/fleet/utils/http_server.py @@ -0,0 +1,187 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Http Server.""" + +import logging +import BaseHTTPServer +import SimpleHTTPServer +import time +import threading +import socket + + +def get_logger(name, level, fmt): + logger = logging.getLogger(name) + logger.setLevel(level) + handler = logging.FileHandler('http.log', mode='w') + formatter = logging.Formatter(fmt=fmt) + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + +_http_server_logger = get_logger( + __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s') + + +class KVHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): + """ + kv handler class for kv http server, + it defines the way to get/set kv in server. + """ + + def do_GET(self): + """ + get method for kv handler, get value according to key. + """ + log_str = "GET " + self.address_string() + self.path + paths = self.path.split('/') + if len(paths) < 3: + print('len of request path must be 3: ' + self.path) + self.send_status_code(400) + return + _, scope, key = paths + with self.server.kv_lock: + value = self.server.kv.get(scope, {}).get(key) + if value is None: + log_str += ' , key not found: ' + key + self.send_status_code(404) + else: + log_str += ' , key found: ' + key + self.send_response(200) + self.send_header("Content-Length", str(len(value))) + self.end_headers() + self.wfile.write(value) + _http_server_logger.info(log_str) + + def do_PUT(self): + """ + put method for kv handler, set value according to key. + """ + log_str = "PUT " + self.address_string() + self.path + paths = self.path.split('/') + if len(paths) < 3: + print('len of request path must be 3: ' + self.path) + self.send_status_code(400) + return + _, scope, key = paths + content_length = int(self.headers['Content-Length']) + try: + value = self.rfile.read(content_length) + except: + print("receive error invalid request") + self.send_status_code(404) + return + with self.server.kv_lock: + if self.server.kv.get(scope) is None: + self.server.kv[scope] = {} + self.server.kv[scope][key] = value + self.send_status_code(200) + _http_server_logger.info(log_str) + + def do_DELETE(self): + """ + delete method for kv handler, set value according to key. + """ + log_str = "DELETE " + self.address_string() + self.path + paths = self.path.split('/') + if len(paths) < 3: + print('len of request path must be 3: ' + self.path) + self.send_status_code(400) + return + _, scope, key = paths + with self.server.delete_kv_lock: + if self.server.delete_kv.get(scope) is None: + self.server.delete_kv[scope] = [] + self.server.delete_kv[scope].append(key) + self.send_status_code(200) + _http_server_logger.info(log_str) + + def log_message(self, format, *args): + """ + ignore all logging messages in kv handler. + """ + pass + + def send_status_code(self, code): + """ + send status code back to client. + """ + self.send_response(code) + self.send_header("Content-Length", 0) + self.end_headers() + + +class KVHTTPServer(BaseHTTPServer.HTTPServer, object): + """ + it is a http server storing kv pairs. + """ + + def __init__(self, port, handler): + """Init.""" + super(KVHTTPServer, self).__init__(('', port), handler) + self.delete_kv_lock = threading.Lock() + self.delete_kv = {} + self.kv_lock = threading.Lock() + self.kv = {} + + def get_deleted_size(self, key): + """ + get deleted size in key. + """ + ret = 0 + with self.delete_kv_lock: + ret = self.delete_kv.get(key, 0) + return ret + + +class KVServer: + """ + it is a server storing kv pairs, has a http server inside. + """ + + def __init__(self, port, size={}): + """Init.""" + self.http_server = KVHTTPServer(port, KVHandler) + self.listen_thread = None + self.size = {} + + def start(self): + """ + start server until user calls stop to let it quit. + """ + self.listen_thread = threading.Thread( + target=lambda: self.http_server.serve_forever()) + self.listen_thread.start() + + def stop(self): + """ + stop server and clear its resources. + """ + self.http_server.shutdown() + self.listen_thread.join() + self.http_server.server_close() + + def shoud_stop(self): + """ + return whether the server should stop. + + Returns: + ret(bool): whether the server should stop + """ + for key in self.size: + s = self.http_server.get_deleted_size(key) + if s != self.size.get(key, 0): + return False + return True diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index eda1054bbf1..c3c8009b9e8 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -4540,7 +4540,8 @@ def data_norm(input, do_model_average_for_mean_and_var=True, slot_dim=-1, sync_stats=False, - summary_decay_rate=0.9999999): + summary_decay_rate=0.9999999, + enable_scale_and_shift=False): """ **Data Normalization Layer** @@ -4589,6 +4590,7 @@ def data_norm(input, sync_stats(bool, Default False): When running with multiple GPU cards, using allreduce to sync the summary messages. summary_decay_rate(float, Default 0.9999999): The decay rate when updating summary. + enable_scale_and_shift(bool, Default False): do scale&shift after normalization. Returns: Variable: A tensor variable which is the result after applying data normalization on the input. @@ -4619,12 +4621,35 @@ def data_norm(input, batch_size_default = 1e4 batch_sum_default = 0.0 batch_square_sum_default = 1e4 + scale_w_default = 1.0 + bias_default = 0.0 if param_attr and isinstance(param_attr, dict): batch_size_default = param_attr.get("batch_size", 1e4) batch_sum_default = param_attr.get("batch_sum", 0.0) batch_square_sum_default = param_attr.get("batch_square", 1e4) - + if enable_scale_and_shift: + scale_w_default = param_attr.get("scale_w", 1.0) + bias_default = param_attr.get("bias", 0.0) + + # create scale and shift(bias) when enable_scale_and_shift is True + if name == None: + name = "dn" + if enable_scale_and_shift: + scale_w = helper.create_parameter( + attr=ParamAttr( + name=name + '.scale_w', + initializer=Constant(value=float(scale_w_default)), + trainable=True), + shape=param_shape, + dtype=input.dtype) + bias = helper.create_parameter( + attr=ParamAttr( + name=name + '.bias', + initializer=Constant(value=float(bias_default)), + trainable=True), + shape=param_shape, + dtype=input.dtype) # create parameter batch_size = helper.create_parameter( attr=ParamAttr( @@ -4655,14 +4680,18 @@ def data_norm(input, data_norm_out = input if in_place else helper.create_variable(dtype=dtype) + inputs = { + "X": input, + "BatchSize": batch_size, + "BatchSum": batch_sum, + "BatchSquareSum": batch_square_sum + } + if enable_scale_and_shift: + inputs["scale_w"] = scale_w + inputs["bias"] = bias helper.append_op( type="data_norm", - inputs={ - "X": input, - "BatchSize": batch_size, - "BatchSum": batch_sum, - "BatchSquareSum": batch_square_sum - }, + inputs=inputs, outputs={ "Y": data_norm_out, "Means": means, @@ -4675,7 +4704,8 @@ def data_norm(input, "epsilon": epsilon, "slot_dim": slot_dim, "sync_stats": sync_stats, - "summary_decay_rate": summary_decay_rate + "summary_decay_rate": summary_decay_rate, + "enable_scale_and_shift": enable_scale_and_shift }) return helper.append_activation(data_norm_out) diff --git a/python/paddle/fluid/tests/unittests/test_data_norm_op.py b/python/paddle/fluid/tests/unittests/test_data_norm_op.py index e2bbf8a077f..0b7ed20f4b1 100644 --- a/python/paddle/fluid/tests/unittests/test_data_norm_op.py +++ b/python/paddle/fluid/tests/unittests/test_data_norm_op.py @@ -24,6 +24,7 @@ import paddle.fluid.layers as layers import os from op_test import OpTest from paddle.fluid.framework import grad_var_name +from paddle.fluid import Program, program_guard def _reference_testing(x, batch_size, batch_sum, batch_square_sum, slot_dim=-1): @@ -72,7 +73,13 @@ class TestDataNormOpInference(unittest.TestCase): def __assert_close(self, tensor, np_array, msg, atol=1e-4): self.assertTrue(np.allclose(np.array(tensor), np_array, atol=atol), msg) - def check_with_place(self, place, data_layout, dtype, shape, slot_dim=-1): + def check_with_place(self, + place, + data_layout, + dtype, + shape, + slot_dim=-1, + enable_scale_and_shift=False): """ do forward and check @@ -82,7 +89,7 @@ class TestDataNormOpInference(unittest.TestCase): dtype(dtype): np.float32 shape(list): input shape slot_dim(int): dimension of one slot. Refer to data_norm api. - + enable_scale_and_shift(bool): if enable scale and shift after normalization. """ epsilon = 0.00001 @@ -127,21 +134,49 @@ class TestDataNormOpInference(unittest.TestCase): mean_tensor = create_or_get_tensor(scope, "mean", None, place) scales_tensor = create_or_get_tensor(scope, "scales", None, place) - data_norm_op = Operator( - "data_norm", - # inputs - X="x_val", - BatchSize="batch_size", - BatchSum="batch_sum", - BatchSquareSum="batch_square_sum", - # outputs - Y="y_out", - Means="mean", - Scales="scales", - # attrs - epsilon=epsilon, - use_mkldnn=self.use_mkldnn, - slot_dim=slot_dim) + if not enable_scale_and_shift: + data_norm_op = Operator( + "data_norm", + # inputs + X="x_val", + BatchSize="batch_size", + BatchSum="batch_sum", + BatchSquareSum="batch_square_sum", + # outputs + Y="y_out", + Means="mean", + Scales="scales", + # attrs + epsilon=epsilon, + use_mkldnn=self.use_mkldnn, + slot_dim=slot_dim, + enable_scale_and_shift=False) + else: + scale_w = np.ones(scale_shape).astype(np.float32) + bias = np.zeros(scale_shape).astype(np.float32) + scale_w_tensor = create_or_get_tensor( + scope, "scale_w", + OpTest.np_dtype_to_fluid_dtype(scale_w), place) + bias_tensor = create_or_get_tensor( + scope, "bias", OpTest.np_dtype_to_fluid_dtype(bias), place) + data_norm_op = Operator( + "data_norm", + # inputs + X="x_val", + BatchSize="batch_size", + BatchSum="batch_sum", + BatchSquareSum="batch_square_sum", + scale_w="scale_w", + bias="bias", + # outputs + Y="y_out", + Means="mean", + Scales="scales", + # attrs + epsilon=epsilon, + use_mkldnn=self.use_mkldnn, + slot_dim=slot_dim, + enable_scale_and_shift=True) data_norm_op.run(scope, place) @@ -162,11 +197,13 @@ class TestDataNormOpInference(unittest.TestCase): for place in places: for data_format in ["NCHW", "NHWC"]: for slot_dim in [-1, 1]: - self.check_with_place( - place, - data_format, - self.dtype, [2, 3], - slot_dim=slot_dim) + for enable_scale_and_shift in [False, True]: + self.check_with_place( + place, + data_format, + self.dtype, [2, 3], + slot_dim=slot_dim, + enable_scale_and_shift=enable_scale_and_shift) class TestDataNormOp(OpTest): @@ -220,6 +257,130 @@ class TestDataNormOp(OpTest): self.check_grad(['X'], 'Y', no_grad_set=set([])) +class TestDataNormOpWithEnableScaleAndShift(OpTest): + """ + test class for data norm op + test forward and backward + """ + + def setUp(self): + """ + init data norm op test env + """ + self.op_type = 'data_norm' + self.use_mkldnn = False + epsilon = 0.00001 + slot_dim = -1 + enable_scale_and_shitf = True + x_shape = [2, 50] + scale_shape = [50] + tp = np.float32 + + x_val = np.random.uniform(-1, 1, x_shape).astype(tp) + batch_size = np.ones(scale_shape).astype(tp) + batch_size *= 1e4 + batch_sum = np.zeros(scale_shape).astype(tp) + batch_square_sum = np.ones(scale_shape).astype(tp) + batch_square_sum *= 1e4 + scale_w = np.ones(scale_shape).astype(tp) + bias = np.zeros(scale_shape).astype(tp) + + y = np.array(x_val) + + mean = np.zeros(x_shape).astype(tp) + scale = np.ones(x_shape).astype(tp) + + self.inputs = { + "X": x_val, + "BatchSize": batch_size, + "BatchSum": batch_sum, + "BatchSquareSum": batch_square_sum, + "scale_w": scale_w, + "bias": bias + } + self.outputs = {"Y": y, "Means": mean, "Scales": scale} + self.attrs = { + "epsilon": epsilon, + "use_mkldnn": self.use_mkldnn, + "slot_dim": slot_dim, + "enable_scale_and_shift": True + } + + def test_check_output(self): + """ + test check forward, check output + """ + self.check_output() + + def test_check_grad(self): + """ + test check backward, check grad + """ + self.check_grad(['X'], 'Y', no_grad_set=set([])) + + +class TestDataNormOpWithEnableScaleAndShift_1(OpTest): + """ + test class for data norm op + test forward and backward + """ + + def setUp(self): + """ + init data norm op test env + """ + self.op_type = 'data_norm' + self.use_mkldnn = False + epsilon = 0.00001 + slot_dim = 1 + enable_scale_and_shitf = True + x_shape = [2, 50] + scale_shape = [50] + tp = np.float32 + + x_val = np.random.uniform(-1, 1, x_shape).astype(tp) + batch_size = np.ones(scale_shape).astype(tp) + batch_size *= 1e4 + batch_sum = np.zeros(scale_shape).astype(tp) + batch_square_sum = np.ones(scale_shape).astype(tp) + batch_square_sum *= 1e4 + scale_w = np.ones(scale_shape).astype(tp) + bias = np.zeros(scale_shape).astype(tp) + + y = np.array(x_val) + + mean = np.zeros(x_shape).astype(tp) + scale = np.ones(x_shape).astype(tp) + + self.inputs = { + "X": x_val, + "BatchSize": batch_size, + "BatchSum": batch_sum, + "BatchSquareSum": batch_square_sum, + "scale_w": scale_w, + "bias": bias + } + self.outputs = {"Y": y, "Means": mean, "Scales": scale} + self.attrs = { + "epsilon": epsilon, + "use_mkldnn": self.use_mkldnn, + "slot_dim": slot_dim, + "enable_scale_and_shift": True + } + + def test_check_output(self): + """ + test check forward, check output + """ + self.check_output() + + def test_check_grad(self): + """ + test check backward, check grad + """ + self.check_grad(['X'], 'Y', no_grad_set=set([])) + + class TestDataNormOpWithSlotDim(OpTest): """ test class for data norm op @@ -399,5 +560,14 @@ class TestDataNormOpWithSyncStats(unittest.TestCase): os.remove(f) +class TestDataNormOpErrorr(unittest.TestCase): + def test_errors(self): + with program_guard(Program(), Program()): + x2 = fluid.layers.data(name='x2', shape=[3, 4], dtype="int32") + #self.assertRaises(TypeError, fluid.data_norm, x2) + fluid.layers.data_norm( + input=x2, param_attr={}, enable_scale_and_shift=True) + + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py index 5aa28673437..a3038d1fb88 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_nocvm_1.py @@ -96,6 +96,8 @@ class TestFleet1(unittest.TestCase): fleet.save_one_table(0, "./model_002", prefix="hahaha") fleet.load_model("./model_0003") fleet.load_one_table(0, "./model_004") + fleet.confirm() + fleet.revert() except: print("do not support pslib test, skip") return diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py index 5fee4458a71..4e7de7c6ba0 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py @@ -97,7 +97,7 @@ class TestCloudRoleMaker2(unittest.TestCase): role4._worker_gather(1) role4._get_rank() role4._get_size() - role4._all_comm.init(0, 0, "", "", "", "", "") + role4._all_comm.init() role5 = GeneralRoleMaker(path="./test_gloo_5") role5.get_local_endpoint() role5.get_local_endpoint() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py new file mode 100644 index 00000000000..be1e228f436 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_3.py @@ -0,0 +1,84 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test cloud role maker.""" + +from __future__ import print_function +import os +import unittest +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestCloudRoleMaker(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMaker. + """ + + def setUp(self): + """Set up, set envs.""" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" + + def test_pslib_1(self): + """Test cases for pslib.""" + import paddle.fluid as fluid + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_1") + return + os.environ["POD_IP"] = "127.0.0.1" + os.environ["PADDLE_PORT"] = "36001" + os.environ["TRAINING_ROLE"] = "TRAINER" + os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001" + os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002" + os.environ["PADDLE_TRAINER_ID"] = "0" + role_maker = GeneralRoleMaker( + init_timeout_seconds=100, + run_timeout_seconds=100, + http_ip_port="127.0.0.1:36003") + role_maker.generate_role() + place = fluid.CPUPlace() + exe = fluid.Executor(place) + fleet.init(role_maker) + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + with fluid.program_guard(train_program, startup_program): + show = fluid.layers.data(name="show", shape=[-1, 1], \ + dtype="float32", lod_level=1, append_batch_size=False) + fc = fluid.layers.fc(input=show, size=1, act=None) + label = fluid.layers.data(name="click", shape=[-1, 1], \ + dtype="int64", lod_level=1, append_batch_size=False) + label_cast = fluid.layers.cast(label, dtype='float32') + cost = fluid.layers.log_loss(fc, label_cast) + try: + adam = fluid.optimizer.Adam(learning_rate=0.000005) + adam = fleet.distributed_optimizer(adam) + adam.minimize([cost], [scope]) + fleet.run_server() + http_server_d = {} + http_server_d["running"] = False + size_d = {} + role_maker._GeneralRoleMaker__start_kv_server(http_server_d, size_d) + except: + print("do not support pslib test, skip") + return + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py new file mode 100644 index 00000000000..dd5cd715ecd --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_4.py @@ -0,0 +1,196 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test cloud role maker.""" + +from __future__ import print_function +import os +import unittest +import paddle.fluid.incubate.fleet.base.role_maker as role_maker + + +class TestCloudRoleMaker(unittest.TestCase): + """ + Test cases for PaddleCloudRoleMaker. + """ + + def setUp(self): + """Set up, set envs.""" + os.environ["PADDLE_TRAINERS_NUM"] = "2" + os.environ[ + "PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001" + + def test_pslib_1(self): + """Test cases for pslib.""" + import sys + import threading + import paddle.fluid as fluid + try: + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib + from paddle.fluid.incubate.fleet.base.role_maker import \ + GeneralRoleMaker + from paddle.fluid.incubate.fleet.utils.http_server import KVHandler + from paddle.fluid.incubate.fleet.utils.http_server import KVServer + from paddle.fluid.incubate.fleet.utils.http_server import \ + KVHTTPServer + except: + print("warning: no fleet, skip test_pslib_4") + return + + try: + import netifaces + except: + print("warning: no netifaces, skip test_pslib_4") + return + + class FakeStream(): + """ + it is a fake stream only for test. + """ + + def write(self, a): + """ + write a to stream, do nothing + + Args: + a(str): the string to write + """ + pass + + def read(self, b): + """ + read data of len b from stream, do nothing + + Args: + b(str): the len to read + + Returns: + c(str): the result + """ + if b == 0: + raise ValueError("this is only for test") + return "fake" + + import os + + try: + + class TmpKVHander(KVHandler): + """ + it is a fake handler only for this test case. + """ + + def __init__(self, server): + """Init.""" + self.path = "a/b/c" + self.server = server + self.wfile = FakeStream() + self.rfile = FakeStream() + self.headers = {} + self.headers['Content-Length'] = 0 + + def address_string(self): + """ + fake address string, it will do nothing. + """ + return "123" + + def send_response(self, code): + """ + fake send response, it will do nothing. + + Args: + code(int): error code + """ + pass + + def send_header(self, a, b): + """ + fake send header, it will do nothing. + + Args: + a(str): some header + b(str): some header + """ + pass + + def end_headers(self): + """ + fake end header, it will do nothing. + """ + pass + except: + print("warning: no KVHandler, skip test_pslib_4") + return + + import sys + + try: + + class TmpServer(KVHTTPServer): + """ + it is a fake server only for this test case. + """ + + def __init__(self): + """Init.""" + self.delete_kv_lock = threading.Lock() + self.delete_kv = {} + self.kv_lock = threading.Lock() + self.kv = {} + except: + print("warning: no KVHTTPServer, skip test_pslib_4") + return + + try: + + class TmpS(KVServer): + """ + it is a fake server only for this test case. + """ + + def __init__(self): + """Init.""" + self.http_server = TmpServer() + self.listen_thread = None + self.size = {} + self.size["a"] = 999 + except: + print("warning: no KVServer, skip test_pslib_4") + return + + s = TmpServer() + h = TmpKVHander(s) + h.do_GET() + h.path = "a/b" + h.do_GET() + h.do_PUT() + h.do_DELETE() + h.path = "a/b/c" + s.kv["b"] = {} + s.kv["b"]["c"] = "456" + h.do_GET() + h.path = "a/d/e" + h.do_PUT() + h.headers['Content-Length'] = 1 + h.do_PUT() + h.do_DELETE() + h.log_message("666") + s.get_deleted_size("haha") + s1 = TmpS() + s1.shoud_stop() + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/trainer_desc.py b/python/paddle/fluid/trainer_desc.py index 2a206e7fa2f..c70d4f7b731 100644 --- a/python/paddle/fluid/trainer_desc.py +++ b/python/paddle/fluid/trainer_desc.py @@ -49,6 +49,8 @@ class TrainerDesc(object): self._infer = False def _set_fetch_var_and_info(self, fetch_vars, fetch_info, print_period): + # convert fetch_info to list + fetch_info = list(fetch_info) for i, v in enumerate(fetch_vars): self.proto_desc.fetch_config.fetch_var_names.extend([v.name]) self.proto_desc.fetch_config.fetch_var_str_format.extend( diff --git a/python/paddle/fluid/trainer_factory.py b/python/paddle/fluid/trainer_factory.py index 0e071251bb2..f0bc4a90292 100644 --- a/python/paddle/fluid/trainer_factory.py +++ b/python/paddle/fluid/trainer_factory.py @@ -17,10 +17,10 @@ import threading import time import logging import numpy as np +from paddle.fluid.log_helper import get_logger -FORMAT = '%(asctime)s-%(levelname)s: %(message)s' -logging.basicConfig(level=logging.INFO, format=FORMAT) -local_logger = logging.getLogger(__name__) +local_logger = get_logger( + __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s') from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer from .device_worker import Hogwild, DownpourSGD, Section, DownpourSGDOPT -- GitLab