提交 10f384c8 编写于 作者: K KernelMaker

1). add compact_cron; 2). bugfix: cannot stop pika when the bgtask(compaction) is running

上级 edb71c1b
......@@ -56,6 +56,9 @@ db-sync-speed : -1
# network-interface : eth1
# replication
# slaveof : master-ip:master-port
# CronTask, format: start:end-ratio, like 02:04-60, pika will check to schedule compaction between 2 to 4 o'clock everyday
# if the dbsize/disksize > 60%
# compact-cron :
###################
## Critical Settings
......
......@@ -34,6 +34,7 @@ public:
std::string db_path() { RWLock l(&rwlock_, false); return db_path_; }
std::string db_sync_path() { RWLock l(&rwlock_, false); return db_sync_path_; }
int db_sync_speed() { RWLock l(&rwlock_, false); return db_sync_speed_; }
std::string compact_cron() { RWLock l(&rwlock_, false); return compact_cron_; }
int write_buffer_size() { RWLock l(&rwlock_, false); return write_buffer_size_; }
int timeout() { RWLock l(&rwlock_, false); return timeout_; }
......@@ -151,6 +152,7 @@ private:
std::string db_path_;
std::string db_sync_path_;
int db_sync_speed_;
std::string compact_cron_;
int write_buffer_size_;
int log_level_;
bool daemonize_;
......
......@@ -10,6 +10,7 @@
#include <functional>
#include <map>
#include <unordered_set>
#include <sys/statfs.h>
#include <nemo.h>
#include <time.h>
#include "pika_binlog.h"
......@@ -38,6 +39,16 @@ public:
PikaServer();
~PikaServer();
static uint64_t DiskSize(std::string path) {
struct statfs diskInfo;
int ret = statfs(path.c_str(), &diskInfo);
if (ret == -1) {
LOG(WARNING) << "Get DiskSize error: " << strerror(errno);
return 0;
}
return diskInfo.f_bsize * diskInfo.f_blocks;
}
/*
* Get & Set
*/
......@@ -317,6 +328,7 @@ private:
std::shared_ptr<nemo::Nemo> db_;
time_t start_time_s_;
bool have_scheduled_crontask_;
int worker_num_;
PikaWorkerThread* pika_worker_thread_[PIKA_MAX_WORKER_THREAD_NUM];
......@@ -371,6 +383,7 @@ private:
static void DoPurgeLogs(void* arg);
bool GetBinlogFiles(std::map<uint32_t, std::string>& binlogs);
void AutoCompactRange();
void AutoPurge();
bool CouldPurge(uint32_t index);
......
......@@ -765,6 +765,10 @@ void ConfigCmd::ConfigGet(std::string &ret) {
ret = "*2\r\n";
EncodeString(&ret, "db-sync-speed");
EncodeInt32(&ret, g_pika_conf->db_sync_speed());
} else if (get_item == "compact-cron") {
ret = "*2\r\n";
EncodeString(&ret, "compact-cron");
EncodeString(&ret, g_pika_conf->compact_cron());
} else if (get_item == "maxmemory") {
ret = "*2\r\n";
EncodeString(&ret, "maxmemory");
......@@ -874,7 +878,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeString(&ret, "slaveof");
EncodeString(&ret, g_pika_conf->slaveof());
} else if (get_item == "*") {
ret = "*72\r\n";
ret = "*74\r\n";
EncodeString(&ret, "port");
EncodeInt32(&ret, g_pika_conf->port());
EncodeString(&ret, "thread-num");
......@@ -943,6 +947,8 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeString(&ret, g_pika_conf->db_sync_path());
EncodeString(&ret, "db-sync-speed");
EncodeInt32(&ret, g_pika_conf->db_sync_speed());
EncodeString(&ret, "compact-cron");
EncodeString(&ret, g_pika_conf->compact_cron());
EncodeString(&ret, "network-interface");
EncodeString(&ret, g_pika_conf->network_interface());
EncodeString(&ret, "slaveof");
......
......@@ -103,6 +103,26 @@ int PikaConf::Load()
sync_buffer_size_ = 100;
}
compact_cron_ = "";
GetConfStr("compact-cron", &compact_cron_);
if (compact_cron_ != "") {
std::string::size_type len = compact_cron_.length();
std::string::size_type colon = compact_cron_.find(":");
std::string::size_type underline = compact_cron_.find("-");
if (colon == std::string::npos || underline == std::string::npos ||
colon >= underline || colon + 1 >= len ||
colon + 1 == underline || underline + 1 >= len) {
compact_cron_ = "";
} else {
int start = std::atoi(compact_cron_.substr(0, colon).c_str());
int end = std::atoi(compact_cron_.substr(colon+1, underline).c_str());
int usage = std::atoi(compact_cron_.substr(underline+1).c_str());
if (start < 0 || start > 23 || end < 0 || end > 23 || usage < 0 || usage > 100) {
compact_cron_ = "";
}
}
}
// write_buffer_size
GetConfInt("write-buffer-size", &write_buffer_size_);
if (write_buffer_size_ <= 0 ) {
......@@ -206,8 +226,7 @@ int PikaConf::ConfigRewrite() {
SetConfInt("root-connection-num", root_connection_num_);
SetConfInt("slowlog-log-slower-than", slowlog_log_slower_than_);
SetConfBool("slave-read-only", readonly_);
SetConfStr("db-sync-path", db_sync_path_);
SetConfInt("db-sync-speed", db_sync_speed_);
SetConfStr("compact-cron", compact_cron_);
SetConfStr("network-interface", network_interface_);
SetConfStr("slaveof", slaveof_);
......
......@@ -27,6 +27,7 @@ extern PikaConf *g_pika_conf;
PikaServer::PikaServer() :
ping_thread_(NULL),
exit_(false),
have_scheduled_crontask_(false),
sid_(0),
master_ip_(""),
master_connection_(0),
......@@ -1099,6 +1100,53 @@ bool PikaServer::GetBinlogFiles(std::map<uint32_t, std::string>& binlogs) {
return true;
}
void PikaServer::AutoCompactRange() {
std::string cc = g_pika_conf->compact_cron();
if (cc == "") {
return;
} else {
std::string::size_type colon = cc.find(":");
std::string::size_type underline = cc.find("-");
int start = std::atoi(cc.substr(0, colon).c_str());
int end = std::atoi(cc.substr(colon+1, underline).c_str());
int usage = std::atoi(cc.substr(underline+1).c_str());
std::time_t t = std::time(nullptr);
std::tm* t_m = std::localtime(&t);
bool in_window = false;
if (start < end && (t_m->tm_hour >= start && t_m->tm_hour < end)) {
in_window = true;
} else if (start > end && ((t_m->tm_hour >= start && t_m->tm_hour < 24) ||
(t_m->tm_hour >= 0 && t_m->tm_hour < end))) {
in_window = true;
} else {
have_scheduled_crontask_ = false;
}
if (!have_scheduled_crontask_ && in_window) {
struct statfs disk_info;
int ret = statfs(g_pika_conf->db_path().c_str(), &disk_info);
if (ret == -1) {
LOG(WARNING) << "statfs error: " << strerror(errno);
return;
}
uint64_t total_size = disk_info.f_bsize * disk_info.f_blocks;
uint64_t db_size = slash::Du(g_pika_conf->db_path());
if ((db_size / total_size) * 100 >= usage) {
nemo::Status s = db_->Compact(nemo::kALL);
if (s.ok()) {
LOG(INFO) << "schedule compactRange, dbsize: " << db_size << " disksize: " << total_size;
} else {
LOG(INFO) << "schedule compactRange Failed, dbsize: " << db_size << " disksize: " << total_size
<< " error: " << s.ToString();
}
have_scheduled_crontask_ = true;
}
}
}
}
void PikaServer::AutoPurge() {
if (!PurgeLogs(0, false, false)) {
DLOG(WARNING) << "Auto purge failed";
......@@ -1309,6 +1357,8 @@ uint64_t PikaServer::ServerCurrentQps() {
}
void PikaServer::DoTimingTask() {
// Maybe schedule compactrange
AutoCompactRange();
// Purge log
AutoPurge();
......
Subproject commit d8bb7ecb7ca27707b712c828607664fdb63616d7
Subproject commit 3342a5d137c15d2a40b93365293bc250c9fbbfb7
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册