提交 1198d56c 编写于 作者: M ml0 提交者: wangzelin.wzl

Automically configure observer network speed.

上级 8d61732e
......@@ -576,13 +576,17 @@ int get_ethernet_speed(const ObString& devname, int64_t& speed)
int rc = OB_SUCCESS;
bool exist = false;
char path[OB_MAX_FILE_NAME_LENGTH];
static int dev_file_exist = 1;
if (0 == devname.length()) {
_OB_LOG(WARN, "empty devname");
rc = OB_INVALID_ARGUMENT;
} else {
IGNORE_RETURN snprintf(path, sizeof(path), "/sys/class/net/%.*s", devname.length(), devname.ptr());
if (OB_SUCCESS != (rc = FileDirectoryUtils::is_exists(path, exist)) || !exist) {
if (dev_file_exist) {
_OB_LOG(WARN, "path %s not exist", path);
dev_file_exist = 0;
}
rc = OB_FILE_NOT_EXIST;
}
}
......
......@@ -142,6 +142,7 @@ ObServer::ObServer()
long_ops_task_(),
ctas_clean_up_task_(),
refresh_active_time_task_(),
refresh_network_speed_task_(),
schema_status_proxy_(sql_proxy_),
is_log_dir_empty_(false)
{
......@@ -300,6 +301,8 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg)
LOG_ERROR("init ctas clean up task fail", K(ret));
} else if (OB_FAIL(init_refresh_active_time_task())) {
LOG_ERROR("init refresh active time task fail", K(ret));
} else if (OB_FAIL(init_refresh_network_speed_task())) {
LOG_ERROR("init refresh network speed task fail", K(ret));
} else if (OB_FAIL(user_col_stat_service_.init(&sql_proxy_, &config_))) {
LOG_WARN("init user table column stat service failed");
} else if (OB_FAIL(user_table_stat_service_.init(&sql_proxy_, &ObPartitionService::get_instance(), &config_))) {
......@@ -1665,39 +1668,193 @@ int ObServer::init_storage()
return ret;
}
int ObServer::init_bandwidth_throttle()
int ObServer::init_gc_partition_adapter()
{
int ret = OB_SUCCESS;
const int64_t sys_bkgd_net_percentage = config_.sys_bkgd_net_percentage;
if (OB_FAIL(GC_PARTITION_ADAPTER.init(&sql_proxy_))) {
LOG_WARN("gc partition adapter init failed", K(ret));
} else {
LOG_INFO("gc partition adapter init success");
}
return ret;
}
int ObServer::get_network_speed_from_sysfs(int64_t &network_speed)
{
int ret = OB_SUCCESS;
// sys_bkgd_net_percentage_ = config_.sys_bkgd_net_percentage;
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = get_ethernet_speed(config_.devname.str(), ethernet_speed_))) {
ethernet_speed_ = DEFAULT_ETHERNET_SPEED;
if (OB_FAIL(get_ethernet_speed(config_.devname.str(), network_speed))) {
LOG_WARN("cannot get Ethernet speed, use default", K(tmp_ret), "devname", config_.devname.str());
} else if (ethernet_speed_ < 0) {
ethernet_speed_ = DEFAULT_ETHERNET_SPEED;
} else if (network_speed < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid Ethernet speed, use default", "devname", config_.devname.str());
}
if (OB_SUCC(ret)) {
int64_t rate = ethernet_speed_ * sys_bkgd_net_percentage / 100;
return ret;
}
if (OB_FAIL(bandwidth_throttle_.init(rate))) {
LOG_WARN("failed to init bandwidth throttle", K(ret), K(rate), K(ethernet_speed_));
char* strtrim(char* str)
{
char* ptr;
if (str == NULL) {
return NULL;
}
ptr = str + strlen(str) - 1;
while (isspace(*str)) {
str++;
}
while ((ptr > str) && isspace(*ptr)) {
*ptr-- = '\0';
}
return str;
}
static int64_t nic_rate_parse(const char *str, bool &valid)
{
char *p_unit = nullptr;
int64_t value = 0;
if (OB_ISNULL(str) || '\0' == str[0]) {
valid = false;
} else {
valid = true;
value = strtol(str, &p_unit, 0);
p_unit = strtrim(p_unit);
if (OB_ISNULL(p_unit)) {
valid = false;
} else if (value <= 0) {
valid = false;
} else if (0 == STRCASECMP("bit", p_unit)
|| 0 == STRCASECMP("b", p_unit)) {
// do nothing
} else if (0 == STRCASECMP("kbit", p_unit)
|| 0 == STRCASECMP("kb", p_unit)
|| 0 == STRCASECMP("k", p_unit)) {
value <<= 10;
} else if ('\0' == *p_unit
|| 0 == STRCASECMP("mbit", p_unit)
|| 0 == STRCASECMP("mb", p_unit)
|| 0 == STRCASECMP("m", p_unit)) {
// default is meta bit
value <<= 20;
} else if (0 == STRCASECMP("gbit", p_unit)
|| 0 == STRCASECMP("gb", p_unit)
|| 0 == STRCASECMP("g", p_unit)) {
value <<= 30;
} else {
valid = false;
LOG_ERROR("parse nic rate error", K(str), K(p_unit));
}
}
return value;
}
int ObServer::get_network_speed_from_config_file(int64_t &network_speed)
{
int ret = OB_SUCCESS;
const char *nic_rate_path = "etc/nic.rate.config";
const int64_t MAX_NIC_CONFIG_FILE_SIZE = 1 << 10; // 1KB
FILE *fp = nullptr;
char *buf = nullptr;
static int nic_rate_file_exist = 1;
if (OB_ISNULL(buf = static_cast<char *>(ob_malloc(MAX_NIC_CONFIG_FILE_SIZE + 1,
ObModIds::OB_BUFFER)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("alloc buffer failed", LITERAL_K(MAX_NIC_CONFIG_FILE_SIZE), K(ret));
} else if (OB_ISNULL(fp = fopen(nic_rate_path, "r"))) {
if (ENOENT == errno) {
ret = OB_FILE_NOT_EXIST;
if (nic_rate_file_exist) {
LOG_WARN("NIC Config file doesn't exist, auto detecting", K(nic_rate_path), K(ret));
nic_rate_file_exist = 0;
}
} else {
ret = OB_IO_ERROR;
if (EAGAIN == errno) {
LOG_WARN("Can't open NIC Config file", K(nic_rate_path), K(errno), K(ret));
} else {
LOG_ERROR("Can't open NIC Config file", K(nic_rate_path), K(errno), K(ret));
}
}
} else {
if (!nic_rate_file_exist) {
LOG_INFO("Reading NIC Config file", K(nic_rate_path));
nic_rate_file_exist = 1;
}
memset(buf, 0, MAX_NIC_CONFIG_FILE_SIZE + 1);
fread(buf, 1, MAX_NIC_CONFIG_FILE_SIZE, fp);
char *prate = nullptr;
if (OB_UNLIKELY(0 != ferror(fp))) {
ret = OB_IO_ERROR;
LOG_ERROR("Read NIC Config file error", K(nic_rate_path), K(ret));
} else if (OB_UNLIKELY(0 == feof(fp))) {
ret = OB_BUF_NOT_ENOUGH;
LOG_ERROR("NIC Config file is too long", K(nic_rate_path), K(ret));
} else {
LOG_INFO("succeed to init_bandwidth_throttle", K(sys_bkgd_net_percentage_), K(ethernet_speed_), K(rate));
prate = strchr(buf, '=');
if (nullptr != prate) {
prate++;
bool valid = false;
int64_t nic_rate = nic_rate_parse(prate, valid);
if (valid) {
network_speed = nic_rate / 8;
} else {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid NIC Rate Config", K(ret));
}
} else {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid NIC Config file", K(ret));
}
} // else
if (OB_UNLIKELY(0 != fclose(fp))) {
ret = OB_IO_ERROR;
LOG_ERROR("Close NIC Config file failed", K(ret));
}
} // else
if (OB_LIKELY(nullptr != buf)) {
ob_free(buf);
buf = nullptr;
}
return ret;
}
int ObServer::init_gc_partition_adapter()
int ObServer::init_bandwidth_throttle()
{
int ret = OB_SUCCESS;
if (OB_FAIL(GC_PARTITION_ADAPTER.init(&sql_proxy_))) {
LOG_WARN("gc partition adapter init failed", K(ret));
int64_t network_speed = 0;
if (OB_SUCC(get_network_speed_from_config_file(network_speed))) {
LOG_DEBUG("got network speed from config file", K(network_speed));
} else if (OB_SUCC(get_network_speed_from_sysfs(network_speed))) {
LOG_DEBUG("got network speed from sysfs", K(network_speed));
} else {
LOG_INFO("gc partition adapter init success");
network_speed = DEFAULT_ETHERNET_SPEED;
LOG_DEBUG("using default network speed", K(network_speed));
}
sys_bkgd_net_percentage_ = config_.sys_bkgd_net_percentage;
if (network_speed > 0) {
int64_t rate = network_speed * sys_bkgd_net_percentage_ / 100;
if (OB_FAIL(bandwidth_throttle_.init(rate))) {
LOG_WARN("failed to init bandwidth throttle", K(ret), K(rate), K(network_speed));
} else {
LOG_INFO("succeed to init_bandwidth_throttle",
K(sys_bkgd_net_percentage_),
K(network_speed),
K(rate));
ethernet_speed_ = network_speed;
}
}
return ret;
}
......@@ -1706,32 +1863,35 @@ int ObServer::reload_config()
{
int ret = OB_SUCCESS;
if (OB_FAIL(reload_bandwidth_throttle_limit())) {
if (OB_FAIL(reload_bandwidth_throttle_limit(ethernet_speed_))) {
LOG_WARN("failed to reload_bandwidth_throttle_limit", K(ret));
}
return ret;
}
int ObServer::reload_bandwidth_throttle_limit()
int ObServer::reload_bandwidth_throttle_limit(int64_t network_speed)
{
int ret = OB_SUCCESS;
const int64_t sys_bkgd_net_percentage = config_.sys_bkgd_net_percentage;
if (OB_SUCC(ret) && sys_bkgd_net_percentage != sys_bkgd_net_percentage_) {
int64_t rate = ethernet_speed_ * sys_bkgd_net_percentage / 100;
if ((sys_bkgd_net_percentage_ != sys_bkgd_net_percentage) || (ethernet_speed_ != network_speed)) {
if (network_speed <= 0) {
LOG_WARN("wrong network speed.", K(ethernet_speed_));
network_speed = DEFAULT_ETHERNET_SPEED;
}
int64_t rate = network_speed * sys_bkgd_net_percentage / 100;
if (OB_FAIL(bandwidth_throttle_.set_rate(rate))) {
LOG_WARN("failed to reset bandwidth throttle", K(ret), K(rate), K(ethernet_speed_));
} else {
sys_bkgd_net_percentage_ = sys_bkgd_net_percentage;
LOG_INFO("succeed to reload_bandwidth_throttle_limit",
"old_percentage",
sys_bkgd_net_percentage_,
"new_percentage",
sys_bkgd_net_percentage,
K(ethernet_speed_),
"old_percentage", sys_bkgd_net_percentage_,
"new_percentage", sys_bkgd_net_percentage,
K(network_speed),
K(rate));
sys_bkgd_net_percentage_ = sys_bkgd_net_percentage;
ethernet_speed_ = network_speed;
}
}
return ret;
......@@ -1957,6 +2117,8 @@ void ObServer::ObRefreshTimeTask::runTimerTask()
} else if (OB_FAIL(obs_->refresh_temp_table_sess_active_time())) {
LOG_WARN("ObRefreshTimeTask clean up task failed", K(ret));
}
LOG_WARN("LICQ, ObRefreshTimeTask::runTimerTask", K(ret));
}
int ObServer::refresh_temp_table_sess_active_time()
......@@ -1972,6 +2134,73 @@ int ObServer::refresh_temp_table_sess_active_time()
return ret;
}
ObServer::ObRefreshNetworkSpeedTask::ObRefreshNetworkSpeedTask()
: obs_(nullptr), is_inited_(false)
{}
int ObServer::ObRefreshNetworkSpeedTask::init(ObServer *obs, int tg_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObRefreshNetworkSpeedTask has already been inited", K(ret));
} else if (OB_ISNULL(obs)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ObRefreshNetworkSpeedTask init with null ptr", K(ret), K(obs));
} else {
obs_ = obs;
is_inited_ = true;
if (OB_FAIL(TG_SCHEDULE(tg_id, *this, REFRESH_INTERVAL, true /*schedule repeatly*/))) {
LOG_WARN("fail to schedule task ObRefreshNetworkSpeedTask", K(ret));
}
}
return ret;
}
void ObServer::ObRefreshNetworkSpeedTask::destroy()
{
is_inited_ = false;
obs_ = nullptr;
}
void ObServer::ObRefreshNetworkSpeedTask::runTimerTask()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObRefreshNetworkSpeedTask has not been inited", K(ret));
} else if (OB_ISNULL(obs_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ObRefreshNetworkSpeedTask cleanup task got null ptr", K(ret));
} else if (OB_FAIL(obs_->refresh_network_speed())) {
LOG_WARN("ObRefreshNetworkSpeedTask reload bandwidth throttle limit failed", K(ret));
}
}
int ObServer::refresh_network_speed()
{
int ret = OB_SUCCESS;
int64_t network_speed = 0;
if (OB_SUCC(get_network_speed_from_config_file(network_speed))) {
LOG_DEBUG("got network speed from config file", K(network_speed));
} else if (OB_SUCC(get_network_speed_from_sysfs(network_speed))) {
LOG_DEBUG("got network speed from sysfs", K(network_speed));
} else {
network_speed = DEFAULT_ETHERNET_SPEED;
LOG_DEBUG("using default network speed", K(network_speed));
}
if ((network_speed > 0) && (network_speed != ethernet_speed_)) {
LOG_INFO("network speed changed", "from", ethernet_speed_, "to", network_speed);
if (OB_FAIL(reload_bandwidth_throttle_limit(network_speed))) {
LOG_WARN("ObRefreshNetworkSpeedTask reload bandwidth throttle limit failed", K(ret));
}
}
return ret;
}
int ObServer::init_refresh_active_time_task()
{
int ret = OB_SUCCESS;
......@@ -1990,6 +2219,15 @@ int ObServer::init_ctas_clean_up_task()
return ret;
}
int ObServer::init_refresh_network_speed_task()
{
int ret = OB_SUCCESS;
if (OB_FAIL(refresh_network_speed_task_.init(this, lib::TGDefIDs::ServerGTimer))) {
LOG_WARN("fail to init refresh network speed task", K(ret));
}
return ret;
}
// @@Query cleanup rules for built tables and temporary tables:
// 1, Traverse all table_schema, if the session_id of table T <> 0 means that the table is being created or the previous
// creation failed or the temporary table is to be cleared, then enter 2#; 2, Create a table for the query: traverse the
......
......@@ -108,7 +108,22 @@ public:
bool is_inited_;
};
class ObRefreshTime {
class ObRefreshNetworkSpeedTask: public common::ObTimerTask
{
public:
ObRefreshNetworkSpeedTask();
virtual ~ObRefreshNetworkSpeedTask() {}
int init(ObServer *observer, int tg_id);
void destroy();
virtual void runTimerTask() override;
private:
const static int64_t REFRESH_INTERVAL = 1L * 1000L * 1000L;//1hr
ObServer *obs_;
bool is_inited_;
};
class ObRefreshTime {
public:
explicit ObRefreshTime(ObServer* obs) : obs_(obs)
{}
......@@ -257,10 +272,13 @@ private:
int wait_gts();
int init_gts_cache_mgr();
int init_storage();
int init_bandwidth_throttle();
int init_gc_partition_adapter();
int reload_bandwidth_throttle_limit();
int init_loaddata_global_stat();
int init_bandwidth_throttle();
int reload_bandwidth_throttle_limit(int64_t network_speed);
int get_network_speed_from_sysfs(int64_t &network_speed);
int get_network_speed_from_config_file(int64_t &network_speed);
int refresh_network_speed();
int clean_up_invalid_tables();
int init_ctas_clean_up_task(); // Regularly clean up the residuals related to querying and building tables and
......@@ -268,6 +286,7 @@ private:
int refresh_temp_table_sess_active_time();
int init_refresh_active_time_task(); // Regularly update the sess_active_time of the temporary table created by the
// proxy connection sess
int init_refresh_network_speed_task();
int set_running_mode();
int check_server_can_start_service();
......@@ -394,6 +413,7 @@ private:
storage::ObPurgeCompletedMonitorInfoTask long_ops_task_;
ObCTASCleanUpTask ctas_clean_up_task_; // repeat & no retry
ObRefreshTimeTask refresh_active_time_task_; // repeat & no retry
ObRefreshNetworkSpeedTask refresh_network_speed_task_; // repeat & no retry
blocksstable::ObStorageEnv storage_env_;
share::ObSchemaStatusProxy schema_status_proxy_;
ObSignalWorker sig_worker_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册