提交 845d79f2 编写于 作者: C CatKang

add purgelogsto command

上级 917ca7f3
...@@ -75,4 +75,13 @@ private: ...@@ -75,4 +75,13 @@ private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info); virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
}; };
class PurgelogstoCmd : public Cmd {
public:
PurgelogstoCmd() : num_(0){
}
virtual void Do();
private:
uint32_t num_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};
#endif #endif
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <cstdio> #include <cstdio>
#include <list> #include <list>
#include <string>
#include <deque> #include <deque>
#include <pthread.h> #include <pthread.h>
...@@ -30,7 +31,7 @@ class Version; ...@@ -30,7 +31,7 @@ class Version;
class Binlog class Binlog
{ {
public: public:
Binlog(const char* Binlog_path); Binlog(const std::string& Binlog_path);
~Binlog(); ~Binlog();
void Lock() { mutex_.Lock(); } void Lock() { mutex_.Lock(); }
......
...@@ -17,6 +17,7 @@ const std::string kCmdNameAuth = "auth"; ...@@ -17,6 +17,7 @@ const std::string kCmdNameAuth = "auth";
const std::string kCmdNameBgsave = "bgsave"; const std::string kCmdNameBgsave = "bgsave";
const std::string kCmdNameBgsaveoff = "bgsaveoff"; const std::string kCmdNameBgsaveoff = "bgsaveoff";
const std::string kCmdNameCompact = "compact"; const std::string kCmdNameCompact = "compact";
const std::string kCmdNamePurgelogsto = "purgelogsto";
//Kv //Kv
const std::string kCmdNameSet = "set"; const std::string kCmdNameSet = "set";
...@@ -184,6 +185,8 @@ public: ...@@ -184,6 +185,8 @@ public:
kOutOfRange, kOutOfRange,
kInvalidPwd, kInvalidPwd,
kNoneBgsave, kNoneBgsave,
kPurgeExist,
kInvalidParameter,
kWrongNum, kWrongNum,
kErrOther, kErrOther,
}; };
...@@ -226,6 +229,10 @@ public: ...@@ -226,6 +229,10 @@ public:
return "-ERR invalid password\r\n"; return "-ERR invalid password\r\n";
case kNoneBgsave: case kNoneBgsave:
return "-ERR No BGSave Works now\r\n"; return "-ERR No BGSave Works now\r\n";
case kPurgeExist:
return "-ERR binlog may in use or non_exist or already in purging...\r\n";
case kInvalidParameter:
return "-ERR Invalid Argument\r\n";
case kWrongNum: case kWrongNum:
result = "-ERR wrong number of arguments for '"; result = "-ERR wrong number of arguments for '";
result.append(message_); result.append(message_);
......
...@@ -76,9 +76,10 @@ static const size_t kHeaderSize = 1 + 3; ...@@ -76,9 +76,10 @@ static const size_t kHeaderSize = 1 + 3;
*/ */
static const int64_t kPoolSize = 1073741824; static const int64_t kPoolSize = 1073741824;
static std::string kBinlog = "/binlog"; static const std::string kBinlogPrefix = "binlog";
static const size_t kBinlogPrefixLen = 6;
static std::string kManifest = "/manifest"; static const std::string kManifest = "manifest";
/* /*
* define common character * define common character
......
...@@ -80,7 +80,6 @@ public: ...@@ -80,7 +80,6 @@ public:
void DeleteSlave(int fd); // hb_fd void DeleteSlave(int fd); // hb_fd
bool FindSlave(std::string& ip_port); bool FindSlave(std::string& ip_port);
Status GetSmallestValidLog(uint32_t* max);
void MayUpdateSlavesMap(int64_t sid, int32_t hb_fd); void MayUpdateSlavesMap(int64_t sid, int32_t hb_fd);
void BecomeMaster(); void BecomeMaster();
...@@ -140,9 +139,25 @@ public: ...@@ -140,9 +139,25 @@ public:
} }
void Bgsave(); void Bgsave();
bool Bgsaveoff(); bool Bgsaveoff();
void ClearBgsave();
bool RunBgsaveEngine(); bool RunBgsaveEngine();
void ClearBgsave() {
bgsave_info_.Clear();
bgsaving_ = false;
}
/*
* PurgeLog used
*/
struct PurgeArg {
PikaServer *p;
uint32_t to;
};
bool PurgeLogs(uint32_t to);
bool PurgeFiles(uint32_t to);
void ClearPurge() {
purging_ = false;
}
private: private:
...@@ -176,7 +191,6 @@ private: ...@@ -176,7 +191,6 @@ private:
/* /*
* Bgsave use * Bgsave use
*/ */
slash::Mutex bgsave_protector_;
std::atomic<bool> bgsaving_; std::atomic<bool> bgsaving_;
pink::BGThread bgsave_thread_; pink::BGThread bgsave_thread_;
nemo::BackupEngine *bgsave_engine_; nemo::BackupEngine *bgsave_engine_;
...@@ -186,6 +200,15 @@ private: ...@@ -186,6 +200,15 @@ private:
bool InitBgsaveEnv(const std::string& bgsave_path); bool InitBgsaveEnv(const std::string& bgsave_path);
bool InitBgsaveEngine(); bool InitBgsaveEngine();
/*
* Purgelogs use
*/
std::atomic<bool> purging_;
pink::BGThread purge_thread_;
static void DoPurgeLogs(void* arg);
bool GetPurgeWindow(uint32_t &max);
PikaServer(PikaServer &ps); PikaServer(PikaServer &ps);
void operator =(const PikaServer &ps); void operator =(const PikaServer &ps);
}; };
......
...@@ -27,7 +27,7 @@ static void PikaConfInit(const std::string& path) { ...@@ -27,7 +27,7 @@ static void PikaConfInit(const std::string& path) {
} }
static void PikaGlogInit() { static void PikaGlogInit() {
FLAGS_log_dir = "./log"; FLAGS_log_dir = g_pika_conf->log_path();
FLAGS_minloglevel = 0; FLAGS_minloglevel = 0;
FLAGS_alsologtostderr = true; FLAGS_alsologtostderr = true;
FLAGS_max_log_size = 1800; FLAGS_max_log_size = 1800;
......
...@@ -198,7 +198,6 @@ void CompactCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) ...@@ -198,7 +198,6 @@ void CompactCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)
} }
} }
void CompactCmd::Do() { void CompactCmd::Do() {
CmdRes::CmdRet ret;
nemo::Status s = g_pika_server->db()->Compact(nemo::kALL); nemo::Status s = g_pika_server->db()->Compact(nemo::kALL);
if (s.ok()) { if (s.ok()) {
res_.SetRes(CmdRes::kOk); res_.SetRes(CmdRes::kOk);
...@@ -206,3 +205,31 @@ void CompactCmd::Do() { ...@@ -206,3 +205,31 @@ void CompactCmd::Do() {
res_.SetRes(CmdRes::kErrOther, s.ToString()); res_.SetRes(CmdRes::kErrOther, s.ToString());
} }
} }
void PurgelogstoCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
if (!ptr_info->CheckArg(argv.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNamePurgelogsto);
return;
}
std::string filename = slash::StringToLower(argv[1]);
if (filename.size() <= kBinlogPrefixLen ||
kBinlogPrefix != filename.substr(0, kBinlogPrefixLen)) {
res_.SetRes(CmdRes::kInvalidParameter);
return;
}
std::string str_num = filename.substr(kBinlogPrefixLen);
int64_t num = 0;
if (!slash::string2l(str_num.data(), str_num.size(), &num) || num < 0) {
res_.SetRes(CmdRes::kInvalidParameter);
return;
}
num_ = num;
}
void PurgelogstoCmd::Do() {
if (g_pika_server->PurgeLogs(num_)) {
res_.SetRes(CmdRes::kOk);
} else {
res_.SetRes(CmdRes::kPurgeExist);
}
}
...@@ -75,7 +75,7 @@ Status Version::Init() { ...@@ -75,7 +75,7 @@ Status Version::Init() {
/* /*
* Binlog * Binlog
*/ */
Binlog::Binlog(const char* Binlog_path) : Binlog::Binlog(const std::string& Binlog_path) :
version_(NULL), version_(NULL),
consumer_num_(0), consumer_num_(0),
item_num_(0), item_num_(0),
...@@ -94,7 +94,7 @@ Binlog::Binlog(const char* Binlog_path) : ...@@ -94,7 +94,7 @@ Binlog::Binlog(const char* Binlog_path) :
slash::CreateDir(binlog_path_); slash::CreateDir(binlog_path_);
filename = binlog_path_ + kBinlog; filename = binlog_path_ + kBinlogPrefix;
const std::string manifest = binlog_path_ + kManifest; const std::string manifest = binlog_path_ + kManifest;
std::string profile; std::string profile;
...@@ -417,3 +417,4 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) { ...@@ -417,3 +417,4 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) {
InitLogFile(); InitLogFile();
return Status::OK(); return Status::OK();
} }
...@@ -23,6 +23,8 @@ void InitCmdInfoTable() { ...@@ -23,6 +23,8 @@ void InitCmdInfoTable() {
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameBgsaveoff, bgsaveoffptr)); cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameBgsaveoff, bgsaveoffptr));
CmdInfo* compactptr = new CmdInfo(kCmdNameCompact, 1, kCmdFlagsRead | kCmdFlagsAdmin); CmdInfo* compactptr = new CmdInfo(kCmdNameCompact, 1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameCompact, compactptr)); cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameCompact, compactptr));
CmdInfo* purgelogptr = new CmdInfo(kCmdNamePurgelogsto, 2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePurgelogsto, purgelogptr));
//Kv //Kv
////SetCmd ////SetCmd
...@@ -271,6 +273,8 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) { ...@@ -271,6 +273,8 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameBgsaveoff, bgsaveoffptr)); cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameBgsaveoff, bgsaveoffptr));
Cmd* compactptr = new CompactCmd(); Cmd* compactptr = new CompactCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameCompact, compactptr)); cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameCompact, compactptr));
Cmd* purgelogptr = new PurgelogstoCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePurgelogsto, purgelogptr));
//Kv //Kv
////SetCmd ////SetCmd
......
...@@ -35,6 +35,10 @@ int PikaConf::Load() ...@@ -35,6 +35,10 @@ int PikaConf::Load()
GetConfStr("dump_path", &bgsave_path_); GetConfStr("dump_path", &bgsave_path_);
GetConfInt("target_file_size_base", &target_file_size_base_); GetConfInt("target_file_size_base", &target_file_size_base_);
if (log_path_[log_path_.length() - 1] != '/') {
log_path_ += "/";
}
if (bgsave_path_[bgsave_path_.length() - 1] != '/') { if (bgsave_path_[bgsave_path_.length() - 1] != '/') {
bgsave_path_ += "/"; bgsave_path_ += "/";
} }
......
...@@ -19,7 +19,8 @@ PikaServer::PikaServer() : ...@@ -19,7 +19,8 @@ PikaServer::PikaServer() :
master_port_(0), master_port_(0),
repl_state_(PIKA_REPL_NO_CONNECT), repl_state_(PIKA_REPL_NO_CONNECT),
role_(PIKA_ROLE_SINGLE), role_(PIKA_ROLE_SINGLE),
bgsaving_(false) { bgsaving_(false),
purging_(false) {
pthread_rwlock_init(&rwlock_, NULL); pthread_rwlock_init(&rwlock_, NULL);
...@@ -49,7 +50,7 @@ PikaServer::PikaServer() : ...@@ -49,7 +50,7 @@ PikaServer::PikaServer() :
pika_trysync_thread_ = new PikaTrysyncThread(); pika_trysync_thread_ = new PikaTrysyncThread();
pthread_rwlock_init(&state_protector_, NULL); pthread_rwlock_init(&state_protector_, NULL);
logger_ = new Binlog("./log"); logger_ = new Binlog(g_pika_conf->log_path());
} }
PikaServer::~PikaServer() { PikaServer::~PikaServer() {
...@@ -284,27 +285,6 @@ Status PikaServer::AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t ...@@ -284,27 +285,6 @@ Status PikaServer::AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t
} }
} }
Status PikaServer::GetSmallestValidLog(uint32_t* max) {
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter;
*max = logger_->version_->pro_num();
for (iter = slaves_.begin(); iter != slaves_.end(); iter++) {
int tmp = static_cast<PikaBinlogSenderThread*>(iter->sender)->filenum();
if (tmp < *max) {
*max = tmp;
}
}
return Status::OK();
}
void PikaServer::ClearBgsave() {
bgsave_info_.Clear();
bgsaving_ = false;
}
bool PikaServer::InitBgsaveEnv(const std::string& bgsave_path) { bool PikaServer::InitBgsaveEnv(const std::string& bgsave_path) {
// Prepare for bgsave dir // Prepare for bgsave dir
bgsave_info_.start_time = time(NULL); bgsave_info_.start_time = time(NULL);
...@@ -375,12 +355,10 @@ bool PikaServer::RunBgsaveEngine() { ...@@ -375,12 +355,10 @@ bool PikaServer::RunBgsaveEngine() {
void PikaServer::Bgsave() { void PikaServer::Bgsave() {
// Only one thread can go through // Only one thread can go through
bgsave_protector_.Lock(); bool expect = false;
if (bgsaving_) { if (!bgsaving_.compare_exchange_strong(expect, true)) {
return; return;
} }
bgsaving_ = true;
bgsave_protector_.Unlock();
// Prepare for Bgsaving // Prepare for Bgsaving
if (!InitBgsaveEnv(g_pika_conf->bgsave_path()) if (!InitBgsaveEnv(g_pika_conf->bgsave_path())
...@@ -435,3 +413,84 @@ bool PikaServer::Bgsaveoff() { ...@@ -435,3 +413,84 @@ bool PikaServer::Bgsaveoff() {
} }
return true; return true;
} }
bool PikaServer::PurgeLogs(uint32_t to) {
// Only one thread can go through
bool expect = false;
if (!purging_.compare_exchange_strong(expect, true)) {
return false;
}
uint32_t max = 0;
if (!GetPurgeWindow(max)){
return false;
}
LOG(WARNING) << "max seqnum could be deleted: " << max;
if (to > max) {
ClearPurge();
return false;
}
PurgeArg *arg = new PurgeArg();
arg->p = this;
arg->to = to;
// Start new thread if needed
if (!purge_thread_.is_running()) {
purge_thread_.StartThread();
}
purge_thread_.Schedule(&DoPurgeLogs, static_cast<void*>(arg));
return true;
}
void PikaServer::DoPurgeLogs(void* arg) {
PurgeArg *ppurge = static_cast<PurgeArg*>(arg);
PikaServer* ps = ppurge->p;
ps->PurgeFiles(ppurge->to);
ps->ClearPurge();
delete (PurgeArg*)arg;
}
bool PikaServer::PurgeFiles(uint32_t to)
{
std::string log_path = g_pika_conf->log_path();
std::vector<std::string> children;
int ret = slash::GetChildren(log_path, children);
if (ret != 0){
LOG(ERROR) << "Get all files in log path failed! error:" << ret;
return false;
}
std::string filename;
std::vector<std::string>::iterator it;
for (it = children.begin(); it != children.end(); ++it) {
filename = *it;
if (filename.compare(0, kBinlogPrefixLen, kBinlogPrefix) == 0
&& stoul(filename.substr(kBinlogPrefixLen)) <= to) {
slash::Status s = slash::DeleteFile(log_path + filename);
LOG(WARNING) << "Purge log file : " << filename;
if (!s.ok()) {
LOG(ERROR) << "Purge log file : " << filename << " failed! error:" << s.ToString();
}
}
}
return true;
}
bool PikaServer::GetPurgeWindow(uint32_t &max) {
max = logger_->version_->pro_num();
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator it;
for (it = slaves_.begin(); it != slaves_.end(); ++it) {
PikaBinlogSenderThread *pb = static_cast<PikaBinlogSenderThread*>((*it).sender);
uint32_t filenum = pb->filenum();
max = filenum < max ? filenum : max;
}
// remain some more
if (max > 10) {
max -= 10;
return true;
}
max = 0;
return false;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册