Commit fd3d236e authored by CatKang

Redesign the AddBinlogSender scheduling to solve the lock problem between info and trysync

Parent e7a726dd
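
The heart of the redesign is that slave registration is split into three steps so that slave_mutex_ is never held across binlog-sender creation (which can block on a bgsave/DBSync); holding the lock there is what previously serialized the info command against trysync. Below is a minimal, self-contained sketch of that locking scheme; MiniServer, CountReadySlaves, and the void* sender field are simplified illustrations, not the real Pika API, whose actual signatures appear in the diff that follows.

```cpp
// A minimal sketch (simplified stand-in types, not the real Pika classes) of
// the locking scheme this commit introduces: slave_mutex_ is held only for
// the short register/attach steps, never across binlog-sender construction,
// so INFO can read the slave list concurrently.
#include <cstdint>
#include <mutex>
#include <string>
#include <vector>

struct SlaveItem {
  int64_t sid;
  std::string ip_port;
  void* sender;  // attached later by SetSlaveSender()
};

class MiniServer {
 public:
  // Step 1 (TRYSYNC): register the slave under slave_mutex_ and hand out a
  // sid, or return -1 if it is already registered.
  int64_t TryAddSlave(const std::string& ip, int64_t port) {
    std::lock_guard<std::mutex> l(slave_mutex_);
    const std::string ip_port = ip + ":" + std::to_string(port);
    for (const SlaveItem& s : slaves_) {
      if (s.ip_port == ip_port) return -1;
    }
    slaves_.push_back(SlaveItem{next_sid_++, ip_port, nullptr});
    return slaves_.back().sid;
  }

  // Step 2 happens in the caller *without* slave_mutex_: build the binlog
  // sender (which may trigger a bgsave/DBSync and take a long time).

  // Step 3: attach the finished sender, again under the lock.
  bool SetSlaveSender(const std::string& ip, int64_t port, void* sender) {
    std::lock_guard<std::mutex> l(slave_mutex_);
    const std::string ip_port = ip + ":" + std::to_string(port);
    for (SlaveItem& s : slaves_) {
      if (s.ip_port == ip_port) {
        s.sender = sender;
        return true;
      }
    }
    return false;  // the slave was deleted in the meantime
  }

  // INFO-side reader: needs slave_mutex_ only while walking the list and
  // simply skips slaves whose sender has not been attached yet.
  size_t CountReadySlaves() {
    std::lock_guard<std::mutex> l(slave_mutex_);
    size_t n = 0;
    for (const SlaveItem& s : slaves_) {
      if (s.sender != nullptr) ++n;
    }
    return n;
  }

 private:
  std::mutex slave_mutex_;
  std::vector<SlaveItem> slaves_;
  int64_t next_sid_ = 1;
};
```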
......@@ -22,7 +22,7 @@ class RedisCli;
class PikaBinlogSenderThread : public pink::Thread {
public:
PikaBinlogSenderThread(std::string &ip, int port, slash::SequentialFile *queue, uint32_t filenum, uint64_t con_offset);
PikaBinlogSenderThread(const std::string &ip, int port, slash::SequentialFile *queue, uint32_t filenum, uint64_t con_offset);
virtual ~PikaBinlogSenderThread();
......
......@@ -80,7 +80,10 @@ public:
}
void DeleteSlave(int fd); // hb_fd
bool FindSlave(std::string& ip_port);
void DeleteSlave(const std::string& ip, int64_t port);
int64_t TryAddSlave(const std::string& ip, int64_t port);
bool SetSlaveSender(const std::string& ip, int64_t port,
PikaBinlogSenderThread* s);
int32_t GetSlaveListString(std::string& slave_list_str);
Status GetSmallestValidLog(uint32_t* max);
void MayUpdateSlavesMap(int64_t sid, int32_t hb_fd);
......@@ -88,7 +91,6 @@ public:
slash::Mutex slave_mutex_; // protect slaves_;
std::vector<SlaveItem> slaves_;
std::vector<PikaBinlogSenderThread *> binlog_sender_threads_;
/*
* Slave use
......@@ -125,7 +127,8 @@ public:
* Binlog
*/
Binlog *logger_;
Status AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t con_offset);
Status AddBinlogSender(const std::string& ip, int64_t port,
uint32_t filenum, uint64_t con_offset);
/*
* BGSave used
......
......@@ -120,33 +120,32 @@ void TrysyncCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)
}
void TrysyncCmd::Do() {
std::string ip_port = slash::IpPortString(slave_ip_, slave_port_);
LOG(INFO) << "Trysync, Slave ip_port: " << ip_port << " filenum: " << filenum_ << " pro_offset: " << pro_offset_;
slash::MutexLock l(&(g_pika_server->slave_mutex_));
if (!g_pika_server->FindSlave(ip_port)) {
SlaveItem s;
s.sid = g_pika_server->GenSid();
s.ip_port = ip_port;
s.port = slave_port_;
s.hb_fd = -1;
s.stage = SLAVE_ITEM_STAGE_ONE;
gettimeofday(&s.create_time, NULL);
s.sender = NULL;
LOG(INFO) << "Trysync, dont FindSlave, so AddBinlogSender";
Status status = g_pika_server->AddBinlogSender(s, filenum_, pro_offset_);
LOG(INFO) << "Trysync, Slave ip: " << slave_ip_ << "Slave port:" << slave_port_
<< " filenum: " << filenum_ << " pro_offset: " << pro_offset_;
int64_t sid = g_pika_server->TryAddSlave(slave_ip_, slave_port_);
if (sid > 0) {
Status status = g_pika_server->AddBinlogSender(slave_ip_, slave_port_,
filenum_, pro_offset_);
if (status.ok()) {
res_.AppendInteger(s.sid);
LOG(INFO) << "Send Sid to Slave: " << s.sid;
res_.AppendInteger(sid);
LOG(INFO) << "Send Sid to Slave: " << sid;
g_pika_server->BecomeMaster();
} else if (status.IsIncomplete()) {
return;
}
// Create Sender failed, delete the slave
g_pika_server->DeleteSlave(slave_ip_, slave_port_);
if (status.IsIncomplete()) {
res_.AppendString(kInnerReplWait);
} else {
LOG(WARNING) << "slave offset is larger than mine, slave ip: " << ip_port << " filenum: " << filenum_ << " pro_offset_: " << pro_offset_;
LOG(WARNING) << "slave offset is larger than mine, slave ip: " << slave_ip_
<< "slave port:" << slave_port_
<< " filenum: " << filenum_ << " pro_offset_: " << pro_offset_;
res_.SetRes(CmdRes::kErrOther, "InvalidOffset");
}
} else {
LOG(WARNING) << "slave already exist, slave ip: " << ip_port;
LOG(WARNING) << "slave already exist, slave ip: " << slave_ip_
<< "slave port: " << slave_port_;
res_.SetRes(CmdRes::kErrOther, "AlreadyExist");
}
}
......
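
For context, the master now answers TRYSYNC with one of three shapes visible in the hunk above: an integer sid on success, the kInnerReplWait marker while a bgsave/DBSync is being prepared, or an error such as "InvalidOffset" / "AlreadyExist". The following is a hypothetical sketch of how a slave might branch on that reply; Reply, HandleTrysyncReply, and the placeholder kInnerReplWaitMarker value are illustrative, not the real slave-side code.

```cpp
// Hypothetical slave-side handling of the TRYSYNC reply, inferred only from
// the responses visible above; all names and the marker value are placeholders.
#include <cstdint>
#include <string>

struct Reply {
  bool is_integer;
  int64_t integer;   // valid when is_integer is true
  std::string text;  // status or error string otherwise
};

enum class TrysyncResult { kAccepted, kWaitDBSync, kError };

// Placeholder for the marker the master appends via kInnerReplWait.
static const std::string kInnerReplWaitMarker = "wait";

TrysyncResult HandleTrysyncReply(const Reply& r, int64_t* sid_out) {
  if (r.is_integer) {
    *sid_out = r.integer;              // master accepted us; keep the sid
    return TrysyncResult::kAccepted;   // binlog sender will start streaming
  }
  if (r.text == kInnerReplWaitMarker) {
    return TrysyncResult::kWaitDBSync; // bgsave/DBSync in progress, retry later
  }
  return TrysyncResult::kError;        // e.g. "InvalidOffset", "AlreadyExist"
}
```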
......@@ -20,7 +20,7 @@ using pink::RedisCli;
extern PikaServer* g_pika_server;
PikaBinlogSenderThread::PikaBinlogSenderThread(std::string &ip, int port, slash::SequentialFile *queue, uint32_t filenum, uint64_t con_offset)
PikaBinlogSenderThread::PikaBinlogSenderThread(const std::string &ip, int port, slash::SequentialFile *queue, uint32_t filenum, uint64_t con_offset)
: con_offset_(con_offset),
filenum_(filenum),
initial_offset_(0),
......
......@@ -108,13 +108,15 @@ PikaServer::~PikaServer() {
{
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();
while (iter != slaves_.end()) {
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
if (iter->sender != NULL) {
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
}
iter = slaves_.erase(iter);
LOG(INFO) << "Delete slave success";
}
}
delete pika_trysync_thread_;
delete ping_thread_;
delete pika_binlog_receiver_thread_;
......@@ -288,6 +290,26 @@ void PikaServer::Start() {
Cleanup();
}
void PikaServer::DeleteSlave(const std::string& ip, int64_t port) {
std::string ip_port = slash::IpPortString(ip, port);
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();
while (iter != slaves_.end()) {
if (iter->ip_port == ip_port) {
break;
}
iter++;
}
if (iter == slaves_.end()) {
return;
}
if (iter->sender != NULL) {
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
}
slaves_.erase(iter);
return;
}
void PikaServer::DeleteSlave(int fd) {
slash::MutexLock l(&slave_mutex_);
......@@ -295,22 +317,9 @@ void PikaServer::DeleteSlave(int fd) {
while (iter != slaves_.end()) {
if (iter->hb_fd == fd) {
//pthread_kill(iter->tid);
// Remove BinlogSender first
// static_cast<PikaBinlogSenderThread*>(iter->sender)->SetExit();
//
// DLOG(INFO) << "DeleteSlave: start join";
// int err = pthread_join(iter->sender_tid, NULL);
// DLOG(INFO) << "DeleteSlave: after join";
// if (err != 0) {
// std::string msg = "can't join thread " + std::string(strerror(err));
// LOG(WARNING) << msg;
// //return Status::Corruption(msg);
// }
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
if (iter->sender != NULL) {
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
}
slaves_.erase(iter);
LOG(INFO) << "Delete slave success";
break;
......@@ -373,34 +382,80 @@ void PikaServer::MayUpdateSlavesMap(int64_t sid, int32_t hb_fd) {
}
}
bool PikaServer::FindSlave(std::string& ip_port) {
// slash::MutexLock l(&slave_mutex_);
// Try to add a slave; return its sid on success,
// or -1 if the slave already exists
int64_t PikaServer::TryAddSlave(const std::string& ip, int64_t port) {
std::string ip_port = slash::IpPortString(ip, port);
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();
while (iter != slaves_.end()) {
if (iter->ip_port == ip_port) {
return true;
return -1;
}
iter++;
}
return false;
// Not exist, so add new
LOG(INFO) << "Add new slave, " << ip << ":" << port;
SlaveItem s;
s.sid = GenSid();
s.ip_port = ip_port;
s.port = port;
s.hb_fd = -1;
s.stage = SLAVE_ITEM_STAGE_ONE;
gettimeofday(&s.create_time, NULL);
s.sender = NULL;
slaves_.push_back(s);
return s.sid;
}
int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
// Set binlog sender of SlaveItem
bool PikaServer::SetSlaveSender(const std::string& ip, int64_t port,
PikaBinlogSenderThread* s){
std::string ip_port = slash::IpPortString(ip, port);
slash::MutexLock l(&slave_mutex_);
size_t index = 0, slaves_num = slaves_.size();
std::vector<SlaveItem>::iterator iter = slaves_.begin();
while (iter != slaves_.end()) {
if (iter->ip_port == ip_port) {
break;
}
iter++;
}
if (iter == slaves_.end()) {
// Not exist
return false;
}
std::stringstream tmp_stream;
iter->sender = s;
iter->sender_tid = s->thread_id();
LOG(INFO) << "SetSlaveSender ok, tid is " << iter->sender_tid
<< " hd_fd: " << iter->hb_fd << " stage: " << iter->stage;
return true;
}
int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
size_t index = 0;
std::string slave_ip_port;
while (index < slaves_num) {
slave_ip_port = slaves_[index].ip_port;
tmp_stream << "slave" << index << ":ip=" << slave_ip_port.substr(0, slave_ip_port.find(":"))
<< ",port=" << slave_ip_port.substr(slave_ip_port.find(":")+1)
<< ",state=" << (slaves_[index].stage == SLAVE_ITEM_STAGE_TWO ? "online" : "offline") << "\r\n";
index++;
std::stringstream tmp_stream;
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();
for (; iter != slaves_.end(); ++iter) {
if ((*iter).sender == NULL) {
// Binlog sender has not been created yet
continue;
}
slave_ip_port =(*iter).ip_port;
tmp_stream << "slave" << index++
<< ":ip=" << slave_ip_port.substr(0, slave_ip_port.find(":"))
<< ",port=" << slave_ip_port.substr(slave_ip_port.find(":")+1)
<< ",state=" << ((*iter).stage == SLAVE_ITEM_STAGE_TWO ? "online" : "offline")
<< "\r\n";
}
slave_list_str.assign(tmp_stream.str());
return slaves_num;
return index;
}
void PikaServer::BecomeMaster() {
......@@ -664,7 +719,8 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) {
/*
* BinlogSender
*/
Status PikaServer::AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t con_offset) {
Status PikaServer::AddBinlogSender(const std::string& ip, int64_t port,
uint32_t filenum, uint64_t con_offset) {
// Sanity check
if (con_offset > logger_->file_size()) {
return Status::InvalidArgument("AddBinlogSender invalid offset");
......@@ -676,33 +732,24 @@ Status PikaServer::AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t
return Status::InvalidArgument("AddBinlogSender invalid binlog offset");
}
std::string slave_ip = slave.ip_port.substr(0, slave.ip_port.find(':'));
// Create and set sender
slash::SequentialFile *readfile;
std::string confile = NewFileName(logger_->filename, filenum);
if (!slash::FileExists(confile)) {
// Not found binlog specified by filenum
TryDBSync(slave_ip, slave.port + 3000, cur_filenum);
TryDBSync(ip, port + 3000, cur_filenum);
return Status::Incomplete("Bgsaving and DBSync first");
}
if (!slash::NewSequentialFile(confile, &readfile).ok()) {
return Status::IOError("AddBinlogSender new sequtialfile");
}
PikaBinlogSenderThread* sender = new PikaBinlogSenderThread(slave_ip, slave.port+1000, readfile, filenum, con_offset);
PikaBinlogSenderThread* sender = new PikaBinlogSenderThread(ip,
port + 1000, readfile, filenum, con_offset);
slave.sender = sender;
if (sender->trim() == 0) {
if (sender->trim() == 0 // nonzero trim() means a corrupt binlog
&& SetSlaveSender(ip, port, sender)) { // false means the SlaveItem no longer exists
sender->StartThread();
pthread_t tid = sender->thread_id();
slave.sender_tid = tid;
LOG(INFO) << "AddBinlogSender ok, tid is " << slave.sender_tid << " hd_fd: " << slave.hb_fd << " stage: " << slave.stage;
// Add sender
// slash::MutexLock l(&slave_mutex_);
slaves_.push_back(slave);
return Status::OK();
} else {
delete sender;
......@@ -923,6 +970,10 @@ bool PikaServer::GetPurgeWindow(uint32_t &max) {
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator it;
for (it = slaves_.begin(); it != slaves_.end(); ++it) {
if ((*it).sender == NULL) {
// One binlog sender has not been created yet, so do not purge
return false;
}
PikaBinlogSenderThread *pb = static_cast<PikaBinlogSenderThread*>((*it).sender);
uint32_t filenum = pb->filenum();
max = filenum < max ? filenum : max;
......@@ -947,6 +998,10 @@ bool PikaServer::CouldPurge(uint32_t index) {
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator it;
for (it = slaves_.begin(); it != slaves_.end(); ++it) {
if ((*it).sender == NULL) {
// One binlog sender has not been created yet, so do not purge
return false;
}
PikaBinlogSenderThread *pb = static_cast<PikaBinlogSenderThread*>((*it).sender);
uint32_t filenum = pb->filenum();
if (index > filenum) {
......
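
The last two hunks change the purge guards: while any registered slave still has a NULL sender (its binlog sender has not been attached yet), its replication progress is unknown, so binlog purging is refused. An illustrative guard with simplified types follows; SlaveView and CouldPurgeUpTo are not the real Pika names, and the exact boundary comparison is the one in CouldPurge above.

```cpp
// Illustrative purge guard with simplified types, mirroring the checks above:
// unknown progress (no sender yet) or a slave still reading the file means
// the binlog cannot be purged.
#include <cstdint>
#include <vector>

struct SlaveView {
  const uint32_t* sender_filenum;  // null until the binlog sender is attached
};

bool CouldPurgeUpTo(const std::vector<SlaveView>& slaves, uint32_t index) {
  for (const SlaveView& s : slaves) {
    if (s.sender_filenum == nullptr) {
      return false;  // one sender not created yet: progress unknown, keep logs
    }
    if (index >= *s.sender_filenum) {
      return false;  // a slave still needs this binlog file
    }
  }
  return true;
}
```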