未验证 提交 0589ed21 编写于 作者: T tangwei12 提交者: GitHub

LOG CLEAN (#31819)

* upgrade vlog

* train from dataset fetch optimize
上级 b807e408
...@@ -41,7 +41,7 @@ ExternalProject_Add( ...@@ -41,7 +41,7 @@ ExternalProject_Add(
${EXTERNAL_PROJECT_LOG_ARGS} ${EXTERNAL_PROJECT_LOG_ARGS}
# TODO(gongwb): change to de newst repo when they changed. # TODO(gongwb): change to de newst repo when they changed.
GIT_REPOSITORY "https://github.com/wangjiawei04/brpc" GIT_REPOSITORY "https://github.com/wangjiawei04/brpc"
GIT_TAG "6d79e0b17f25107c35b705ea58d888083f59ff47" GIT_TAG "e203afb794caf027da0f1e0776443e7d20c0c28e"
PREFIX ${BRPC_SOURCES_DIR} PREFIX ${BRPC_SOURCES_DIR}
UPDATE_COMMAND "" UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
......
...@@ -60,7 +60,8 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) { ...@@ -60,7 +60,8 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
std::string ip_port = ip + ":" + std::to_string(port); std::string ip_port = ip + ":" + std::to_string(port);
VLOG(3) << "server of rank " << _rank << " starts at " << ip_port; VLOG(0) << "running server with rank id: " << _rank
<< ", endpoint: " << ip_port;
brpc::ServerOptions options; brpc::ServerOptions options;
int num_threads = std::thread::hardware_concurrency(); int num_threads = std::thread::hardware_concurrency();
...@@ -538,7 +539,7 @@ int32_t BrpcPsService::stop_server(Table *table, ...@@ -538,7 +539,7 @@ int32_t BrpcPsService::stop_server(Table *table,
auto *p_server = _server; auto *p_server = _server;
std::thread t_stop([p_server]() { std::thread t_stop([p_server]() {
p_server->stop(); p_server->stop();
LOG(INFO) << "Server Stoped"; VLOG(3) << "Server Stoped";
}); });
t_stop.detach(); t_stop.detach();
return 0; return 0;
......
...@@ -324,7 +324,7 @@ std::string GetIntTypeEndpoint(const std::string& ip, const uint32_t& port) { ...@@ -324,7 +324,7 @@ std::string GetIntTypeEndpoint(const std::string& ip, const uint32_t& port) {
while (hp->h_addr_list[i] != NULL) { while (hp->h_addr_list[i] != NULL) {
int_ip = inet_ntoa(*(struct in_addr*)hp->h_addr_list[i]); int_ip = inet_ntoa(*(struct in_addr*)hp->h_addr_list[i]);
VLOG(0) << "Brpc Get host by name, host:" << ip << " -> ip: " << int_ip; VLOG(3) << "Brpc Get host by name, host:" << ip << " -> ip: " << int_ip;
break; break;
} }
......
...@@ -39,7 +39,7 @@ struct PSHost { ...@@ -39,7 +39,7 @@ struct PSHost {
// |---ip---|---port---|--rank--| // |---ip---|---port---|--rank--|
// |-32bit--|--20bit---|--12bit-| // |-32bit--|--20bit---|--12bit-|
// for pslib
uint64_t serialize_to_uint64() { uint64_t serialize_to_uint64() {
uint64_t host_label = 0; uint64_t host_label = 0;
host_label = inet_addr(ip.c_str()); host_label = inet_addr(ip.c_str());
...@@ -175,14 +175,12 @@ class PSEnvironment { ...@@ -175,14 +175,12 @@ class PSEnvironment {
host.ip = ip; host.ip = ip;
host.port = port; host.port = port;
host.rank = rank; host.rank = rank;
if (sign_set.count(rank) > 0) {
LOG(WARNING) << "ps-host :" << host.ip << ":" << host.port if (sign_set.count(rank) == 0) {
<< ", rank:" << host.rank
<< " already register, ignore register";
} else {
host_list.push_back(host); host_list.push_back(host);
sign_set.insert(rank); sign_set.insert(rank);
} }
return 0; return 0;
} }
......
...@@ -78,8 +78,7 @@ PSClient *PSClientFactory::create(const PSParameter &ps_config) { ...@@ -78,8 +78,7 @@ PSClient *PSClientFactory::create(const PSParameter &ps_config) {
} }
TableManager::instance().initialize(); TableManager::instance().initialize();
LOG(INFO) << "Create PSClient[" << service_param.client_class() VLOG(3) << "Create PSClient[" << service_param.client_class() << "] success";
<< "] success";
return client; return client;
} }
} // namespace distributed } // namespace distributed
......
...@@ -47,7 +47,7 @@ paddle::distributed::PSParameter load_from_prototxt( ...@@ -47,7 +47,7 @@ paddle::distributed::PSParameter load_from_prototxt(
} }
void PSCore::init_gflag(const std::string& gflags) { void PSCore::init_gflag(const std::string& gflags) {
LOG(INFO) << "Init With Gflags:" << gflags; VLOG(3) << "Init With Gflags:" << gflags;
std::vector<std::string> flags = paddle::string::split_string(gflags); std::vector<std::string> flags = paddle::string::split_string(gflags);
if (flags.size() < 1) { if (flags.size() < 1) {
flags.push_back("-max_body_size=314217728"); flags.push_back("-max_body_size=314217728");
......
...@@ -89,7 +89,6 @@ class DSGD : public DenseOptimizer { ...@@ -89,7 +89,6 @@ class DSGD : public DenseOptimizer {
auto blas = GetBlas<float>(); auto blas = GetBlas<float>();
float lr = *(global_learning_rate_) * (*learning_rate); float lr = *(global_learning_rate_) * (*learning_rate);
VLOG(4) << "DSGD LearningRate: " << lr;
blas.VCOPY(update_numel, update_values + begin, grads.data()); blas.VCOPY(update_numel, update_values + begin, grads.data());
blas.SCAL(update_numel, lr, grads.data()); blas.SCAL(update_numel, lr, grads.data());
blas.VSUB(update_numel, param + begin, grads.data(), param + begin); blas.VSUB(update_numel, param + begin, grads.data(), param + begin);
...@@ -157,7 +156,6 @@ class DAdam : public DenseOptimizer { ...@@ -157,7 +156,6 @@ class DAdam : public DenseOptimizer {
beta2_pow[0] = beta2_pow[0] * beta2; beta2_pow[0] = beta2_pow[0] * beta2;
float lr_ = *(global_learning_rate_)*learning_rate[0]; float lr_ = *(global_learning_rate_)*learning_rate[0];
VLOG(4) << "DAdam LearningRate: " << lr_;
lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]); lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);
float* tmp_ = tmp.data(); float* tmp_ = tmp.data();
......
...@@ -110,7 +110,6 @@ class SSGD : public SparseOptimizer { ...@@ -110,7 +110,6 @@ class SSGD : public SparseOptimizer {
auto* value = block->Get(id); auto* value = block->Get(id);
float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0]; float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0];
VLOG(4) << "SSGD LearningRate: " << learning_rate;
float* param = value + param_offset; float* param = value + param_offset;
std::vector<float> grads; std::vector<float> grads;
...@@ -166,7 +165,6 @@ class SAdam : public SparseOptimizer { ...@@ -166,7 +165,6 @@ class SAdam : public SparseOptimizer {
if (!block->GetEntry(id)) continue; if (!block->GetEntry(id)) continue;
auto* values = block->Get(id); auto* values = block->Get(id);
float lr_ = *(global_learning_rate_) * (values + lr_offset)[0]; float lr_ = *(global_learning_rate_) * (values + lr_offset)[0];
VLOG(4) << "SAdam LearningRate: " << lr_;
float* param = values + param_offset; float* param = values + param_offset;
float* moment1 = values + m1_offset; float* moment1 = values + m1_offset;
float* moment2 = values + m2_offset; float* moment2 = values + m2_offset;
......
...@@ -161,9 +161,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { ...@@ -161,9 +161,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \ #if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
!defined(_WIN32) && !defined(__APPLE__) !defined(_WIN32) && !defined(__APPLE__)
AppendPassWithCheck(strategy_.enable_auto_fusion_, "fusion_group_pass"); AppendPassWithCheck(strategy_.enable_auto_fusion_, "fusion_group_pass");
#else
LOG(WARNING) << "fusion_group is not enabled for Windows/MacOS now, and "
"only effective when running with CUDA GPU.";
#endif #endif
AppendPassWithCheck(strategy_.fuse_elewise_add_act_ops_, AppendPassWithCheck(strategy_.fuse_elewise_add_act_ops_,
"fuse_elewise_add_act_pass"); "fuse_elewise_add_act_pass");
...@@ -265,8 +262,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { ...@@ -265,8 +262,7 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
if (FLAGS_use_mkldnn) { if (FLAGS_use_mkldnn) {
AppendPass(pass_name); AppendPass(pass_name);
} else if (!strategy_.mkldnn_enabled_op_types_.empty()) { } else if (!strategy_.mkldnn_enabled_op_types_.empty()) {
LOG(WARNING) VLOG(1) << "mkldnn_enabled_op_types specify the operator type list to "
<< "mkldnn_enabled_op_types specify the operator type list to "
"use MKLDNN acceleration. It is null in default, means " "use MKLDNN acceleration. It is null in default, means "
"that all the operators supported by MKLDNN will be " "that all the operators supported by MKLDNN will be "
"accelerated. And it should not be set when " "accelerated. And it should not be set when "
...@@ -403,25 +399,25 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, ...@@ -403,25 +399,25 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
<< ", num_trainers:" << num_trainers_; << ", num_trainers:" << num_trainers_;
} else if (pass->Type() == "fuse_relu_depthwise_conv_pass") { } else if (pass->Type() == "fuse_relu_depthwise_conv_pass") {
if (use_device != p::kCUDA) { if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_relu_depthwise_conv_pass is only supported on " VLOG(1) << "fuse_relu_depthwise_conv_pass is only supported on "
"GPU, skipped."; "GPU, skipped.";
continue; continue;
} }
} else if (pass->Type() == "fusion_group_pass") { } else if (pass->Type() == "fusion_group_pass") {
pass->Set<bool>("use_gpu", new bool((use_device == p::kCUDA))); pass->Set<bool>("use_gpu", new bool((use_device == p::kCUDA)));
if (use_device != p::kCUDA) { if (use_device != p::kCUDA) {
LOG(WARNING) << "fusion_group_pass is only supported on GPU, skipped."; VLOG(1) << "fusion_group_pass is only supported on GPU, skipped.";
continue; continue;
} }
} else if (pass->Type() == "fuse_bn_act_pass") { } else if (pass->Type() == "fuse_bn_act_pass") {
if (use_device != p::kCUDA) { if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_bn_act_pass is only supported on " VLOG(1) << "fuse_bn_act_pass is only supported on "
"GPU, skipped."; "GPU, skipped.";
continue; continue;
} }
} else if (pass->Type() == "fuse_bn_add_act_pass") { } else if (pass->Type() == "fuse_bn_add_act_pass") {
if (use_device != p::kCUDA) { if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_bn_add_act_pass is only supported on " VLOG(1) << "fuse_bn_add_act_pass is only supported on "
"GPU, skipped."; "GPU, skipped.";
continue; continue;
} }
......
...@@ -205,7 +205,7 @@ class DeviceWorker { ...@@ -205,7 +205,7 @@ class DeviceWorker {
Scope* root_scope_ = nullptr; Scope* root_scope_ = nullptr;
Scope* thread_scope_; Scope* thread_scope_;
paddle::platform::Place place_; paddle::platform::Place place_;
int64_t batch_num_; int64_t batch_num_ = 0;
FetchConfig fetch_config_; FetchConfig fetch_config_;
bool use_cvm_; bool use_cvm_;
bool no_cvm_; bool no_cvm_;
......
...@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <ctime>
#include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/device_worker.h" #include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h" #include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h"
...@@ -226,14 +227,32 @@ void HogwildWorker::PrintFetchVars() { ...@@ -226,14 +227,32 @@ void HogwildWorker::PrintFetchVars() {
// call count // call count
batch_num_++; batch_num_++;
int batch_per_print = fetch_config_.print_period(); int batch_per_print = fetch_config_.print_period();
if (thread_id_ == 0) {
if (batch_num_ % batch_per_print == 0) {
int fetch_var_num = fetch_config_.fetch_var_names_size(); int fetch_var_num = fetch_config_.fetch_var_names_size();
if (fetch_var_num == 0) {
return;
}
if (thread_id_ == 0 && batch_num_ % batch_per_print == 0) {
time_t curtime;
time(&curtime);
char mbstr[80];
std::strftime(mbstr, sizeof(mbstr), "%Y-%m-%d %H:%M:%S",
std::localtime(&curtime));
std::stringstream ss;
ss << "time: [" << mbstr << "], ";
ss << "batch: [" << batch_num_ << "], ";
for (int i = 0; i < fetch_var_num; ++i) { for (int i = 0; i < fetch_var_num; ++i) {
platform::PrintVar(thread_scope_, fetch_config_.fetch_var_names(i), platform::PrintVar(thread_scope_, fetch_config_.fetch_var_names(i),
fetch_config_.fetch_var_str_format(i)); fetch_config_.fetch_var_str_format(i), &ss);
if (i < fetch_var_num - 1) {
ss << ", ";
} }
} }
std::cout << ss.str() << std::endl;
} }
} }
......
...@@ -27,24 +27,38 @@ namespace paddle { ...@@ -27,24 +27,38 @@ namespace paddle {
namespace platform { namespace platform {
void PrintVar(framework::Scope* scope, const std::string& var_name, void PrintVar(framework::Scope* scope, const std::string& var_name,
const std::string& print_info) { const std::string& print_info, std::stringstream* sstream) {
framework::Variable* var = scope->FindVar(var_name); framework::Variable* var = scope->FindVar(var_name);
if (var == nullptr) { if (var == nullptr) {
VLOG(1) << "Variable Name " << var_name << " does not exist in your scope"; VLOG(0) << "Variable Name " << var_name << " does not exist in your scope";
return; return;
} }
framework::LoDTensor* tensor = var->GetMutable<framework::LoDTensor>(); framework::LoDTensor* tensor = var->GetMutable<framework::LoDTensor>();
if (tensor == nullptr) { if (tensor == nullptr) {
VLOG(1) << "tensor of variable " << var_name VLOG(0) << "tensor of variable " << var_name
<< " does not exist in your scope"; << " does not exist in your scope";
return; return;
} }
std::ostringstream sstream; *sstream << print_info << ": ";
sstream << print_info << "\t";
sstream << var_name << "\t"; #define PrintTensorCallback(cpp_type, proto_type) \
sstream << *tensor << "\t"; do { \
std::cout << sstream.str() << std::endl; if (tensor->type() == proto_type) { \
*sstream << "["; \
auto* data = tensor->data<cpp_type>(); \
auto element_num = tensor->numel(); \
if (element_num > 0) { \
*sstream << data[0]; \
for (int j = 1; j < element_num; ++j) { \
*sstream << " " << data[j]; \
} \
} \
*sstream << "]"; \
} \
} while (0)
_ForEachDataType_(PrintTensorCallback);
} }
} // end namespace platform } // end namespace platform
......
...@@ -26,6 +26,6 @@ class Scope; ...@@ -26,6 +26,6 @@ class Scope;
namespace paddle { namespace paddle {
namespace platform { namespace platform {
void PrintVar(framework::Scope* scope, const std::string& var_name, void PrintVar(framework::Scope* scope, const std::string& var_name,
const std::string& print_info); const std::string& print_info, std::stringstream* out);
} // end namespace platform } // end namespace platform
} // end namespace paddle } // end namespace paddle
...@@ -18,5 +18,6 @@ ...@@ -18,5 +18,6 @@
TEST(LodTensorPrinter, PrintVar) { TEST(LodTensorPrinter, PrintVar) {
paddle::framework::Scope scope; paddle::framework::Scope scope;
paddle::platform::PrintVar(&scope, "NotAVar", "We don't have var"); std::stringstream ss;
paddle::platform::PrintVar(&scope, "NotAVar", "We don't have var", &ss);
} }
...@@ -628,6 +628,7 @@ class Fleet(object): ...@@ -628,6 +628,7 @@ class Fleet(object):
self.user_defined_optimizer = optimizer self.user_defined_optimizer = optimizer
if strategy is not None: if strategy is not None:
if self._is_collective:
warnings.warn( warnings.warn(
"It is recommended to use DistributedStrategy " "It is recommended to use DistributedStrategy "
"in fleet.init(). The strategy here is only for compatibility. " "in fleet.init(). The strategy here is only for compatibility. "
......
...@@ -768,7 +768,7 @@ class TheOnePSRuntime(RuntimeBase): ...@@ -768,7 +768,7 @@ class TheOnePSRuntime(RuntimeBase):
server = self._get_fleet_proto(is_server=True, is_sync=is_sync) server = self._get_fleet_proto(is_server=True, is_sync=is_sync)
proto_txt = str(server) proto_txt = str(server)
debug = bool(os.getenv("PSERVER_DEBUG", "0")) debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
if debug: if debug:
print("server: \n{}".format(proto_txt)) print("server: \n{}".format(proto_txt))
......
...@@ -17,6 +17,8 @@ TestCases for Monitor ...@@ -17,6 +17,8 @@ TestCases for Monitor
from __future__ import print_function from __future__ import print_function
import paddle import paddle
paddle.enable_static()
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.core as core import paddle.fluid.core as core
import numpy as np import numpy as np
...@@ -52,6 +54,11 @@ class TestDatasetWithStat(unittest.TestCase): ...@@ -52,6 +54,11 @@ class TestDatasetWithStat(unittest.TestCase):
name=slot, shape=[1], dtype="int64", lod_level=1) name=slot, shape=[1], dtype="int64", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
embs = []
for x in slots_vars:
emb = fluid.layers.embedding(x, is_sparse=True, size=[100001, 4])
embs.append(emb)
dataset = paddle.distributed.InMemoryDataset() dataset = paddle.distributed.InMemoryDataset()
dataset._set_batch_size(32) dataset._set_batch_size(32)
dataset._set_thread(3) dataset._set_thread(3)
...@@ -74,11 +81,17 @@ class TestDatasetWithStat(unittest.TestCase): ...@@ -74,11 +81,17 @@ class TestDatasetWithStat(unittest.TestCase):
for i in range(self.epoch_num): for i in range(self.epoch_num):
for data in data_loader(): for data in data_loader():
exe.run(fluid.default_main_program(), feed=data) exe.run(fluid.default_main_program(), feed=data)
else: else:
for i in range(self.epoch_num): for i in range(self.epoch_num):
try: try:
exe.train_from_dataset(fluid.default_main_program(), exe.train_from_dataset(
dataset) fluid.default_main_program(),
dataset,
fetch_list=[embs[0], embs[1]],
fetch_info=["emb0", "emb1"],
print_period=1)
except Exception as e: except Exception as e:
self.assertTrue(False) self.assertTrue(False)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册