提交 4895ee1a 编写于 作者: S SongZhao 提交者: GitHub

Codis (#51)

* pika support codis slot mgirate (#48)

* codis slot migrate support

* add slotsreload slotsreloadoff slotsdel slotsscan command

* info command add slots reloading status

* support migrate large keys

* slotsmgrttagone command response fix

* dbsize command get  keys number in time if set slotmigrate mode

* fix some bugs like config get *, slots migrate command only execute on master

* slot migrate support keys with expire time (#49)

* support masterauth and fix slave replication auth (#50)

* support masterauth and fix slave replication auth

* compitible with official auth
上级 3618f6e8
......@@ -18,6 +18,8 @@ write-buffer-size : 268435456
timeout : 60
# Requirepass
requirepass :
# Masterauth
masterauth :
# Userpass
userpass :
# User Blacklist
......@@ -26,6 +28,8 @@ userblacklist :
dump-prefix :
# daemonize [yes | no]
#daemonize : yes
# slotmigrate [yes | no]
#slotmigrate : no
# Dump Path
dump-path : ./dump/
# pidfile Path
......
......@@ -34,6 +34,18 @@ const std::string kCmdNameConfig = "config";
const std::string kCmdNameMonitor = "monitor";
const std::string kCmdNameDbsize = "dbsize";
//Migrate slot
const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot";
const std::string kCmdNameSlotsMgrtTagSlot = "slotsmgrttagslot";
const std::string kCmdNameSlotsMgrtOne = "slotsmgrtone";
const std::string kCmdNameSlotsMgrtTagOne = "slotsmgrttagone";
const std::string kCmdNameSlotsInfo = "slotsinfo";
const std::string kCmdNameSlotsHashKey = "slotshashkey";
const std::string kCmdNameSlotsReload = "slotsreload";
const std::string kCmdNameSlotsReloadOff = "slotsreloadoff";
const std::string kCmdNameSlotsDel = "slotsdel";
const std::string kCmdNameSlotsScan = "slotsscan";
//Kv
const std::string kCmdNameSet = "set";
const std::string kCmdNameGet = "get";
......
......@@ -38,6 +38,8 @@ public:
int timeout() { RWLock l(&rwlock_, false); return timeout_; }
std::string requirepass() { RWLock l(&rwlock_, false); return requirepass_; }
std::string masterauth() { RWLock l(&rwlock_, false); return masterauth_; }
bool slotmigrate() { RWLock l(&rwlock_, false); return slotmigrate_; }
std::string bgsave_path() { RWLock l(&rwlock_, false); return bgsave_path_; }
std::string bgsave_prefix() { RWLock l(&rwlock_, false); return bgsave_prefix_; }
std::string userpass() { RWLock l(&rwlock_, false); return userpass_; }
......@@ -91,6 +93,14 @@ public:
RWLock l(&rwlock_, true);
requirepass_ = value;
}
void SetMasterAuth(const std::string &value) {
RWLock l(&rwlock_, true);
masterauth_ = value;
}
void SetSlotMigrate(const std::string &value) {
RWLock l(&rwlock_, true);
slotmigrate_ = (value == "yes") ? true : false;
}
void SetUserPass(const std::string &value) {
RWLock l(&rwlock_, true);
userpass_ = value;
......@@ -143,8 +153,10 @@ private:
int write_buffer_size_;
int log_level_;
bool daemonize_;
bool slotmigrate_;
int timeout_;
std::string requirepass_;
std::string masterauth_;
std::string userpass_;
std::vector<std::string> user_blacklist_;
std::string bgsave_path_;
......
......@@ -161,6 +161,51 @@ public:
bgsave_info_.bgsaving = false;
}
/*
* BGSlotsReload used
*/
struct BGSlotsReload {
bool reloading;
time_t start_time;
std::string s_start_time;
int64_t cursor;
std::string pattern;
int64_t count;
BGSlotsReload() : reloading(false), cursor(0), pattern("*"), count(100){}
void Clear() {
reloading = false;
pattern = "*";
count = 100;
cursor = 0;
}
};
BGSlotsReload bgslots_reload() {
slash::MutexLock l(&bgsave_protector_);
return bgslots_reload_;
}
bool GetSlotsreloading() {
slash::MutexLock l(&bgsave_protector_);
return bgslots_reload_.reloading;
}
void SetSlotsreloading(bool reloading) {
slash::MutexLock l(&bgsave_protector_);
bgslots_reload_.reloading = reloading;
}
void SetSlotsreloadingCursor(int64_t cursor) {
slash::MutexLock l(&bgsave_protector_);
bgslots_reload_.cursor = cursor;
}
int64_t GetSlotsreloadingCursor() {
slash::MutexLock l(&bgsave_protector_);
return bgslots_reload_.cursor;
}
void Bgslotsreload();
void StopBgslotsreload() {
slash::MutexLock l(&bgsave_protector_);
bgslots_reload_.reloading = false;
}
/*
* PurgeLog used
*/
......@@ -309,6 +354,11 @@ private:
bgsave_info_.Clear();
}
/*
* BGSlotsReload use
*/
BGSlotsReload bgslots_reload_;
static void DoBgslotsreload(void* arg);
/*
* Purgelogs use
......
#ifndef PIKA_SLOT_H_
#define PIKA_SLOT_H_
#include "pika_command.h"
#include "pika_client_conn.h"
#include "strings.h"
const std::string SlotKeyPrefix = "_internal:slotkey:4migrate:";
const size_t MaxKeySendSize = 10 * 1024;
//crc 32
#define HASH_SLOTS_MASK 0x000003ff
#define HASH_SLOTS_SIZE (HASH_SLOTS_MASK + 1)
const uint32_t IEEE_POLY = 0xedb88320;
extern uint32_t crc32tab[256];
void CRC32TableInit(uint32_t poly);
extern void InitCRC32Table();
extern uint32_t CRC32Update(uint32_t crc, const char *buf, int len);
extern int SlotNum(const std::string &str);
extern int KeyType(const std::string key, std::string &key_type);
extern void SlotKeyAdd(const std::string type, const std::string key);
extern void SlotKeyRem(const std::string key);
extern void KeyNotExistsRem(const std::string type, const std::string key);
class SlotsMgrtTagSlotCmd : public Cmd {
public:
SlotsMgrtTagSlotCmd() {}
virtual void Do();
private:
std::string dest_ip_;
int64_t dest_port_;
int64_t timeout_ms_;
int64_t slot_num_;
std::string key_;
char key_type_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
int SlotKeyPop();
};
class SlotsMgrtTagOneCmd : public Cmd {
public:
SlotsMgrtTagOneCmd() {}
virtual void Do();
private:
std::string dest_ip_;
int64_t dest_port_;
int64_t timeout_ms_;
std::string key_;
int64_t slot_num_;
char key_type_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
int KeyTypeCheck();
int SlotKeyRemCheck();
};
class SlotsInfoCmd : public Cmd {
public:
SlotsInfoCmd() {}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};
class SlotsHashKeyCmd : public Cmd {
public:
SlotsHashKeyCmd() {}
virtual void Do();
private:
std::vector<std::string> keys_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};
class SlotsReloadCmd : public Cmd {
public:
SlotsReloadCmd() {}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};
class SlotsReloadOffCmd : public Cmd {
public:
SlotsReloadOffCmd() {}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};
class SlotsDelCmd : public Cmd {
public:
SlotsDelCmd() {}
virtual void Do();
private:
std::vector<std::string> slots_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};
class SlotsScanCmd : public Cmd {
public:
SlotsScanCmd() : pattern_("*"), count_(10) {}
virtual void Do();
private:
std::string key_, pattern_;
int64_t cursor_, count_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
pattern_ = "*";
count_ = 10;
}
};
#endif
......@@ -9,6 +9,7 @@
#include "pika_command.h"
#include "pika_conf.h"
#include "pika_define.h"
#include "pika_slot.h"
#include "env.h"
PikaConf *g_pika_conf;
......@@ -165,6 +166,7 @@ int main(int argc, char *argv[]) {
PikaGlogInit();
PikaSignalSetup();
InitCmdInfoTable();
InitCRC32Table();
LOG(INFO) << "Server at: " << path;
g_pika_server = new PikaServer();
......
......@@ -8,6 +8,7 @@
#include "pika_conf.h"
#include "pika_admin.h"
#include "pika_server.h"
#include "pika_slot.h"
#include <sys/utsname.h>
......@@ -532,6 +533,10 @@ void InfoCmd::InfoStats(std::string &info) {
time_t current_time_s = time(NULL);
tmp_stream << "is_bgsaving:" << (is_bgsaving ? "Yes, " : "No, ") << bgsave_info.s_start_time << ", "
<< (is_bgsaving ? (current_time_s - bgsave_info.start_time) : 0) << "\r\n";
PikaServer::BGSlotsReload bgslotsreload_info = g_pika_server->bgslots_reload();
bool is_reloading = g_pika_server->GetSlotsreloading();
tmp_stream << "is_slots_reloading:" << (is_reloading ? "Yes, " : "No, ") << bgslotsreload_info.s_start_time << ", "
<< (is_reloading ? (current_time_s - bgslotsreload_info.start_time) : 0) << "\r\n";
PikaServer::KeyScanInfo key_scan_info = g_pika_server->key_scan_info();
bool is_scaning = g_pika_server->key_scaning();
tmp_stream << "is_scaning_keyspace:" << (is_scaning ? ("Yes, " + key_scan_info.s_start_time) + "," : "No");
......@@ -776,6 +781,10 @@ void ConfigCmd::ConfigGet(std::string &ret) {
ret = "*2\r\n";
EncodeString(&ret, "requirepass");
EncodeString(&ret, g_pika_conf->requirepass());
} else if (get_item == "masterauth") {
ret = "*2\r\n";
EncodeString(&ret, "masterauth");
EncodeString(&ret, g_pika_conf->masterauth());
} else if (get_item == "userpass") {
ret = "*2\r\n";
EncodeString(&ret, "userpass");
......@@ -792,6 +801,10 @@ void ConfigCmd::ConfigGet(std::string &ret) {
ret = "*2\r\n";
EncodeString(&ret, "daemonize");
EncodeString(&ret, g_pika_conf->daemonize() ? "yes" : "no");
} else if (get_item == "slotmigrate") {
ret = "*2\r\n";
EncodeString(&ret, "slotmigrate");
EncodeString(&ret, g_pika_conf->slotmigrate() ? "yes" : "no");
} else if (get_item == "dump-path") {
ret = "*2\r\n";
EncodeString(&ret, "dump-path");
......@@ -857,7 +870,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeString(&ret, "slaveof");
EncodeString(&ret, g_pika_conf->slaveof());
} else if (get_item == "*") {
ret = "*66\r\n";
ret = "*70\r\n";
EncodeString(&ret, "port");
EncodeInt32(&ret, g_pika_conf->port());
EncodeString(&ret, "thread-num");
......@@ -880,12 +893,16 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32(&ret, g_pika_conf->timeout());
EncodeString(&ret, "requirepass");
EncodeString(&ret, g_pika_conf->requirepass());
EncodeString(&ret, "masterauth");
EncodeString(&ret, g_pika_conf->masterauth());
EncodeString(&ret, "userpass");
EncodeString(&ret, g_pika_conf->userpass());
EncodeString(&ret, "userblacklist");
EncodeString(&ret, g_pika_conf->suser_blacklist());
EncodeString(&ret, "daemonize");
EncodeInt32(&ret, g_pika_conf->daemonize());
EncodeString(&ret, "slotmigrate");
EncodeInt32(&ret, g_pika_conf->slotmigrate());
EncodeString(&ret, "dump-path");
EncodeString(&ret, g_pika_conf->bgsave_path());
EncodeString(&ret, "dump-prefix");
......@@ -932,10 +949,12 @@ void ConfigCmd::ConfigGet(std::string &ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*13\r\n";
ret = "*15\r\n";
EncodeString(&ret, "loglevel");
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
EncodeString(&ret, "masterauth");
EncodeString(&ret, "slotmigrate");
EncodeString(&ret, "userpass");
EncodeString(&ret, "userblacklist");
EncodeString(&ret, "dump-prefix");
......@@ -973,6 +992,12 @@ void ConfigCmd::ConfigSet(std::string& ret) {
} else if (set_item == "requirepass") {
g_pika_conf->SetRequirePass(value);
ret = "+OK\r\n";
} else if (set_item == "masterauth") {
g_pika_conf->SetMasterAuth(value);
ret = "+OK\r\n";
} else if (set_item == "slotmigrate") {
g_pika_conf->SetSlotMigrate(value);
ret = "+OK\r\n";
} else if (set_item == "userpass") {
g_pika_conf->SetUserPass(value);
ret = "+OK\r\n";
......@@ -1081,6 +1106,22 @@ void DbsizeCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)
}
void DbsizeCmd::Do() {
if (g_pika_conf->slotmigrate()){
int64_t dbsize = 0;
for (int i = 0; i < HASH_SLOTS_SIZE; ++i){
int64_t card = 0;
card = g_pika_server->db()->SCard(SlotKeyPrefix+std::to_string(i));
if (card >= 0) {
dbsize += card;
}else {
res_.SetRes(CmdRes::kErrOther, "Get dbsize error");
return;
}
}
res_.AppendInteger(dbsize);
return;
}
PikaServer::KeyScanInfo key_scan_info = g_pika_server->key_scan_info();
std::vector<uint64_t> &key_nums_v = key_scan_info.key_nums_v;
if (key_scan_info.key_nums_v.size() != 5) {
......
......@@ -8,6 +8,7 @@
#include "nemo.h"
#include "pika_bit.h"
#include "pika_server.h"
#include "pika_slot.h"
extern PikaServer *g_pika_server;
......@@ -47,6 +48,7 @@ void BitSetCmd::Do() {
nemo::Status s = g_pika_server->db()->BitSet(key_, bit_offset_, on_, &bit_val);
if (s.ok()){
res_.AppendInteger((int)bit_val);
SlotKeyAdd("k", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......
......@@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include "pika_admin.h"
#include "pika_slot.h"
#include "pika_kv.h"
#include "pika_hash.h"
#include "pika_list.h"
......@@ -53,6 +54,28 @@ void InitCmdInfoTable() {
CmdInfo* dbsizeptr = new CmdInfo(kCmdNameDbsize, 1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameDbsize, dbsizeptr));
//migrate slot
CmdInfo* slotmgrtslotptr = new CmdInfo(kCmdNameSlotsMgrtSlot, 5, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsMgrtSlot, slotmgrtslotptr));
CmdInfo* slotmgrttagslotptr = new CmdInfo(kCmdNameSlotsMgrtTagSlot, 5, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsMgrtTagSlot, slotmgrttagslotptr));
CmdInfo* slotmgrtoneptr = new CmdInfo(kCmdNameSlotsMgrtOne, 5, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsMgrtOne, slotmgrtoneptr));
CmdInfo* slotmgrttagoneptr = new CmdInfo(kCmdNameSlotsMgrtTagOne, 5, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsMgrtTagOne, slotmgrttagoneptr));
CmdInfo* slotsinfoptr = new CmdInfo(kCmdNameSlotsInfo, -1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsInfo, slotsinfoptr));
CmdInfo* slotshashkeyptr = new CmdInfo(kCmdNameSlotsHashKey, -1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsHashKey, slotshashkeyptr));
CmdInfo* slotsreloadptr = new CmdInfo(kCmdNameSlotsReload, -1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsReload, slotsreloadptr));
CmdInfo* slotsreloadoffptr = new CmdInfo(kCmdNameSlotsReloadOff, -1, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsReloadOff, slotsreloadoffptr));
CmdInfo* slotsdelptr = new CmdInfo(kCmdNameSlotsDel, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsDel, slotsdelptr));
CmdInfo* slotsscanptr = new CmdInfo(kCmdNameSlotsScan, -3, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlotsScan, slotsscanptr));
//Kv
////SetCmd
CmdInfo* setptr = new CmdInfo(kCmdNameSet, -3, kCmdFlagsWrite | kCmdFlagsKv);
......@@ -404,6 +427,28 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
Cmd* dbsizeptr = new DbsizeCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameDbsize, dbsizeptr));
//migrate slot
Cmd* slotmgrtslotptr = new SlotsMgrtTagSlotCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsMgrtSlot, slotmgrtslotptr));
Cmd* slotmgrttagslotptr = new SlotsMgrtTagSlotCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsMgrtTagSlot, slotmgrttagslotptr));
Cmd* slotmgrtoneptr = new SlotsMgrtTagOneCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsMgrtOne, slotmgrtoneptr));
Cmd* slotmgrttagoneptr = new SlotsMgrtTagOneCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsMgrtTagOne, slotmgrttagoneptr));
Cmd* slotsinfoptr = new SlotsInfoCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsInfo, slotsinfoptr));
Cmd* slotshashkeyptr = new SlotsHashKeyCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsHashKey, slotshashkeyptr));
Cmd* slotsreloadptr = new SlotsReloadCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsReload, slotsreloadptr));
Cmd* slotsreloadoffptr = new SlotsReloadOffCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsReloadOff, slotsreloadoffptr));
Cmd* slotsdelptr = new SlotsDelCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsDel, slotsdelptr));
Cmd* slotsscanptr = new SlotsScanCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSlotsScan, slotsscanptr));
//Kv
////SetCmd
Cmd* setptr = new SetCmd();
......
......@@ -42,6 +42,7 @@ int PikaConf::Load()
timeout_ = 60; // 60s
}
GetConfStr("requirepass", &requirepass_);
GetConfStr("masterauth", &masterauth_);
GetConfStr("userpass", &userpass_);
GetConfInt("maxclients", &maxclients_);
if (maxclients_ <= 0) {
......@@ -148,6 +149,11 @@ int PikaConf::Load()
}
GetConfStr("pidfile", &pidfile_);
// slot migrate
std::string smgrt;
GetConfStr("slotmigrate", &smgrt);
slotmigrate_ = (smgrt == "yes") ? true : false;
// db sync
GetConfStr("db-sync-path", &db_sync_path_);
if (db_sync_path_[db_sync_path_.length() - 1] != '/') {
......@@ -180,10 +186,12 @@ int PikaConf::ConfigRewrite() {
SetConfInt("write-buffer-size", write_buffer_size_);
SetConfInt("timeout", timeout_);
SetConfStr("requirepass", requirepass_);
SetConfStr("masterauth", masterauth_);
SetConfStr("userpass", userpass_);
SetConfStr("userblacklist", suser_blacklist());
SetConfStr("dump-prefix", bgsave_prefix_);
SetConfStr("daemonize", daemonize_ ? "yes" : "no");
SetConfStr("slotmigrate", slotmigrate_ ? "yes" : "no");
SetConfStr("dump-path", bgsave_path_);
SetConfStr("pidfile", pidfile_);
SetConfInt("maxclients", maxclients_);
......
......@@ -7,6 +7,7 @@
#include "nemo.h"
#include "pika_hash.h"
#include "pika_server.h"
#include "pika_slot.h"
extern PikaServer *g_pika_server;
......@@ -34,6 +35,7 @@ void HDelCmd::Do() {
}
}
res_.AppendInteger(num);
KeyNotExistsRem("h", key_);
return;
}
......@@ -113,11 +115,13 @@ void HSetCmd::Do() {
if (s.ok()) {
if (tmp != value_) {
DoHSet(key_, field_, value_, res_, ":0");
SlotKeyAdd("h", key_);
} else {
res_.AppendContent(":0");
}
} else if (s.IsNotFound()) {
DoHSet(key_, field_, value_, res_, ":1");
SlotKeyAdd("h", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -163,6 +167,7 @@ void HIncrbyCmd::Do() {
nemo::Status s = g_pika_server->db()->HIncrby(key_, field_, by_, new_value);
if (s.ok() || s.IsNotFound()) {
res_.AppendContent(":" + new_value);
SlotKeyAdd("h", key_);
} else if (s.IsCorruption() && s.ToString() == "Corruption: value is not integer") {
res_.SetRes(CmdRes::kInvalidInt);
} else if (s.IsInvalidArgument()) {
......@@ -193,6 +198,7 @@ void HIncrbyfloatCmd::Do() {
if (s.ok()) {
res_.AppendStringLen(new_value.size());
res_.AppendContent(new_value);
SlotKeyAdd("h", key_);
} else if (s.IsCorruption() && s.ToString() == "Corruption: value is not float") {
res_.SetRes(CmdRes::kInvalidFloat);
} else if (s.IsInvalidArgument()) {
......@@ -304,6 +310,7 @@ void HMsetCmd::Do() {
nemo::Status s = g_pika_server->db()->HMSet(key_, fv_v_);
if (s.ok()) {
res_.SetRes(CmdRes::kOk);
SlotKeyAdd("h", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -325,6 +332,7 @@ void HSetnxCmd::Do() {
nemo::Status s = g_pika_server->db()->HSetnx(key_, field_, value_);
if (s.ok()) {
res_.AppendContent(":1");
SlotKeyAdd("h", key_);
} else if (s.IsCorruption() && s.ToString() == "Corruption: Already Exist") {
res_.AppendContent(":0");
} else {
......
......@@ -7,6 +7,7 @@
#include "nemo.h"
#include "pika_kv.h"
#include "pika_server.h"
#include "pika_slot.h"
extern PikaServer *g_pika_server;
......@@ -66,6 +67,7 @@ void SetCmd::Do() {
} else {
res_.AppendArrayLen(-1);;
}
SlotKeyAdd("k", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -105,6 +107,10 @@ void DelCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
void DelCmd::Do() {
int64_t count = 0;
std::vector<std::string>::const_iterator it;
for (it = keys_.begin(); it != keys_.end(); it++) {
SlotKeyRem(*it);
}
nemo::Status s = g_pika_server->db()->MDel(keys_, &count);
if (s.ok()) {
res_.AppendInteger(count);
......@@ -128,6 +134,7 @@ void IncrCmd::Do() {
nemo::Status s = g_pika_server->db()->Incrby(key_, 1, new_value);
if (s.ok()) {
res_.AppendContent(":" + new_value);
SlotKeyAdd("k", key_);
} else if (s.IsCorruption() && s.ToString() == "Corruption: value is not a integer") {
res_.SetRes(CmdRes::kInvalidInt);
} else if (s.IsInvalidArgument()) {
......@@ -156,6 +163,7 @@ void IncrbyCmd::Do() {
nemo::Status s = g_pika_server->db()->Incrby(key_, by_, new_value);
if (s.ok()) {
res_.AppendContent(":" + new_value);
SlotKeyAdd("k", key_);
} else if (s.IsCorruption() && s.ToString() == "Corruption: value is not a integer") {
res_.SetRes(CmdRes::kInvalidInt);
} else if (s.IsInvalidArgument()) {
......@@ -185,6 +193,7 @@ void IncrbyfloatCmd::Do() {
if (s.ok()) {
res_.AppendStringLen(new_value.size());
res_.AppendContent(new_value);
SlotKeyAdd("k", key_);
} else if (s.IsCorruption() && s.ToString() == "Corruption: value is not a float"){
res_.SetRes(CmdRes::kInvalidFloat);
} else if (s.IsInvalidArgument()) {
......@@ -267,6 +276,7 @@ void GetsetCmd::Do() {
res_.AppendStringLen(old_value.size());
res_.AppendContent(old_value);
}
SlotKeyAdd("k", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -288,6 +298,7 @@ void AppendCmd::Do() {
nemo::Status s = g_pika_server->db()->Append(key_, value_, &new_len);
if (s.ok() || s.IsNotFound()) {
res_.AppendInteger(new_len);
SlotKeyAdd("k", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -355,6 +366,7 @@ void SetnxCmd::Do() {
nemo::Status s = g_pika_server->db()->Setnx(key_, value_, &res);
if (s.ok()) {
res_.AppendInteger(res);
SlotKeyAdd("k", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -379,6 +391,7 @@ void SetexCmd::Do() {
nemo::Status s = g_pika_server->db()->Set(key_, value_, sec_);
if (s.ok()) {
res_.SetRes(CmdRes::kOk);
SlotKeyAdd("k", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -405,6 +418,10 @@ void MsetCmd::Do() {
nemo::Status s = g_pika_server->db()->MSet(kvs_);
if (s.ok()) {
res_.SetRes(CmdRes::kOk);
std::vector<nemo::KV>::const_iterator it;
for (it = kvs_.begin(); it != kvs_.end(); it++) {
SlotKeyAdd("k", it->key);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -432,6 +449,10 @@ void MsetnxCmd::Do() {
nemo::Status s = g_pika_server->db()->MSetnx(kvs_, &res);
if (s.ok()) {
res_.AppendInteger(res);
std::vector<nemo::KV>::const_iterator it;
for (it = kvs_.begin(); it != kvs_.end(); it++) {
SlotKeyAdd("k", it->key);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -484,6 +505,7 @@ void SetrangeCmd::Do() {
nemo::Status s = g_pika_server->db()->Setrange(key_, offset_, value_, &new_len);
if (s.ok()) {
res_.AppendInteger(new_len);
SlotKeyAdd("k", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......
......@@ -8,6 +8,7 @@
#include "nemo.h"
#include "pika_list.h"
#include "pika_server.h"
#include "pika_slot.h"
extern PikaServer *g_pika_server;
......@@ -107,6 +108,7 @@ void LPushCmd::Do() {
res_.SetRes(CmdRes::kErrOther, s.ToString());
} else {
res_.AppendInteger(llen);
SlotKeyAdd("l", key_);
}
}
......@@ -122,6 +124,7 @@ void LPopCmd::Do() {
nemo::Status s = g_pika_server->db()->LPop(key_, &value);
if (s.ok()) {
res_.AppendString(value);
KeyNotExistsRem("l", key_);
} else if (s.IsNotFound()) {
res_.AppendStringLen(-1);
} else {
......@@ -142,6 +145,7 @@ void LPushxCmd::Do() {
nemo::Status s = g_pika_server->db()->LPushx(key_, value_, &llen);
if (s.ok() || s.IsNotFound()) {
res_.AppendInteger(llen);
SlotKeyAdd("l", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -201,6 +205,7 @@ void LRemCmd::Do() {
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
KeyNotExistsRem("l", key_);
}
void LSetCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info) {
......@@ -268,6 +273,7 @@ void RPopCmd::Do() {
nemo::Status s = g_pika_server->db()->RPop(key_, &value);
if (s.ok()) {
res_.AppendString(value);
KeyNotExistsRem("l", key_);
} else if (s.IsNotFound()) {
res_.AppendStringLen(-1);
} else {
......@@ -289,7 +295,7 @@ void RPopLPushCmd::Do() {
if (s.ok()) {
res_.AppendString(value);
} else if (s.IsNotFound() && s.ToString() == "NotFound: not found the source key") {
res_.AppendStringLen(-1);
res_.AppendStringLen(-1);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -320,6 +326,7 @@ void RPushCmd::Do() {
res_.SetRes(CmdRes::kErrOther, s.ToString());
} else {
res_.AppendInteger(llen);
SlotKeyAdd("l", key_);
}
}
......@@ -336,6 +343,7 @@ void RPushxCmd::Do() {
nemo::Status s = g_pika_server->db()->RPushx(key_, value_, &llen);
if (s.ok() || s.IsNotFound()) {
res_.AppendInteger(llen);
SlotKeyAdd("l", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......
......@@ -20,6 +20,7 @@
#include "slash_string.h"
#include "bg_thread.h"
#include "pika_conf.h"
#include "pika_slot.h"
extern PikaConf *g_pika_conf;
......@@ -831,6 +832,63 @@ bool PikaServer::Bgsaveoff() {
return true;
}
void PikaServer::Bgslotsreload() {
// Only one thread can go through
{
slash::MutexLock l(&bgsave_protector_);
if (bgslots_reload_.reloading || bgsave_info_.bgsaving) {
return;
}
bgslots_reload_.reloading = true;
}
bgslots_reload_.start_time = time(NULL);
char s_time[32];
int len = strftime(s_time, sizeof(s_time), "%Y%m%d%H%M%S", localtime(&bgslots_reload_.start_time));
bgslots_reload_.s_start_time.assign(s_time, len);
bgslots_reload_.cursor = 0;
bgslots_reload_.pattern = "*";
bgslots_reload_.count = 100;
LOG(INFO) << "Start slot reloading";
// Start new thread if needed
bgsave_thread_.StartIfNeed();
bgsave_thread_.Schedule(&DoBgslotsreload, static_cast<void*>(this));
}
void PikaServer::DoBgslotsreload(void* arg) {
PikaServer* p = static_cast<PikaServer*>(arg);
BGSlotsReload reload = p->bgslots_reload();
// Do slotsreload
std::vector<std::string> keys;
int64_t cursor_ret = -1;
while(cursor_ret != 0 && p->GetSlotsreloading()){
nemo::Status s = p->db()->Scan(reload.cursor, reload.pattern, reload.count, keys, &cursor_ret);
if (!s.ok()){
LOG(WARNING) << "BG slotsreload error: " <<strerror(errno);
return;
}
std::vector<std::string>::const_iterator iter;
for (iter = keys.begin(); iter != keys.end(); iter++){
std::string key_type;
if ((*iter).find(SlotKeyPrefix) != std::string::npos){
continue;
}
if(KeyType(*iter, key_type) > 0){
SlotKeyAdd(key_type, *iter);
}
}
reload.cursor = cursor_ret;
p->SetSlotsreloadingCursor(cursor_ret);
keys.clear();
}
p->SetSlotsreloading(false);
LOG(INFO) << "Finish slot reloading";
}
bool PikaServer::PurgeLogs(uint32_t to, bool manual, bool force) {
// Only one thread can go through
bool expect = false;
......
......@@ -7,6 +7,7 @@
#include "nemo.h"
#include "pika_set.h"
#include "pika_server.h"
#include "pika_slot.h"
extern PikaServer *g_pika_server;
......@@ -36,6 +37,7 @@ void SAddCmd::Do() {
return;
}
}
SlotKeyAdd("s", key_);
res_.AppendInteger(count);
return;
}
......@@ -55,6 +57,7 @@ void SPopCmd::Do() {
if (s.ok()) {
res_.AppendStringLen(member.size());
res_.AppendContent(member);
KeyNotExistsRem("s", key_);
} else if (s.IsNotFound()) {
res_.AppendContent("$-1");
} else {
......@@ -210,6 +213,7 @@ void SRemCmd::Do() {
}
}
res_.AppendInteger(count);
KeyNotExistsRem("s", key_);
return;
}
......@@ -381,10 +385,17 @@ void SMoveCmd::Do() {
int64_t res = 0;
nemo::Status s = g_pika_server->db()->SMove(src_key_, dest_key_, member_, &res);
if (s.ok() || s.IsNotFound()) {
res_.AppendInteger(res);
if (s.IsNotFound()){
res_.AppendInteger(res);
} else {
res_.AppendInteger(res);
SlotKeyAdd("s", dest_key_);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
KeyNotExistsRem("s", src_key_);
return;
}
......
此差异已折叠。
......@@ -27,8 +27,13 @@ PikaTrysyncThread::~PikaTrysyncThread() {
bool PikaTrysyncThread::Send() {
pink::RedisCmdArgsType argv;
std::string wbuf_str;
std::string masterauth = g_pika_conf->masterauth();
std::string requirepass = g_pika_conf->requirepass();
if (requirepass != "") {
if (masterauth != "") {
argv.push_back("auth");
argv.push_back(masterauth);
pink::RedisCli::SerializeCommand(argv, &wbuf_str);
} else if (requirepass != ""){
argv.push_back("auth");
argv.push_back(requirepass);
pink::RedisCli::SerializeCommand(argv, &wbuf_str);
......
......@@ -7,6 +7,7 @@
#include "nemo.h"
#include "pika_zset.h"
#include "pika_server.h"
#include "pika_slot.h"
extern PikaServer *g_pika_server;
......@@ -48,6 +49,7 @@ void ZAddCmd::Do() {
return;
}
}
SlotKeyAdd("z", key_);
res_.AppendInteger(count);
return;
}
......@@ -176,6 +178,7 @@ void ZIncrbyCmd::Do() {
if (s.ok()) {
res_.AppendStringLen(new_value.size());
res_.AppendContent(new_value);
SlotKeyAdd("z", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -503,6 +506,8 @@ void ZRemCmd::Do() {
}
}
res_.AppendInteger(count);
KeyNotExistsRem("z", key_);
return;
}
......@@ -892,6 +897,7 @@ void ZRemrangebyrankCmd::Do() {
nemo::Status s = g_pika_server->db()->ZRemrangebyrank(key_, start_rank_, stop_rank_, &count);
if (s.ok()) {
res_.AppendInteger(count);
KeyNotExistsRem("z", key_);
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
}
......@@ -923,6 +929,7 @@ void ZRemrangebyscoreCmd::Do() {
res_.SetRes(CmdRes::kErrOther, s.ToString());
return;
}
KeyNotExistsRem("z", key_);
res_.AppendInteger(count);
return;
}
......@@ -958,6 +965,7 @@ void ZRemrangebylexCmd::Do() {
res_.SetRes(CmdRes::kErrOther, s.ToString());
return;
}
KeyNotExistsRem("z", key_);
res_.AppendInteger(count);
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册