Unverified commit e8d5868e authored by Thunderbrook, committed by GitHub

cherry-pick: fix dataset py3; fix logger; add timeout to gloo; modify datanorm op (#26046)

* fix dataset py3 (#25012)

* fix dataset py3 error
* test=develop

* fix logger (#24682)

* fix logger of FetchHandler, which may print logs twice
* test=develop

* add timeout and http store in communication (#23436)

* add timeout and http store in communication, add revert and confirm in fleet
* test=develop

* modify datanorm op test=develop (#23030)
Co-authored-by: xujiaqi01 <173596896@qq.com>
Co-authored-by: yaoxuefeng <yaoxuefeng@baidu.com>
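As a quick orientation for the changes below: `GeneralRoleMaker` now accepts init/run timeouts and an optional HTTP key-value store (rank 0 launches the KV server) as an alternative to the HDFS store, and `fleet` gains pass-level `confirm`/`revert`. A minimal, hedged usage sketch based on the tests in this commit (the port and timeout values are illustrative; TRAINING_ROLE and the other PADDLE_* environment variables must already be set, as in those tests):

```python
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

role_maker = GeneralRoleMaker(
    init_timeout_seconds=3600,       # rendezvous (store) timeout
    run_timeout_seconds=9999999,     # collective-op timeout
    http_ip_port="127.0.0.1:36003")  # if set, use the HTTP store; rank 0 serves it
role_maker.generate_role()
fleet.init(role_maker)
```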
Parent 5e555279
......@@ -1166,6 +1166,28 @@ int32_t FleetWrapper::CopyTable(const uint64_t src_table_id,
#endif
}
void FleetWrapper::Confirm() {
#ifdef PADDLE_WITH_PSLIB
// FIXME(xujiaqi01): will later support confirm
// auto ret = pslib_ptr_->_worker_ptr->confirm();
// ret.wait();
VLOG(0) << "disable FleetWrapper::Confirm temporarily";
#else
VLOG(0) << "FleetWrapper::Confirm does nothing when no pslib";
#endif
}
void FleetWrapper::Revert() {
#ifdef PADDLE_WITH_PSLIB
// FIXME(xujiaqi01): will later support revert
// auto ret = pslib_ptr_->_worker_ptr->revert();
// ret.wait();
VLOG(0) << "disable FleetWrapper::Revert temporarily";
#else
VLOG(0) << "FleetWrapper::Revert does nothing when no pslib";
#endif
}
int32_t FleetWrapper::CopyTableByFeasign(
const uint64_t src_table_id, const uint64_t dest_table_id,
const std::vector<uint64_t>& feasign_list) {
......
......@@ -268,6 +268,10 @@ class FleetWrapper {
// send client to client message
std::future<int32_t> SendClientToClientMsg(int msg_type, int to_client_id,
const std::string& msg);
// confirm all the updated params in the current pass
void Confirm();
// revert all the updated params in the current pass
void Revert();
// FleetWrapper singleton
static std::shared_ptr<FleetWrapper> GetInstance() {
if (NULL == s_instance_) {
......
......@@ -10,16 +10,18 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include <thread> // NOLINT
#include <vector>
#include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/platform/errors.h"
#include "paddle/fluid/string/string_helper.h"
namespace gloo {
namespace rendezvous {
HdfsStore::HdfsStore(const std::string& path) {
path_ = path;
wait_sleep_ms_ = 3000;
wait_sleep_ms_ = 10000;
wait_timeout_ = std::chrono::seconds(999999999);
retry_times_ = 100;
}
......@@ -35,49 +37,86 @@ void HdfsStore::set(const std::string& key, const std::vector<char>& data) {
}
int err_no = 0;
for (int i = 1; i <= retry_times_; ++i) {
err_no = 0;
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_write(tmp, &err_no, "");
if (err_no != 0) {
VLOG(0) << "fs_open_write failed, retry times " << i << " err no "
<< err_no;
fp.reset();
sleep(wait_sleep_ms_ / 1000);
continue;
}
size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get());
if (write_count != data.size()) {
VLOG(0) << "fwrite_unlocked failed, retry times " << i << " write_count "
<< write_count << " data.size() " << data.size();
fp.reset();
sleep(2);
continue;
err_no = -1;
}
fp.reset();
break;
if (err_no != 0) {
VLOG(0) << "fs_open_write failed, retry times " << i << " err no "
<< err_no;
sleep(wait_sleep_ms_ / 1000);
paddle::framework::fs_remove(tmp);
if (i == retry_times_) {
VLOG(0) << "fs_open_write failed, retry times reaches limit";
PADDLE_THROW(platform::errors::PreconditionNotMet(
"fs_open_write failed, retry times reaches"
" limit ",
retry_times_));
}
} else {
break;
}
}
paddle::framework::fs_mv(tmp, path);
#endif
}
#ifdef PADDLE_WITH_GLOO
int retry_do_func(std::function<int(void)> func, uint32_t max_try_time,
uint32_t retry_interval_ms) {
for (uint32_t i = 0; i < max_try_time; ++i) {
if (func() == 0) {
return 0;
}
#ifdef _LINUX
usleep(retry_interval_ms * 1000);
#endif
}
return -1;
}
#endif
std::vector<char> HdfsStore::get(const std::string& key) {
auto path = ObjectPath(key);
std::vector<char> result;
#ifdef PADDLE_WITH_GLOO
// block until key is set
wait({key});
bool is_exists = paddle::framework::fs_exists(path);
int ret = retry_do_func(
[&path]() { return paddle::framework::fs_exists(path) ? 0 : -1; }, 5,
wait_sleep_ms_);
bool is_exists = (ret == 0);
PADDLE_ENFORCE_EQ(is_exists, true,
paddle::platform::errors::NotFound(
"HdfsStore::get, path not exists: " + path));
int err_no = 0;
std::shared_ptr<FILE> fp = paddle::framework::fs_open_read(path, &err_no, "");
char buffer = '\0';
size_t read_count = 0;
while (fread(&buffer, 1, 1, fp.get()) == 1) {
++read_count;
result.push_back(buffer);
}
VLOG(3) << "HdfsStore::get read_count " << read_count;
int read_status = retry_do_func(
[&path, &result]() {
result.clear();
int err_no = 0;
{
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_read(path, &err_no, "");
char buffer = '\0';
size_t read_count = 0;
while (fread(&buffer, 1, 1, fp.get()) == 1) {
++read_count;
result.push_back(buffer);
}
VLOG(3) << "HdfsStore::get read_count " << read_count;
}
return err_no;
},
5, wait_sleep_ms_);
PADDLE_ENFORCE_EQ(read_status, 0,
paddle::platform::errors::Fatal(
"HdfsStore::get, path read faied: " + path));
#endif
return result;
}
......@@ -92,22 +131,33 @@ void HdfsStore::wait(const std::vector<std::string>& keys,
const std::chrono::milliseconds&) { // NOLINT
#ifdef PADDLE_WITH_GLOO
auto start = std::chrono::steady_clock::now();
while (!Check(keys)) {
std::vector<bool> check_key_status(keys.size(), false);
while (!Check(keys, &check_key_status)) {
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - start);
if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) {
PADDLE_ENFORCE_EQ(0, 1, paddle::platform::errors::ExecutionTimeout(
"HdfsStore::wait, Wait timeout for key(s): " +
::gloo::MakeString(keys)));
int32_t last_check_rank = -1;
for (size_t i = 0; i < check_key_status.size(); ++i) {
if (!check_key_status[i]) {
last_check_rank = i;
break;
}
}
PADDLE_THROW(platform::errors::ExecutionTimeout(
"TIMEOUT self_rank = %d pair_rank = %d", self_rank_,
last_check_rank));
}
std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_));
}
#endif
}
void HdfsStore::SetTimeoutSeconds(int timeout_seconds) {
wait_timeout_ = std::chrono::seconds(timeout_seconds);
}
std::string HdfsStore::EncodeName(const std::string& name) {
thread_local std::hash<std::string> hash_func;
return std::to_string(hash_func(name));
return ::paddle::string::erase_spaces(name);
}
std::string HdfsStore::TmpPath(const std::string& name) {
......@@ -118,50 +168,124 @@ std::string HdfsStore::ObjectPath(const std::string& name) {
return path_ + "/" + EncodeName(name);
}
bool HdfsStore::Check(const std::vector<std::string>& keys) {
bool HdfsStore::Check(const std::vector<std::string>& keys,
std::vector<bool>* keys_check_status) {
#ifdef PADDLE_WITH_GLOO
bool ret = true;
std::vector<std::string> paths;
for (const auto& key : keys) {
paths.push_back(ObjectPath(key));
}
for (const auto& path : paths) {
for (size_t i = 0; i < paths.size(); ++i) {
if ((*keys_check_status)[i]) {
continue;
}
const auto& path = paths[i];
bool is_exists = paddle::framework::fs_exists(path);
VLOG(3) << "HdfsStore::Check " << is_exists << " path " << path;
if (!is_exists) {
return false;
ret = false;
}
(*keys_check_status)[i] = is_exists;
}
return ret;
#else
VLOG(0) << "HdfsStore::Check does nothing when no gloo";
#endif
return true;
}
#ifdef PADDLE_WITH_GLOO
void ParallelConnectContext::connectFullMesh(
Store& store, std::shared_ptr<transport::Device>& dev) {
std::vector<char> allBytes;
// Create pairs
auto transportContext = dev->createContext(rank, size);
transportContext->setTimeout(getTimeout());
for (int i = 0; i < size; i++) {
if (i == rank) {
continue;
}
auto& pair = transportContext->createPair(i);
auto addrBytes = pair->address().bytes();
allBytes.insert(allBytes.end(), addrBytes.begin(), addrBytes.end());
}
std::ostringstream storeKey;
storeKey << rank;
store.set(storeKey.str(), allBytes);
std::vector<std::shared_ptr<std::thread>> connect_threads(thread_num_);
// Connect every pair
for (uint32_t i = 0; i < connect_threads.size(); ++i) {
connect_threads[i].reset(new std::thread(
[&store, &transportContext, this](size_t thread_idx,
size_t thread_num) -> void {
for (int i = thread_idx; i < size; i += thread_num) {
if (i == rank) {
continue;
}
// Wait for address of other side of this pair to become available
std::string key = std::to_string(i);
store.wait({key}, getTimeout());
// Connect to other side of this pair
auto allAddrs = store.get(key);
auto addr = extractAddress(allAddrs, i);
transportContext->getPair(i)->connect(addr);
}
},
i, connect_threads.size()));
}
for (uint32_t i = 0; i < connect_threads.size(); ++i) {
connect_threads[i]->join();
}
device_ = dev;
transportContext_ = std::move(transportContext);
}
#endif
} // namespace rendezvous
} // namespace gloo
namespace paddle {
namespace framework {
void GlooWrapper::Init(int rank, int size, const std::string& path,
const std::string& fs_name, const std::string& fs_ugi,
const std::string& iface, const std::string& prefix) {
void GlooWrapper::Init() {
if (is_initialized_) {
return;
}
rank_ = rank;
size_ = size;
std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
cmd += " -D fs.default.name=" + fs_name;
cmd += " -D hadoop.job.ugi=" + fs_ugi;
paddle::framework::hdfs_set_command(cmd);
#ifdef PADDLE_WITH_GLOO
gloo::transport::tcp::attr attr;
attr.iface = iface;
auto file_store = gloo::rendezvous::HdfsStore(path);
auto prefix_store = gloo::rendezvous::PrefixStore(prefix, file_store);
attr.iface = iface_;
std::shared_ptr<gloo::rendezvous::HdfsStore> file_store = nullptr;
std::shared_ptr<gloo::rendezvous::HTTPStore> http_store = nullptr;
auto context =
std::make_shared<gloo::rendezvous::ParallelConnectContext>(rank_, size_);
context->setTimeout(run_timeout_);
auto dev = gloo::transport::tcp::CreateDevice(attr);
auto context = std::make_shared<gloo::rendezvous::Context>(rank, size);
context->setTimeout(file_store.wait_timeout_);
context->connectFullMesh(prefix_store, dev);
switch (store_type_) {
case GlooStoreType::HDFS: {
std::string cmd = std::string("${HADOOP_HOME}/bin/hadoop fs");
cmd += " -D fs.default.name=" + hdfs_name_;
cmd += " -D hadoop.job.ugi=" + hdfs_ugi_;
paddle::framework::hdfs_set_command(cmd);
file_store = std::make_shared<gloo::rendezvous::HdfsStore>(hdfs_path_);
file_store->SetTimeoutSeconds(init_timeout_.count());
auto prefix_store =
std::make_shared<gloo::rendezvous::PrefixStore>(prefix_, *file_store);
context->connectFullMesh(*prefix_store, dev);
break;
}
case GlooStoreType::HTTP: {
http_store = std::make_shared<gloo::rendezvous::HTTPStore>(
http_ip_, http_port_, prefix_ + "_" + http_scope_, rank_);
http_store->SetTimeoutSeconds(init_timeout_.count());
context->connectFullMesh(*http_store, dev);
http_store->Finalize();
break;
}
default:
LOG(ERROR) << "unknown store type " << store_type_;
exit(-1);
}
context_ = std::move(context);
#endif
is_initialized_ = true;
......@@ -170,6 +294,9 @@ void GlooWrapper::Init(int rank, int size, const std::string& path,
template std::vector<int64_t> GlooWrapper::AllReduce<int64_t>(
std::vector<int64_t>& sendbuf, // NOLINT
const std::string& mode);
template std::vector<float> GlooWrapper::AllReduce<float>(
std::vector<float>& sendbuf, // NOLINT
const std::string& mode);
template std::vector<double> GlooWrapper::AllReduce<double>(
std::vector<double>& sendbuf, // NOLINT
const std::string& mode);
......@@ -180,6 +307,8 @@ template std::vector<int64_t> GlooWrapper::AllGather<int64_t>(
int64_t& input); // NOLINT
template std::vector<uint64_t> GlooWrapper::AllGather<uint64_t>(
uint64_t& input); // NOLINT
template std::vector<float> GlooWrapper::AllGather<float>(
float& input); // NOLINT
template std::vector<double> GlooWrapper::AllGather<double>(
double& input); // NOLINT
......
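The `retry_do_func` helper added above wraps a callable in a bounded retry loop with a sleep between attempts, and `HdfsStore::get` now uses it both to wait for the object path and to re-read it. A rough Python sketch of the same pattern (names mirror the C++; the usage line is illustrative):

```python
import os
import time

def retry_do_func(func, max_try_time, retry_interval_ms):
    """Call func() up to max_try_time times; 0 means success, -1 means all attempts failed."""
    for _ in range(max_try_time):
        if func() == 0:
            return 0
        time.sleep(retry_interval_ms / 1000.0)
    return -1

# e.g. wait up to 5 attempts, 10 s apart, for a rendezvous file to appear
# retry_do_func(lambda: 0 if os.path.exists("/tmp/key") else -1, 5, 10000)
```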
......@@ -31,6 +31,7 @@ limitations under the License. */
#include <gloo/barrier.h>
#include <gloo/rendezvous/context.h>
#include <gloo/rendezvous/file_store.h>
#include <gloo/rendezvous/http_store.h>
#include <gloo/rendezvous/prefix_store.h>
#include <gloo/rendezvous/store.h>
#include <gloo/transport/tcp/device.h>
......@@ -59,44 +60,87 @@ class HdfsStore {
virtual void wait(const std::vector<std::string>& keys,
const std::chrono::milliseconds& timeout);
virtual void SetTimeoutSeconds(int timeout_seconds);
std::string EncodeName(const std::string& name);
std::string TmpPath(const std::string& name);
std::string ObjectPath(const std::string& name);
bool Check(const std::vector<std::string>& keys);
bool Check(const std::vector<std::string>& keys,
std::vector<bool>* keys_check_status);
void SetRank(int rank) { self_rank_ = rank; }
std::string path_;
int wait_sleep_ms_;
std::chrono::seconds wait_timeout_;
int retry_times_;
int self_rank_;
};
#ifdef PADDLE_WITH_GLOO
class ParallelConnectContext : public gloo::rendezvous::Context {
public:
ParallelConnectContext(int rank, int size, int base = 2)
: gloo::rendezvous::Context(rank, size, base) {}
virtual ~ParallelConnectContext() {}
// gloo::rendezvous::Context waits for and gets keys one by one, which is
// slow for large world sizes, especially with HdfsStore
void connectFullMesh(Store& store, // NOLINT
std::shared_ptr<transport::Device>& dev); // NOLINT
protected:
int thread_num_ = 6;
};
#endif
} // namespace rendezvous
} // namespace gloo
namespace paddle {
namespace framework {
enum GlooStoreType { HDFS, HTTP };
class GlooWrapper {
public:
GlooWrapper() {}
virtual ~GlooWrapper() {}
void Init(int rank, int size, const std::string& path,
const std::string& fs_name, const std::string& fs_ugi,
const std::string& iface, const std::string& prefix);
void Init();
int Rank() {
CHECK_EQ(is_initialized_, true);
return rank_;
void SetTimeoutSeconds(int init_seconds, int run_seconds) {
init_timeout_ = std::chrono::seconds(init_seconds);
run_timeout_ = std::chrono::seconds(run_seconds);
}
int Size() {
CHECK_EQ(is_initialized_, true);
return size_;
int Rank() { return rank_; }
int Size() { return size_; }
void SetRank(int rank) { rank_ = rank; }
void SetSize(int size) { size_ = size; }
void SetIface(const std::string& iface) { iface_ = iface; }
void SetPrefix(const std::string& prefix) { prefix_ = prefix; }
void SetHdfsStore(const std::string& path, const std::string& fs_name,
const std::string& fs_ugi) {
store_type_ = GlooStoreType::HDFS;
hdfs_path_ = path;
hdfs_name_ = fs_name;
hdfs_ugi_ = fs_ugi;
}
void SetHttpStore(const std::string& ip, int port, const std::string& scope) {
store_type_ = GlooStoreType::HTTP;
http_ip_ = ip;
http_port_ = port;
http_scope_ = scope;
}
void Barrier() {
......@@ -104,6 +148,8 @@ class GlooWrapper {
#ifdef PADDLE_WITH_GLOO
gloo::BarrierOptions opts(context_);
gloo::barrier(opts);
#else
LOG(WARNING) << "Barrier does nothing when WITH_GLOO=OFF";
#endif
}
......@@ -134,6 +180,8 @@ class GlooWrapper {
"AllReduce mode not known: " + mode));
}
gloo::allreduce(opts);
#else
LOG(WARNING) << "AllReduce does nothing when WITH_GLOO=OFF";
#endif
return recvbuf;
}
......@@ -147,6 +195,8 @@ class GlooWrapper {
opts.setInput(&input, 1);
opts.setOutput(ret.data(), size_);
gloo::allgather(opts);
#else
LOG(WARNING) << "AllGather does nothing when WITH_GLOO=OFF";
#endif
return std::move(ret);
}
......@@ -158,6 +208,19 @@ class GlooWrapper {
#endif
int rank_ = 0;
int size_ = 0;
std::chrono::seconds init_timeout_ = std::chrono::seconds(9999999);
std::chrono::seconds run_timeout_ = std::chrono::seconds(9999999);
std::string iface_ = "lo";
std::string prefix_;
GlooStoreType store_type_ = GlooStoreType::HDFS;
// configs for hdfs store
std::string hdfs_path_;
std::string hdfs_name_;
std::string hdfs_ugi_;
std::string http_ip_;
// configs for http store
int http_port_;
std::string http_scope_;
};
} // namespace framework
......
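With the monolithic `Init(rank, size, path, ...)` replaced by setters plus a parameterless `Init()`, configuration and rendezvous are now two separate steps. A hedged sketch of the Python-side flow through `fluid.core.Gloo()`, mirroring the role maker and the unit test in this commit (paths, ranks, and ports are illustrative):

```python
import paddle.fluid as fluid

gloo = fluid.core.Gloo()
gloo.set_rank(0)
gloo.set_size(1)
gloo.set_prefix("")                      # e.g. os.getenv("SYS_JOB_ID", "")
gloo.set_iface("lo")
gloo.set_timeout_seconds(3600, 9999999)  # init timeout, run timeout
# Option A: HDFS store
gloo.set_hdfs_store("/hdfs/path/trainer", "fs_name", "fs_ugi")
# Option B: HTTP store (rank 0 must run the KV server on this port)
# gloo.set_http_store("127.0.0.1", 8099, "trainer")
gloo.init()
rank, size = gloo.rank(), gloo.size()
```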
......@@ -17,6 +17,7 @@
#include "paddle/fluid/framework/fleet/fleet_wrapper.h"
#include "paddle/fluid/framework/fleet/gloo_wrapper.h"
#include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/string/string_helper.h"
#if defined _WIN32 || defined __APPLE__
#else
......@@ -37,14 +38,23 @@ TEST(TEST_GLOO, store_1) {
}
store.wait(std::vector<std::string>{"test"});
store.wait(std::vector<std::string>{"test"}, std::chrono::milliseconds(0));
store.SetTimeoutSeconds(100000);
store.EncodeName("1");
store.TmpPath("1");
store.ObjectPath("1");
store.Check(std::vector<std::string>{"test"});
std::vector<bool> status(1, false);
store.Check(std::vector<std::string>{"test"}, &status);
auto gw = paddle::framework::GlooWrapper();
gw.Init(0, 1, "", "", "", "", "");
gw.Init(0, 1, "", "", "", "", "");
gw.SetTimeoutSeconds(1000, 1000);
gw.SetRank(0);
gw.SetSize(1);
gw.SetPrefix("");
gw.SetIface("lo");
gw.SetHdfsStore("", "", "");
gw.Init();
gw.SetHttpStore("", 8099, "");
gw.Init();
gw.Rank();
gw.Size();
gw.Barrier();
......@@ -63,5 +73,8 @@ TEST(TEST_FLEET, fleet_1) {
fleet->RunServer("", 0);
fleet->SaveModelOneTable(0, "", 0);
fleet->SaveModelOneTablePrefix(0, "", 0, "");
fleet->Confirm();
fleet->Revert();
paddle::string::erase_spaces("1 2");
#endif
}
......@@ -53,7 +53,9 @@ const std::unordered_set<std::string> op_has_unsed_vars_white_list = {
"precision_recall", // 1
"fusion_seqpool_cvm_concat", // 2
"fused_batch_norm_act", // 2
"fused_batch_norm_act_grad" // 2
"fused_batch_norm_act_grad", // 2
"data_norm", // 0
"data_norm_grad", // 0
};
namespace paddle {
......
......@@ -51,6 +51,17 @@ class DataNormOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE(ctx->HasOutput("Means"), "");
PADDLE_ENFORCE(ctx->HasOutput("Scales"), "");
PADDLE_ENFORCE(ctx->HasOutput("Y"), "");
bool enable_scale_and_shift =
ctx->Attrs().Get<bool>("enable_scale_and_shift");
if (enable_scale_and_shift) {
PADDLE_ENFORCE_EQ(
ctx->HasInput("scale_w"), true,
platform::errors::InvalidArgument(
"Input(scale_w) of DataNormOp should not be null."));
PADDLE_ENFORCE_EQ(ctx->HasInput("bias"), true,
platform::errors::InvalidArgument(
"Input(bias) of DataNormOp should not be null."));
}
const auto x_dims = ctx->GetInputDim("X");
const DataLayout data_layout = framework::StringToDataLayout(
......@@ -72,6 +83,45 @@ class DataNormOp : public framework::OperatorWithKernel {
PADDLE_ENFORCE_EQ(ctx->GetInputDim("BatchSquareSum")[0], C);
}
if (enable_scale_and_shift) {
auto scale_dim = ctx->GetInputDim("scale_w");
auto bias_dim = ctx->GetInputDim("bias");
PADDLE_ENFORCE_EQ(
scale_dim.size(), 1UL,
platform::errors::InvalidArgument("the dimensionof scale"
"must equal to 1. But received: "
"the shape of scale is [%s], "
"the dimensionof scale is [%d]",
scale_dim, scale_dim.size()));
PADDLE_ENFORCE_EQ(
bias_dim.size(), 1UL,
platform::errors::InvalidArgument("the dimension of bias"
"must equal to 1. But received: "
"the shape of bias is [%s],"
"the dimension of bias is [%d]",
bias_dim, bias_dim.size()));
bool check = true;
if ((!ctx->IsRuntime()) && (framework::product(scale_dim) <= 0 ||
framework::product(bias_dim) <= 0)) {
check = false;
}
if (check) {
PADDLE_ENFORCE_EQ(scale_dim[0], C,
platform::errors::InvalidArgument(
"the shape of scale must equal to [%d]"
"But received: the shape of scale is [%d]",
C, scale_dim[0]));
PADDLE_ENFORCE_EQ(bias_dim[0], C,
platform::errors::InvalidArgument(
"the shape of bias must equal to [%d]"
"But received: the shape of bias is [%d]",
C, bias_dim[0]));
}
}
ctx->SetOutputDim("Y", x_dims);
ctx->SetOutputDim("Means", {C});
ctx->SetOutputDim("Scales", {C});
......@@ -99,6 +149,17 @@ class DataNormOp : public framework::OperatorWithKernel {
ctx, "BatchSquareSum"),
"BatchSquareSum input should be of float type");
bool enable_scale_and_shift = ctx.Attr<bool>("enable_scale_and_shift");
if (enable_scale_and_shift) {
PADDLE_ENFORCE_EQ(dn_param_type,
OperatorWithKernel::IndicateVarDataType(ctx, "scale_w"),
platform::errors::InvalidArgument(
"scale_w input should be of float type"));
PADDLE_ENFORCE_EQ(dn_param_type,
OperatorWithKernel::IndicateVarDataType(ctx, "bias"),
platform::errors::InvalidArgument(
"bias input should be of float type"));
}
// TODO(pzelazko-intel): enable MKLDNN layout when it's ready
framework::LibraryType library = framework::LibraryType::kPlain;
framework::DataLayout layout = framework::DataLayout::kAnyLayout;
......@@ -133,6 +194,19 @@ class DataNormOpMaker : public framework::OpProtoAndCheckerMaker {
"summary_decay_rate",
"(float, default 0.9999999) The decay rate when update the summary")
.SetDefault(0.9999999);
AddAttr<bool>(
"enable_scale_and_shift",
"(bool, default false) Set to true to enable scale and shift such as "
"batch_norm op")
.SetDefault(false);
AddInput("scale_w",
"scale_w is a 1-dimensional tensor of size C "
"that is applied to the output")
.AsDispensable();
AddInput("bias",
"bias is a 1-dimensional tensor of size C "
"that is applied to the output")
.AsDispensable();
AddAttr<std::string>("data_layout", "").SetDefault("NCHW");
AddAttr<bool>("sync_stats", "(bool, default false) only used in multi-GPU")
.SetDefault(false);
......@@ -194,7 +268,6 @@ class DataNormKernel<platform::CPUDeviceContext, T>
// alloc memory
T *y_data = y->mutable_data<T>(ctx.GetPlace());
Eigen::Array<T, Eigen::Dynamic, 1> inv_std(C);
ConstEigenVectorArrayMap<T> b_size_arr(
ctx.Input<Tensor>("BatchSize")->data<T>(), C);
ConstEigenVectorArrayMap<T> b_sum_arr(
......@@ -210,6 +283,7 @@ class DataNormKernel<platform::CPUDeviceContext, T>
const T *means_data = mean_out->data<T>();
const T *x_data = x->data<T>();
const T *scales_data = scales->data<T>();
const int slot_dim = ctx.Attr<int>("slot_dim");
T min_precision = 1e-7f;
......@@ -218,7 +292,8 @@ class DataNormKernel<platform::CPUDeviceContext, T>
case DataLayout::kNHWC: {
// if slot_dim is set and batch size is larger than zero, we choose
// to check if show number is zero, if so, skip normalization.
if (slot_dim > 0 && N > 0) {
if (slot_dim > 0 && N > 0 &&
(!ctx.Attr<bool>("enable_scale_and_shift"))) {
const int item_size = x->numel() / N;
// location of show number in one embedding
int offset = 0;
......@@ -239,10 +314,56 @@ class DataNormKernel<platform::CPUDeviceContext, T>
offset += item_size;
}
} else {
EigenArrayMap<T>(y_data, C, N) =
(ConstEigenArrayMap<T>(x->data<T>(), C, N).colwise() - means_arr)
.colwise() *
scales_arr;
if (!ctx.Attr<bool>("enable_scale_and_shift") && slot_dim <= 0) {
EigenArrayMap<T>(y_data, C, N) =
(ConstEigenArrayMap<T>(x->data<T>(), C, N).colwise() -
means_arr)
.colwise() *
scales_arr;
} else if (ctx.Attr<bool>("enable_scale_and_shift") &&
slot_dim <= 0) {
const auto *scale_w = ctx.Input<Tensor>("scale_w");
const auto *bias = ctx.Input<Tensor>("bias");
ConstEigenVectorArrayMap<T> scale_w_arr(scale_w->data<T>(), C);
ConstEigenVectorArrayMap<T> bias_arr(bias->data<T>(), C);
Eigen::Array<T, Eigen::Dynamic, 1> new_scale =
scales_arr * scale_w_arr;
Eigen::Array<T, Eigen::Dynamic, 1> new_bias =
bias_arr - means_arr * scales_arr * scale_w_arr;
EigenArrayMap<T>(y_data, C, N) =
(ConstEigenArrayMap<T>(x->data<T>(), C, N).colwise() *
new_scale)
.colwise() +
new_bias;
} else {
const int item_size = x->numel() / N;
const auto *scale_w = ctx.Input<Tensor>("scale_w");
const auto *bias = ctx.Input<Tensor>("bias");
const T *scale_w_data = scale_w->data<T>();
const T *bias_data = bias->data<T>();
// location of show number in one embedding
int offset = 0;
for (int k = 0; k < N; ++k) {
for (int i = 0; i < item_size; i += slot_dim) {
if (x_data[offset + i] > -min_precision &&
x_data[offset + i] < min_precision) {
// show = 0
memset(y_data + offset + i, 0, sizeof(T) * slot_dim);
} else {
for (int j = i; j < i + slot_dim; ++j) {
y_data[offset + j] = ((x_data[offset + j] - means_data[j]) *
scales_data[j]) *
scale_w_data[j] +
bias_data[j];
}
}
} // end for i
offset += item_size;
} // end for k
}
}
break;
}
......@@ -274,7 +395,8 @@ class DataNormGradOp : public framework::OperatorWithKernel {
"Output(BatchSquareSum) of DataNormGradOp should not be null."));
PADDLE_ENFORCE(ctx->HasInput("Means"), "");
PADDLE_ENFORCE(ctx->HasInput("Scales"), "");
bool enable_scale_and_shift =
ctx->Attrs().Get<bool>("enable_scale_and_shift");
// check output
PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("BatchSize")), "");
PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("BatchSum")), "");
......@@ -294,6 +416,22 @@ class DataNormGradOp : public framework::OperatorWithKernel {
ctx->SetOutputDim(framework::GradVarName("BatchSize"), {C});
ctx->SetOutputDim(framework::GradVarName("BatchSum"), {C});
ctx->SetOutputDim(framework::GradVarName("BatchSquareSum"), {C});
if (enable_scale_and_shift) {
const bool has_scale_grad =
ctx->HasOutput(framework::GradVarName("scale_w"));
const bool has_bias_grad = ctx->HasOutput(framework::GradVarName("bias"));
PADDLE_ENFORCE_EQ((has_scale_grad == has_bias_grad), true,
platform::errors::InvalidArgument(
"Output(Scale@GRAD) and Output(Bias@GRAD)"
"must be null or not be null at same time. "
"But now, has Scale@Grad=[%d], has Bias@GRAD=[%d]",
has_scale_grad, has_bias_grad));
if (has_scale_grad) {
ctx->SetOutputDim(framework::GradVarName("scale_w"), {C});
ctx->SetOutputDim(framework::GradVarName("bias"), {C});
}
}
}
protected:
......@@ -353,18 +491,23 @@ class DataNormGradKernel<platform::CPUDeviceContext, T>
const int C =
(data_layout == DataLayout::kNCHW ? x_dims[1]
: x_dims[x_dims.size() - 1]);
// init output
Tensor *d_x = nullptr;
if (ctx.HasOutput(framework::GradVarName("X"))) {
d_x = ctx.Output<Tensor>(framework::GradVarName("X"));
}
auto *d_batch_size =
ctx.Output<Tensor>(framework::GradVarName("BatchSize"));
auto *d_batch_sum = ctx.Output<Tensor>(framework::GradVarName("BatchSum"));
auto *d_batch_square_sum =
ctx.Output<Tensor>(framework::GradVarName("BatchSquareSum"));
const T *mean_data = means->data<T>();
const T *inv_var_data = scales->data<T>();
ConstEigenVectorArrayMap<T> mean_arr(mean_data, C);
ConstEigenVectorArrayMap<T> inv_var_arr(inv_var_data, C);
T *d_batch_size_data = d_batch_size->mutable_data<T>(ctx.GetPlace());
T *d_batch_sum_data = d_batch_sum->mutable_data<T>(ctx.GetPlace());
T *d_batch_square_sum_data =
......@@ -372,7 +515,6 @@ class DataNormGradKernel<platform::CPUDeviceContext, T>
EigenVectorArrayMap<T> d_batch_size_arr(d_batch_size_data, C);
EigenVectorArrayMap<T> d_batch_sum_arr(d_batch_sum_data, C);
EigenVectorArrayMap<T> d_batch_square_sum_arr(d_batch_square_sum_data, C);
d_batch_size_arr.setZero();
d_batch_sum_arr.setZero();
d_batch_square_sum_arr.setZero();
......@@ -392,8 +534,86 @@ class DataNormGradKernel<platform::CPUDeviceContext, T>
if (d_x != nullptr) {
EigenArrayMap<T> d_x_arr(d_x->mutable_data<T>(ctx.GetPlace()), C, N);
d_x_arr.setZero();
for (int nc = 0; nc < N; ++nc) {
d_x_arr.col(nc) = d_y_arr.col(nc) * scales_arr;
if (!ctx.Attr<bool>("enable_scale_and_shift")) {
for (int nc = 0; nc < N; ++nc) {
d_x_arr.col(nc) = d_y_arr.col(nc) * scales_arr;
}
} else {
const auto *scale_w = ctx.Input<Tensor>("scale_w");
auto *d_scale =
ctx.Output<Tensor>(framework::GradVarName("scale_w"));
auto *d_bias = ctx.Output<Tensor>(framework::GradVarName("bias"));
ConstEigenVectorArrayMap<T> scale_arr(scale_w->data<T>(), C);
T *d_bias_data = d_bias->mutable_data<T>(ctx.GetPlace());
T *d_scale_data = d_scale->mutable_data<T>(ctx.GetPlace());
EigenVectorArrayMap<T> d_bias_arr(d_bias_data, C);
EigenVectorArrayMap<T> d_scale_arr(d_scale_data, C);
Tensor dy_sum;
dy_sum.Resize({C});
dy_sum.mutable_data<T>(ctx.GetPlace());
EigenVectorArrayMap<T> dy_sum_arr(
dy_sum.mutable_data<T>(ctx.GetPlace()), C);
Tensor dy_mul_x_sub_mean_mul_invstd_sum;
dy_mul_x_sub_mean_mul_invstd_sum.Resize({C});
dy_mul_x_sub_mean_mul_invstd_sum.mutable_data<T>(ctx.GetPlace());
EigenVectorArrayMap<T> dy_mul_x_sub_mean_mul_invstd_sum_arr(
dy_mul_x_sub_mean_mul_invstd_sum.mutable_data<T>(
ctx.GetPlace()),
C);
dy_sum_arr.setZero();
dy_mul_x_sub_mean_mul_invstd_sum_arr.setZero();
if (slot_dim <= 0) {
for (int n = 0; n < N; ++n) {
dy_sum_arr += d_y_arr.col(n);
dy_mul_x_sub_mean_mul_invstd_sum_arr +=
((x_arr.col(n) - mean_arr) * inv_var_arr * d_y_arr.col(n));
}
if (d_scale && d_bias) {
d_bias_arr = dy_sum_arr;
d_scale_arr = dy_mul_x_sub_mean_mul_invstd_sum_arr;
}
for (int nc = 0; nc < N; ++nc) {
d_x_arr.col(nc) = d_y_arr.col(nc) * scales_arr * scale_arr;
}
} else {
int offset = 0;
const int item_size = x->numel() / N;
T *d_x_data = d_x->mutable_data<T>(ctx.GetPlace());
T *d_scale_data = d_scale->mutable_data<T>(ctx.GetPlace());
T *d_bias_data = d_bias->mutable_data<T>(ctx.GetPlace());
const T *dy_data = d_y->data<T>();
const T *scales_data = scales->data<T>();
const T *scale_w_data = scale_w->data<T>();
const T *x_data = x->data<T>();
for (int i = 0; i < item_size; i++) {
d_bias_data[i] = 0;
d_scale_data[i] = 0;
}
for (int k = 0; k < N; ++k) {
for (int i = 0; i < item_size; i += slot_dim) {
if (!(x_data[offset + i] > -min_precision &&
x_data[offset + i] < min_precision)) {
// show != 0
for (int j = i; j < i + slot_dim; ++j) {
d_x_data[offset + j] = dy_data[offset + j] *
scales_data[j] * scale_w_data[j];
d_bias_data[j] += dy_data[offset + j];
d_scale_data[j] += (x_data[offset + j] - mean_data[j]) *
inv_var_data[j] * dy_data[offset + j];
}
}
}
offset += item_size;
}
}
}
}
......@@ -466,6 +686,8 @@ class DataNormGradMaker : public framework::SingleGradOpMaker<T> {
op->SetInput("X", this->Input("X"));
op->SetInput(framework::GradVarName("Y"), this->OutputGrad("Y"));
op->SetInput("scale_w", this->Input("scale_w"));
op->SetInput("bias", this->Input("bias"));
op->SetOutput("BatchSize", this->Input("BatchSize"));
op->SetOutput("BatchSum", this->Input("BatchSum"));
op->SetOutput("BatchSquareSum", this->Input("BatchSquareSum"));
......@@ -481,6 +703,9 @@ class DataNormGradMaker : public framework::SingleGradOpMaker<T> {
this->InputGrad("BatchSum"));
op->SetOutput(framework::GradVarName("BatchSquareSum"),
this->InputGrad("BatchSquareSum"));
op->SetOutput(framework::GradVarName("scale_w"),
this->InputGrad("scale_w"));
op->SetOutput(framework::GradVarName("bias"), this->InputGrad("bias"));
}
};
......
......@@ -78,6 +78,8 @@ void BindFleetWrapper(py::module* m) {
&framework::FleetWrapper::SetClient2ClientConfig)
.def("set_pull_local_thread_num",
&framework::FleetWrapper::SetPullLocalThreadNum)
.def("confirm", &framework::FleetWrapper::Confirm)
.def("revert", &framework::FleetWrapper::Revert)
.def("save_model_one_table", &framework::FleetWrapper::SaveModelOneTable)
.def("save_model_one_table_with_prefix",
&framework::FleetWrapper::SaveModelOneTablePrefix)
......
......@@ -37,11 +37,20 @@ void BindGlooWrapper(py::module* m) {
.def("rank", &framework::GlooWrapper::Rank)
.def("size", &framework::GlooWrapper::Size)
.def("barrier", &framework::GlooWrapper::Barrier)
.def("set_timeout_seconds", &framework::GlooWrapper::SetTimeoutSeconds)
.def("set_rank", &framework::GlooWrapper::SetRank)
.def("set_size", &framework::GlooWrapper::SetSize)
.def("set_iface", &framework::GlooWrapper::SetIface)
.def("set_prefix", &framework::GlooWrapper::SetPrefix)
.def("set_hdfs_store", &framework::GlooWrapper::SetHdfsStore)
.def("set_http_store", &framework::GlooWrapper::SetHttpStore)
.def("all_reduce", &framework::GlooWrapper::AllReduce<uint64_t>)
.def("all_reduce", &framework::GlooWrapper::AllReduce<int64_t>)
.def("all_reduce", &framework::GlooWrapper::AllReduce<float>)
.def("all_reduce", &framework::GlooWrapper::AllReduce<double>)
.def("all_gather", &framework::GlooWrapper::AllGather<uint64_t>)
.def("all_gather", &framework::GlooWrapper::AllGather<int64_t>)
.def("all_gather", &framework::GlooWrapper::AllGather<float>)
.def("all_gather", &framework::GlooWrapper::AllGather<double>);
} // end BindGlooWrapper
} // end namespace pybind
......
......@@ -61,6 +61,19 @@ std::string trim_spaces(const std::string& str) {
return std::string(p, len);
}
std::string erase_spaces(const std::string& str) {
std::string result;
result.reserve(str.size());
const char* p = str.c_str();
while (*p != 0) {
if (!isspace(*p)) {
result.append(p, 1);
}
++p;
}
return result;
}
inline int str_to_float(const char* str, float* v) {
const char* head = str;
char* cursor = NULL;
......
......@@ -62,6 +62,9 @@ std::string format_string(const std::string& fmt, ARGS&&... args) {
// remove leading and trailing spaces
std::string trim_spaces(const std::string& str);
// erase all spaces in str
std::string erase_spaces(const std::string& str);
int str_to_float(const char* str, float* v);
// split string by delim
......
......@@ -14,6 +14,7 @@
"""Defination of Role Makers."""
from __future__ import print_function
from multiprocessing import Process, Manager
import paddle.fluid as fluid
import os
import time
......@@ -556,7 +557,21 @@ class GeneralRoleMaker(RoleMakerBase):
self._role_is_generated = False
self._hdfs_name = kwargs.get("hdfs_name", "")
self._hdfs_ugi = kwargs.get("hdfs_ugi", "")
self._hdfs_path = kwargs.get("path", "")
self._hdfs_path = kwargs.get("path", "").rstrip("/")
self._init_timeout_seconds = kwargs.get("init_timeout_seconds", 3600)
self._run_timeout_seconds = kwargs.get("run_timeout_seconds", 9999999)
ip_port = kwargs.get("http_ip_port", "")
self._http_ip_port = []
self._http_server = None
# if ip_port is not empty, it will use http instead of hdfs
if ip_port != "":
self._http_ip_port = ip_port.split(":")
# it's for communication between processes
self._manager = Manager()
# global dict to store status
self._http_server_d = self._manager.dict()
# set running status of http server
self._http_server_d["running"] = False
self._iface = self.__get_default_iface()
# this environment variable can be empty
self._prefix = os.getenv("SYS_JOB_ID", "")
......@@ -572,17 +587,41 @@ class GeneralRoleMaker(RoleMakerBase):
trainers_num = len(worker_endpoints)
if training_role not in ["TRAINER", "PSERVER"]:
raise ValueError("TRAINING_ROLE must be PSERVER or TRAINER")
if training_role == "TRAINER":
role = Role.WORKER
current_id = int(os.environ["PADDLE_TRAINER_ID"])
if current_id == 0 and len(self._http_ip_port) != 0:
size_d = {
"trainer": len(worker_endpoints),
"pserver": len(eplist),
"all": len(worker_endpoints) + len(eplist)
}
# child process for http server
self._http_server = Process(
target=self.__start_kv_server,
args=(self._http_server_d, size_d))
self._http_server.daemon = True
# set running status to True
self._http_server_d["running"] = True
# start child process
self._http_server.start()
self._node_type = 1
self._cur_endpoint = worker_endpoints[current_id]
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(worker_endpoints),
self._hdfs_path.rstrip("/") + "/trainer",
self._hdfs_name, self._hdfs_ugi, self._iface,
self._prefix)
gloo.set_rank(current_id)
gloo.set_size(len(worker_endpoints))
gloo.set_prefix(self._prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
if len(self._http_ip_port) != 0:
gloo.set_http_store(self._http_ip_port[0],
int(self._http_ip_port[1]), "trainer")
else:
gloo.set_hdfs_store(self._hdfs_path + "/trainer",
self._hdfs_name, self._hdfs_ugi)
gloo.init()
self._node_type_comm = gloo
elif training_role == "PSERVER":
role = Role.SERVER
......@@ -598,20 +637,36 @@ class GeneralRoleMaker(RoleMakerBase):
self._node_type = 0
self._cur_endpoint = cur_endpoint
gloo = fluid.core.Gloo()
gloo.init(current_id,
len(eplist),
self._hdfs_path.rstrip("/") + "/pserver",
self._hdfs_name, self._hdfs_ugi, self._iface,
self._prefix)
gloo.set_rank(current_id)
gloo.set_size(len(eplist))
gloo.set_prefix(self._prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
if len(self._http_ip_port) != 0:
gloo.set_http_store(self._http_ip_port[0],
int(self._http_ip_port[1]), "pserver")
else:
gloo.set_hdfs_store(self._hdfs_path + "/pserver",
self._hdfs_name, self._hdfs_ugi)
gloo.init()
self._node_type_comm = gloo
gloo = fluid.core.Gloo()
all_list = worker_endpoints + eplist
gloo.init(
all_list.index(self._cur_endpoint),
len(all_list),
self._hdfs_path.rstrip("/") + "/all", self._hdfs_name,
self._hdfs_ugi, self._iface, self._prefix)
gloo.set_rank(all_list.index(self._cur_endpoint))
gloo.set_size(len(all_list))
gloo.set_prefix(self._prefix)
gloo.set_iface(self._iface)
gloo.set_timeout_seconds(self._init_timeout_seconds,
self._run_timeout_seconds)
if len(self._http_ip_port) != 0:
gloo.set_http_store(self._http_ip_port[0],
int(self._http_ip_port[1]), "all")
else:
gloo.set_hdfs_store(self._hdfs_path + "/all", self._hdfs_name,
self._hdfs_ugi)
gloo.init()
self._all_comm = gloo
self._trainers_num = trainers_num
self._server_endpoints = eplist
......@@ -620,6 +675,11 @@ class GeneralRoleMaker(RoleMakerBase):
self._rank = all_list.index(self._cur_endpoint)
self._size = len(all_list)
self._worker_endpoints = worker_endpoints
if self._http_server is not None:
# set running status to False
self._http_server_d["running"] = False
# wait until child process exits
self._http_server.join()
self._role_is_generated = True
def all_gather(self, input):
......@@ -872,6 +932,16 @@ class GeneralRoleMaker(RoleMakerBase):
return intf_name
return "lo"
def __start_kv_server(self, http_server_d, size_d):
from paddle.fluid.incubate.fleet.utils.http_server import KVServer
http_server = KVServer(int(self._http_ip_port[1]), size_d)
http_server.start()
wait_seconds = 5
while http_server_d.get("running",
False) and not http_server.shoud_stop():
time.sleep(wait_seconds)
http_server.stop()
class UserDefinedRoleMaker(RoleMakerBase):
"""
......
......@@ -567,6 +567,24 @@ class PSLib(Fleet):
model_proto_file, table_var_names, load_combine)
self._role_maker._barrier_worker()
def confirm(self):
"""
confirm all the params updated in the current pass
"""
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.confirm()
self._role_maker._barrier_worker()
def revert(self):
"""
revert all the params updated in the current pass
"""
self._role_maker._barrier_worker()
if self._role_maker.is_first_worker():
self._fleet_ptr.revert()
self._role_maker._barrier_worker()
def load_model(self, model_dir=None, **kwargs):
"""
load pslib model, there are at least 4 modes, these modes are the same
......
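A hedged sketch of how `confirm`/`revert` might bracket a training pass; the surrounding loop and helpers are hypothetical, only the two fleet calls come from this commit:

```python
# Hypothetical pass loop: keep a pass's parameter updates if it succeeds,
# otherwise roll them back on the parameter servers.
for pass_id in range(num_passes):    # num_passes: hypothetical
    try:
        train_one_pass(pass_id)      # hypothetical training helper
        fleet.confirm()              # keep all params updated in this pass
    except Exception:
        fleet.revert()               # roll back all params updated in this pass
        raise
```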
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Http Server."""
import logging
import BaseHTTPServer
import SimpleHTTPServer
import time
import threading
import socket
def get_logger(name, level, fmt):
logger = logging.getLogger(name)
logger.setLevel(level)
handler = logging.FileHandler('http.log', mode='w')
formatter = logging.Formatter(fmt=fmt)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
_http_server_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
class KVHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
"""
kv handler class for kv http server,
it defines the way to get/set kv in server.
"""
def do_GET(self):
"""
get method for kv handler, get value according to key.
"""
log_str = "GET " + self.address_string() + self.path
paths = self.path.split('/')
if len(paths) < 3:
print('len of request path must be 3: ' + self.path)
self.send_status_code(400)
return
_, scope, key = paths
with self.server.kv_lock:
value = self.server.kv.get(scope, {}).get(key)
if value is None:
log_str += ' , key not found: ' + key
self.send_status_code(404)
else:
log_str += ' , key found: ' + key
self.send_response(200)
self.send_header("Content-Length", str(len(value)))
self.end_headers()
self.wfile.write(value)
_http_server_logger.info(log_str)
def do_PUT(self):
"""
put method for kv handler, set value according to key.
"""
log_str = "PUT " + self.address_string() + self.path
paths = self.path.split('/')
if len(paths) < 3:
print('len of request path must be 3: ' + self.path)
self.send_status_code(400)
return
_, scope, key = paths
content_length = int(self.headers['Content-Length'])
try:
value = self.rfile.read(content_length)
except:
print("receive error invalid request")
self.send_status_code(404)
return
with self.server.kv_lock:
if self.server.kv.get(scope) is None:
self.server.kv[scope] = {}
self.server.kv[scope][key] = value
self.send_status_code(200)
_http_server_logger.info(log_str)
def do_DELETE(self):
"""
delete method for kv handler, delete value according to key.
"""
log_str = "DELETE " + self.address_string() + self.path
paths = self.path.split('/')
if len(paths) < 3:
print('len of request path must be 3: ' + self.path)
self.send_status_code(400)
return
_, scope, key = paths
with self.server.delete_kv_lock:
if self.server.delete_kv.get(scope) is None:
self.server.delete_kv[scope] = []
self.server.delete_kv[scope].append(key)
self.send_status_code(200)
_http_server_logger.info(log_str)
def log_message(self, format, *args):
"""
ignore all logging messages in kv handler.
"""
pass
def send_status_code(self, code):
"""
send status code back to client.
"""
self.send_response(code)
self.send_header("Content-Length", 0)
self.end_headers()
class KVHTTPServer(BaseHTTPServer.HTTPServer, object):
"""
it is a http server storing kv pairs.
"""
def __init__(self, port, handler):
"""Init."""
super(KVHTTPServer, self).__init__(('', port), handler)
self.delete_kv_lock = threading.Lock()
self.delete_kv = {}
self.kv_lock = threading.Lock()
self.kv = {}
def get_deleted_size(self, key):
"""
get deleted size in key.
"""
ret = 0
with self.delete_kv_lock:
ret = self.delete_kv.get(key, 0)
return ret
class KVServer:
"""
it is a server storing kv pairs, has a http server inside.
"""
def __init__(self, port, size={}):
"""Init."""
self.http_server = KVHTTPServer(port, KVHandler)
self.listen_thread = None
self.size = size
def start(self):
"""
start server until user calls stop to let it quit.
"""
self.listen_thread = threading.Thread(
target=lambda: self.http_server.serve_forever())
self.listen_thread.start()
def stop(self):
"""
stop server and clear its resources.
"""
self.http_server.shutdown()
self.listen_thread.join()
self.http_server.server_close()
def shoud_stop(self):
"""
return whether the server should stop.
Returns:
ret(bool): whether the server should stop
"""
for key in self.size:
s = self.http_server.get_deleted_size(key)
if s != self.size.get(key, 0):
return False
return True
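`GeneralRoleMaker.__start_kv_server` (shown earlier) drives this server from a child process; a minimal standalone sketch of the same start/poll/stop cycle (the port and scope sizes are illustrative):

```python
import time
from paddle.fluid.incubate.fleet.utils.http_server import KVServer

size_d = {"trainer": 2, "pserver": 2, "all": 4}  # expected DELETE count per scope
server = KVServer(8099, size_d)
server.start()
while not server.shoud_stop():   # note: method name as defined above
    time.sleep(5)
server.stop()
```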
......@@ -4540,7 +4540,8 @@ def data_norm(input,
do_model_average_for_mean_and_var=True,
slot_dim=-1,
sync_stats=False,
summary_decay_rate=0.9999999):
summary_decay_rate=0.9999999,
enable_scale_and_shift=False):
"""
**Data Normalization Layer**
......@@ -4589,6 +4590,7 @@ def data_norm(input,
sync_stats(bool, Default False): When running with multiple GPU cards, using allreduce to sync the
summary messages.
summary_decay_rate(float, Default 0.9999999): The decay rate when updating summary.
enable_scale_and_shift(bool, Default False): whether to apply scale and shift after normalization.
Returns:
Variable: A tensor variable which is the result after applying data normalization on the input.
......@@ -4619,12 +4621,35 @@ def data_norm(input,
batch_size_default = 1e4
batch_sum_default = 0.0
batch_square_sum_default = 1e4
scale_w_default = 1.0
bias_default = 0.0
if param_attr and isinstance(param_attr, dict):
batch_size_default = param_attr.get("batch_size", 1e4)
batch_sum_default = param_attr.get("batch_sum", 0.0)
batch_square_sum_default = param_attr.get("batch_square", 1e4)
if enable_scale_and_shift:
scale_w_default = param_attr.get("scale_w", 1.0)
bias_default = param_attr.get("bias", 0.0)
# create scale and shift(bias) when enable_scale_and_shift is True
if name is None:
name = "dn"
if enable_scale_and_shift:
scale_w = helper.create_parameter(
attr=ParamAttr(
name=name + '.scale_w',
initializer=Constant(value=float(scale_w_default)),
trainable=True),
shape=param_shape,
dtype=input.dtype)
bias = helper.create_parameter(
attr=ParamAttr(
name=name + '.bias',
initializer=Constant(value=float(bias_default)),
trainable=True),
shape=param_shape,
dtype=input.dtype)
# create parameter
batch_size = helper.create_parameter(
attr=ParamAttr(
......@@ -4655,14 +4680,18 @@ def data_norm(input,
data_norm_out = input if in_place else helper.create_variable(dtype=dtype)
inputs = {
"X": input,
"BatchSize": batch_size,
"BatchSum": batch_sum,
"BatchSquareSum": batch_square_sum
}
if enable_scale_and_shift:
inputs["scale_w"] = scale_w
inputs["bias"] = bias
helper.append_op(
type="data_norm",
inputs={
"X": input,
"BatchSize": batch_size,
"BatchSum": batch_sum,
"BatchSquareSum": batch_square_sum
},
inputs=inputs,
outputs={
"Y": data_norm_out,
"Means": means,
......@@ -4675,7 +4704,8 @@ def data_norm(input,
"epsilon": epsilon,
"slot_dim": slot_dim,
"sync_stats": sync_stats,
"summary_decay_rate": summary_decay_rate
"summary_decay_rate": summary_decay_rate,
"enable_scale_and_shift": enable_scale_and_shift
})
return helper.append_activation(data_norm_out)
......
......@@ -24,6 +24,7 @@ import paddle.fluid.layers as layers
import os
from op_test import OpTest
from paddle.fluid.framework import grad_var_name
from paddle.fluid import Program, program_guard
def _reference_testing(x, batch_size, batch_sum, batch_square_sum, slot_dim=-1):
......@@ -72,7 +73,13 @@ class TestDataNormOpInference(unittest.TestCase):
def __assert_close(self, tensor, np_array, msg, atol=1e-4):
self.assertTrue(np.allclose(np.array(tensor), np_array, atol=atol), msg)
def check_with_place(self, place, data_layout, dtype, shape, slot_dim=-1):
def check_with_place(self,
place,
data_layout,
dtype,
shape,
slot_dim=-1,
enable_scale_and_shift=False):
"""
do forward and check
......@@ -82,7 +89,7 @@ class TestDataNormOpInference(unittest.TestCase):
dtype(dtype): np.float32
shape(list): input shape
slot_dim(int): dimension of one slot. Refer to data_norm api.
enable_scale_and_shift(bool): whether to enable scale and shift after normalization.
"""
epsilon = 0.00001
......@@ -127,21 +134,49 @@ class TestDataNormOpInference(unittest.TestCase):
mean_tensor = create_or_get_tensor(scope, "mean", None, place)
scales_tensor = create_or_get_tensor(scope, "scales", None, place)
data_norm_op = Operator(
"data_norm",
# inputs
X="x_val",
BatchSize="batch_size",
BatchSum="batch_sum",
BatchSquareSum="batch_square_sum",
# outputs
Y="y_out",
Means="mean",
Scales="scales",
# attrs
epsilon=epsilon,
use_mkldnn=self.use_mkldnn,
slot_dim=slot_dim)
if not enable_scale_and_shift:
data_norm_op = Operator(
"data_norm",
# inputs
X="x_val",
BatchSize="batch_size",
BatchSum="batch_sum",
BatchSquareSum="batch_square_sum",
# outputs
Y="y_out",
Means="mean",
Scales="scales",
# attrs
epsilon=epsilon,
use_mkldnn=self.use_mkldnn,
slot_dim=slot_dim,
enable_scale_and_shift=False)
else:
scale_w = np.ones(scale_shape).astype(np.float32)
bias = np.zeros(scale_shape).astype(np.float32)
scale_w_tensor = create_or_get_tensor(
scope, "scale_w",
OpTest.np_dtype_to_fluid_dtype(scale_w), place)
bias_tensor = create_or_get_tensor(
scope, "bias", OpTest.np_dtype_to_fluid_dtype(bias), place)
data_norm_op = Operator(
"data_norm",
# inputs
X="x_val",
BatchSize="batch_size",
BatchSum="batch_sum",
BatchSquareSum="batch_square_sum",
scale_w="scale_w",
bias="bias",
# outputs
Y="y_out",
Means="mean",
Scales="scales",
# attrs
epsilon=epsilon,
use_mkldnn=self.use_mkldnn,
slot_dim=slot_dim,
enable_scale_and_shift=True)
data_norm_op.run(scope, place)
......@@ -162,11 +197,13 @@ class TestDataNormOpInference(unittest.TestCase):
for place in places:
for data_format in ["NCHW", "NHWC"]:
for slot_dim in [-1, 1]:
self.check_with_place(
place,
data_format,
self.dtype, [2, 3],
slot_dim=slot_dim)
for enable_scale_and_shift in [False, True]:
self.check_with_place(
place,
data_format,
self.dtype, [2, 3],
slot_dim=slot_dim,
enable_scale_and_shift=enable_scale_and_shift)
class TestDataNormOp(OpTest):
......@@ -220,6 +257,130 @@ class TestDataNormOp(OpTest):
self.check_grad(['X'], 'Y', no_grad_set=set([]))
class TestDataNormOpWithEnableScaleAndShift(OpTest):
"""
test class for data norm op
test forward and backward
"""
def setUp(self):
"""
init data norm op test env
"""
self.op_type = 'data_norm'
self.use_mkldnn = False
epsilon = 0.00001
slot_dim = -1
enable_scale_and_shift = True
x_shape = [2, 50]
scale_shape = [50]
tp = np.float32
x_val = np.random.uniform(-1, 1, x_shape).astype(tp)
batch_size = np.ones(scale_shape).astype(tp)
batch_size *= 1e4
batch_sum = np.zeros(scale_shape).astype(tp)
batch_square_sum = np.ones(scale_shape).astype(tp)
batch_square_sum *= 1e4
scale_w = np.ones(scale_shape).astype(tp)
bias = np.zeros(scale_shape).astype(tp)
y = np.array(x_val)
mean = np.zeros(x_shape).astype(tp)
scale = np.ones(x_shape).astype(tp)
self.inputs = {
"X": x_val,
"BatchSize": batch_size,
"BatchSum": batch_sum,
"BatchSquareSum": batch_square_sum,
"scale_w": scale_w,
"bias": bias
}
self.outputs = {"Y": y, "Means": mean, "Scales": scale}
self.attrs = {
"epsilon": epsilon,
"use_mkldnn": self.use_mkldnn,
"slot_dim": slot_dim,
"enable_scale_and_shift": True
}
def test_check_output(self):
"""
test check forward, check output
"""
self.check_output()
def test_check_grad(self):
"""
test check backward, check grad
"""
self.check_grad(['X'], 'Y', no_grad_set=set([]))
class TestDataNormOpWithEnableScaleAndShift_1(OpTest):
"""
test class for data norm op
test forward and backward
"""
def setUp(self):
"""
init data norm op test env
"""
self.op_type = 'data_norm'
self.use_mkldnn = False
epsilon = 0.00001
slot_dim = 1
enable_scale_and_shift = True
x_shape = [2, 50]
scale_shape = [50]
tp = np.float32
x_val = np.random.uniform(-1, 1, x_shape).astype(tp)
batch_size = np.ones(scale_shape).astype(tp)
batch_size *= 1e4
batch_sum = np.zeros(scale_shape).astype(tp)
batch_square_sum = np.ones(scale_shape).astype(tp)
batch_square_sum *= 1e4
scale_w = np.ones(scale_shape).astype(tp)
bias = np.zeros(scale_shape).astype(tp)
y = np.array(x_val)
mean = np.zeros(x_shape).astype(tp)
scale = np.ones(x_shape).astype(tp)
self.inputs = {
"X": x_val,
"BatchSize": batch_size,
"BatchSum": batch_sum,
"BatchSquareSum": batch_square_sum,
"scale_w": scale_w,
"bias": bias
}
self.outputs = {"Y": y, "Means": mean, "Scales": scale}
self.attrs = {
"epsilon": epsilon,
"use_mkldnn": self.use_mkldnn,
"slot_dim": slot_dim,
"enable_scale_and_shift": True
}
def test_check_output(self):
"""
test check forward, check output
"""
self.check_output()
def test_check_grad(self):
"""
test check backward, check grad
"""
self.check_grad(['X'], 'Y', no_grad_set=set([]))
class TestDataNormOpWithSlotDim(OpTest):
"""
test class for data norm op
......@@ -399,5 +560,14 @@ class TestDataNormOpWithSyncStats(unittest.TestCase):
os.remove(f)
class TestDataNormOpError(unittest.TestCase):
def test_errors(self):
with program_guard(Program(), Program()):
x2 = fluid.layers.data(name='x2', shape=[3, 4], dtype="int32")
#self.assertRaises(TypeError, fluid.data_norm, x2)
fluid.layers.data_norm(
input=x2, param_attr={}, enable_scale_and_shift=True)
if __name__ == '__main__':
unittest.main()
......@@ -96,6 +96,8 @@ class TestFleet1(unittest.TestCase):
fleet.save_one_table(0, "./model_002", prefix="hahaha")
fleet.load_model("./model_0003")
fleet.load_one_table(0, "./model_004")
fleet.confirm()
fleet.revert()
except:
print("do not support pslib test, skip")
return
......
......@@ -97,7 +97,7 @@ class TestCloudRoleMaker2(unittest.TestCase):
role4._worker_gather(1)
role4._get_rank()
role4._get_size()
role4._all_comm.init(0, 0, "", "", "", "", "")
role4._all_comm.init()
role5 = GeneralRoleMaker(path="./test_gloo_5")
role5.get_local_endpoint()
role5.get_local_endpoint()
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test cloud role maker."""
from __future__ import print_function
import os
import unittest
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
class TestCloudRoleMaker(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMaker.
"""
def setUp(self):
"""Set up, set envs."""
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
def test_pslib_1(self):
"""Test cases for pslib."""
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
try:
import netifaces
except:
print("warning: no netifaces, skip test_pslib_1")
return
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36002"
os.environ["PADDLE_TRAINER_ID"] = "0"
role_maker = GeneralRoleMaker(
init_timeout_seconds=100,
run_timeout_seconds=100,
http_ip_port="127.0.0.1:36003")
role_maker.generate_role()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
fleet.init(role_maker)
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
with fluid.program_guard(train_program, startup_program):
show = fluid.layers.data(name="show", shape=[-1, 1], \
dtype="float32", lod_level=1, append_batch_size=False)
fc = fluid.layers.fc(input=show, size=1, act=None)
label = fluid.layers.data(name="click", shape=[-1, 1], \
dtype="int64", lod_level=1, append_batch_size=False)
label_cast = fluid.layers.cast(label, dtype='float32')
cost = fluid.layers.log_loss(fc, label_cast)
try:
adam = fluid.optimizer.Adam(learning_rate=0.000005)
adam = fleet.distributed_optimizer(adam)
adam.minimize([cost], [scope])
fleet.run_server()
http_server_d = {}
http_server_d["running"] = False
size_d = {}
role_maker._GeneralRoleMaker__start_kv_server(http_server_d, size_d)
except:
print("do not support pslib test, skip")
return
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test cloud role maker."""
from __future__ import print_function
import os
import unittest
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
class TestCloudRoleMaker(unittest.TestCase):
"""
Test cases for PaddleCloudRoleMaker.
"""
def setUp(self):
"""Set up, set envs."""
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ[
"PADDLE_PSERVERS_IP_PORT_LIST"] = "127.0.0.1:36001,127.0.0.2:36001"
def test_pslib_1(self):
"""Test cases for pslib."""
import sys
import threading
import paddle.fluid as fluid
try:
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.parameter_server.pslib import PSLib
from paddle.fluid.incubate.fleet.base.role_maker import \
GeneralRoleMaker
from paddle.fluid.incubate.fleet.utils.http_server import KVHandler
from paddle.fluid.incubate.fleet.utils.http_server import KVServer
from paddle.fluid.incubate.fleet.utils.http_server import \
KVHTTPServer
except:
print("warning: no fleet, skip test_pslib_4")
return
try:
import netifaces
except:
print("warning: no netifaces, skip test_pslib_4")
return
class FakeStream():
"""
it is a fake stream only for test.
"""
def write(self, a):
"""
write a to stream, do nothing
Args:
a(str): the string to write
"""
pass
def read(self, b):
"""
read data of len b from stream, do nothing
Args:
b(str): the len to read
Returns:
c(str): the result
"""
if b == 0:
raise ValueError("this is only for test")
return "fake"
import os
try:
class TmpKVHander(KVHandler):
"""
it is a fake handler only for this test case.
"""
def __init__(self, server):
"""Init."""
self.path = "a/b/c"
self.server = server
self.wfile = FakeStream()
self.rfile = FakeStream()
self.headers = {}
self.headers['Content-Length'] = 0
def address_string(self):
"""
fake address string, it will do nothing.
"""
return "123"
def send_response(self, code):
"""
fake send response, it will do nothing.
Args:
code(int): error code
"""
pass
def send_header(self, a, b):
"""
fake send header, it will do nothing.
Args:
a(str): some header
b(str): some header
"""
pass
def end_headers(self):
"""
fake end header, it will do nothing.
"""
pass
except:
print("warning: no KVHandler, skip test_pslib_4")
return
import sys
try:
class TmpServer(KVHTTPServer):
"""
it is a fake server only for this test case.
"""
def __init__(self):
"""Init."""
self.delete_kv_lock = threading.Lock()
self.delete_kv = {}
self.kv_lock = threading.Lock()
self.kv = {}
except:
print("warning: no KVHTTPServer, skip test_pslib_4")
return
try:
class TmpS(KVServer):
"""
it is a fake server only for this test case.
"""
def __init__(self):
"""Init."""
self.http_server = TmpServer()
self.listen_thread = None
self.size = {}
self.size["a"] = 999
except:
print("warning: no KVServer, skip test_pslib_4")
return
s = TmpServer()
h = TmpKVHander(s)
h.do_GET()
h.path = "a/b"
h.do_GET()
h.do_PUT()
h.do_DELETE()
h.path = "a/b/c"
s.kv["b"] = {}
s.kv["b"]["c"] = "456"
h.do_GET()
h.path = "a/d/e"
h.do_PUT()
h.headers['Content-Length'] = 1
h.do_PUT()
h.do_DELETE()
h.log_message("666")
s.get_deleted_size("haha")
s1 = TmpS()
s1.shoud_stop()
if __name__ == "__main__":
unittest.main()
......@@ -49,6 +49,8 @@ class TrainerDesc(object):
self._infer = False
def _set_fetch_var_and_info(self, fetch_vars, fetch_info, print_period):
# convert fetch_info to list
fetch_info = list(fetch_info)
for i, v in enumerate(fetch_vars):
self.proto_desc.fetch_config.fetch_var_names.extend([v.name])
self.proto_desc.fetch_config.fetch_var_str_format.extend(
......
......@@ -17,10 +17,10 @@ import threading
import time
import logging
import numpy as np
from paddle.fluid.log_helper import get_logger
FORMAT = '%(asctime)s-%(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
local_logger = logging.getLogger(__name__)
local_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
from .trainer_desc import MultiTrainer, DistMultiTrainer, PipelineTrainer
from .device_worker import Hogwild, DownpourSGD, Section, DownpourSGDOPT
......
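The change above replaces the module-level `logging.basicConfig` setup with `get_logger`, giving FetchHandler a single dedicated handler on the module logger instead of configuring the root logger globally (the commit message notes the old setup could print each log twice). A minimal sketch of the `get_logger` pattern, modeled on the helper defined in http_server.py earlier in this diff (the real `paddle.fluid.log_helper.get_logger` may differ in details such as handler type and propagation):

```python
import logging

def get_logger(name, level, fmt=None):
    # One named logger with its own handler, instead of logging.basicConfig.
    logger = logging.getLogger(name)
    logger.setLevel(level)
    handler = logging.StreamHandler()  # http_server.py uses a FileHandler
    handler.setFormatter(logging.Formatter(fmt=fmt))
    logger.addHandler(handler)
    return logger

local_logger = get_logger(
    __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
```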