提交 3ab80217 编写于 作者: W wangkang-xy

Merge

#ifndef PIKA_BINLOG_SENDER_THREAD_H_
#define PIKA_BINLOG_SENDER_THREAD_H_
#include "simple_thread.h"
#include "slice.h"
#include "status.h"
#include "env.h"
#include "slash_mutex.h"
class PikaBinlogSenderThread : public pink::SimpleThread {
public:
PikaBinlogSenderThread(std::string &ip, int port, slash::SequentialFile *queue, uint32_t filenum, uint64_t con_offset);
virtual ~PikaBinlogSenderThread();
/*
* Get and Set
*/
uint64_t last_record_offset () {
slash::RWLock l(&rwlock_, false);
return last_record_offset_;
}
uint32_t filenum() {
slash::RWLock l(&rwlock_, false);
return filenum_;
}
uint64_t con_offset() {
slash::RWLock l(&rwlock_, false);
return con_offset_;
}
bool IsExit() {
slash::RWLock l(&rwlock_, false);
return should_exit_;
}
void SetExit() {
slash::RWLock l(&rwlock_, true);
should_exit_ = true;
}
int trim();
uint64_t get_next(bool &is_error);
private:
slash::Status Parse();
slash::Status Consume(std::string &scratch);
unsigned int ReadPhysicalRecord(slash::Slice *fragment);
uint64_t con_offset_;
uint32_t filenum_;
uint64_t initial_offset_;
uint64_t last_record_offset_;
uint64_t end_of_buffer_offset_;
slash::SequentialFile* queue_;
char* const backing_store_;
slash::Slice buffer_;
std::string ip_;
int port_;
bool should_exit_;
pthread_rwlock_t rwlock_;
bool Init();
bool Connect();
bool Send(const std::string &msg);
bool Recv();
int sockfd_;
virtual void* ThreadMain();
};
#endif
...@@ -17,6 +17,7 @@ struct WorkerCronTask { ...@@ -17,6 +17,7 @@ struct WorkerCronTask {
struct SlaveItem { struct SlaveItem {
uint64_t sid; uint64_t sid;
std::string ip_port; std::string ip_port;
int port;
pthread_t sender_tid; pthread_t sender_tid;
int hb_fd; int hb_fd;
int stage; int stage;
...@@ -37,4 +38,43 @@ struct SlaveItem { ...@@ -37,4 +38,43 @@ struct SlaveItem {
#define PIKA_ROLE_SLAVE 1 #define PIKA_ROLE_SLAVE 1
#define PIKA_ROLE_MASTER 2 #define PIKA_ROLE_MASTER 2
/*
* The size of Binlogfile
*/
static uint64_t kBinlogSize = 128;
//static uint64_t kBinlogSize = 1024 * 1024 * 100;
enum RecordType {
kZeroType = 0,
kFullType = 1,
kFirstType = 2,
kMiddleType = 3,
kLastType = 4,
kEof = 5,
kBadRecord = 6,
kOldRecord = 7
};
/*
* the block size that we read and write from write2file
* the default size is 64KB
*/
static const size_t kBlockSize = 64 * 1024;
/*
* Header is Type(1 byte), length (2 bytes)
*/
static const size_t kHeaderSize = 1 + 3;
/*
* the size of memory when we use memory mode
* the default memory size is 2GB
*/
static const int64_t kPoolSize = 1073741824;
static std::string kBinlog = "/binlog";
static std::string kManifest = "/manifest";
#endif #endif
#ifndef PIKA_SERVER_H_ #ifndef PIKA_SERVER_H_
#define PIKA_SERVER_H_ #define PIKA_SERVER_H_
#include <vector>
#include <list>
#include "pika_binlog.h"
#include "pika_binlog_receiver_thread.h" #include "pika_binlog_receiver_thread.h"
#include "pika_binlog_sender_thread.h"
#include "pika_heartbeat_thread.h" #include "pika_heartbeat_thread.h"
#include "pika_dispatch_thread.h" #include "pika_dispatch_thread.h"
#include "pika_trysync_thread.h" #include "pika_trysync_thread.h"
#include "pika_worker_thread.h" #include "pika_worker_thread.h"
#include "pika_define.h" #include "pika_define.h"
#include "slash_status.h"
using slash::Status;
using slash::Slice;
class PikaServer class PikaServer
{ {
public: public:
...@@ -45,8 +55,12 @@ public: ...@@ -45,8 +55,12 @@ public:
* Master use * Master use
*/ */
void DeleteSlave(int fd); // hb_fd void DeleteSlave(int fd); // hb_fd
Status GetSmallestValidLog(uint32_t* max);
slash::Mutex slave_mutex_; // protect slaves_; slash::Mutex slave_mutex_; // protect slaves_;
std::vector<SlaveItem> slaves_; std::vector<SlaveItem> slaves_;
// pthread_mutex_t binlog_sender_mutex_;
std::vector<PikaBinlogSenderThread *> binlog_sender_threads_;
/* /*
* Slave use * Slave use
...@@ -58,10 +72,14 @@ public: ...@@ -58,10 +72,14 @@ public:
void MinusMasterConnection(); void MinusMasterConnection();
void PlusMasterConnection(); void PlusMasterConnection();
void Start(); void Start();
slash::Mutex mutex_; // double lock to block main thread slash::Mutex mutex_; // double lock to block main thread
/*
* Binlog
*/
Binlog *logger;
Status AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t con_offset);
private: private:
int port_; int port_;
...@@ -72,9 +90,10 @@ private: ...@@ -72,9 +90,10 @@ private:
PikaHeartbeatThread* pika_heartbeat_thread_; PikaHeartbeatThread* pika_heartbeat_thread_;
PikaTrysyncThread* pika_trysync_thread_; PikaTrysyncThread* pika_trysync_thread_;
/*
* Slave use /*
*/ * Slave use
*/
pthread_rwlock_t state_protector_; //protect below, use for master-slave mode pthread_rwlock_t state_protector_; //protect below, use for master-slave mode
std::string master_ip_; std::string master_ip_;
int master_connection_; int master_connection_;
......
...@@ -38,4 +38,5 @@ private: ...@@ -38,4 +38,5 @@ private:
void ClientKill(std::string ip_port); void ClientKill(std::string ip_port);
void ClientKillAll(); void ClientKillAll();
}; };
#endif #endif
...@@ -23,11 +23,19 @@ static void PikaSignalSetup() { ...@@ -23,11 +23,19 @@ static void PikaSignalSetup() {
} }
int main() int main(int argc, char *argv[])
{ {
if (argc < 2) {
printf ("Usage: ./pika port\n");
exit(-1);
}
PikaGlogInit(); PikaGlogInit();
PikaSignalSetup(); PikaSignalSetup();
g_pika_server = new PikaServer(9221);
DLOG(INFO) << "Server at: " << argv[1];
g_pika_server = new PikaServer(atoi(argv[1]));
g_pika_server->Start(); g_pika_server->Start();
return 0; return 0;
......
#include "pika_binlog.h"
#include <iostream>
#include <string>
#include <stdint.h>
#include <signal.h>
#include <unistd.h>
#include <glog/logging.h>
#include "slash_mutex.h"
using slash::RWLock;
std::string NewFileName(const std::string name, const uint32_t current) {
char buf[256];
snprintf(buf, sizeof(buf), "%s%u", name.c_str(), current);
return std::string(buf);
}
/*
* Version
*/
Version::Version(slash::RWFile *save) : save_(save) {
assert(save_ != NULL);
pro_offset_ = 0;
con_offset_ = 0;
item_num_ = 0;
pronum_ = 0;
connum_ = 0;
pthread_rwlock_init(&rwlock_, NULL);
}
Version::~Version() {
StableSave();
pthread_rwlock_destroy(&rwlock_);
}
Status Version::StableSave() {
slash::RWLock(&rwlock_, true);
char *p = save_->GetData();
memcpy(p, &pro_offset_, sizeof(uint64_t));
p += 8;
memcpy(p, &con_offset_, sizeof(uint64_t));
p += 8;
memcpy(p, &item_num_, sizeof(uint32_t));
p += 4;
memcpy(p, &pronum_, sizeof(uint32_t));
p += 4;
memcpy(p, &connum_, sizeof(uint32_t));
p += 4;
return Status::OK();
}
Status Version::Init() {
RWLock(&rwlock_, false);
Status s;
if (save_->GetData() != NULL) {
memcpy((char*)(&pro_offset_), save_->GetData(), sizeof(uint64_t));
memcpy((char*)(&con_offset_), save_->GetData() + 8, sizeof(uint64_t));
memcpy((char*)(&item_num_), save_->GetData() + 16, sizeof(uint32_t));
memcpy((char*)(&pronum_), save_->GetData() + 20, sizeof(uint32_t));
memcpy((char*)(&connum_), save_->GetData() + 24, sizeof(uint32_t));
// DLOG(INFO) << "Version Init pro_offset "<< pro_offset_ << " itemnum " << item_num << " pronum " << pronum_ << " connum " << connum_;
return Status::OK();
} else {
return Status::Corruption("version init error");
}
}
/*
* Binlog
*/
Binlog::Binlog(const char* Binlog_path) :
version_(NULL),
consumer_num_(0),
item_num_(0),
queue_(NULL),
versionfile_(NULL),
pronum_(0),
//retry_(retry),
pool_(NULL),
exit_all_consume_(false),
binlog_path_(Binlog_path) {
slash::SetMmapBoundSize(1024 * 1024 * 100);
//slash::kMmapBoundSize = 1024 * 1024 * 100;
Status s;
slash::CreateDir(binlog_path_);
filename = binlog_path_ + kBinlog;
const std::string manifest = binlog_path_ + kManifest;
std::string profile;
if (!slash::FileExists(manifest)) {
DLOG(INFO) << "Binlog: Manifest file not exist";
profile = NewFileName(filename, pronum_);
s = slash::NewWritableFile(profile, &queue_);
if (!s.ok()) {
LOG(WARNING) << "Binlog: new " << filename << " " << s.ToString();
}
s = slash::NewRWFile(manifest, &versionfile_);
if (!s.ok()) {
LOG(WARNING) << "Binlog: new versionfile error " << s.ToString();
}
version_ = new Version(versionfile_);
version_->StableSave();
} else {
DLOG(INFO) << "Binlog: Find the exist file ";
s = slash::NewRWFile(manifest, &versionfile_);
if (s.ok()) {
version_ = new Version(versionfile_);
version_->Init();
pronum_ = version_->pronum();
// Debug
version_->debug();
} else {
LOG(WARNING) << "Binlog: open versionfile error";
}
profile = NewFileName(filename, pronum_);
DLOG(INFO) << "Binlog: open profile " << profile;
slash::AppendWritableFile(profile, &queue_, version_->pro_offset());
uint64_t filesize = queue_->Filesize();
DLOG(INFO) << "Binlog: filesize is " << filesize;
}
InitLogFile();
}
Binlog::~Binlog() {
delete version_;
delete versionfile_;
delete queue_;
}
void Binlog::InitLogFile() {
assert(queue_ != NULL);
uint64_t filesize = queue_->Filesize();
block_offset_ = filesize % kBlockSize;
}
Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset) {
*filenum = version_->pronum();
*pro_offset = version_->pro_offset();
return Status::OK();
}
Status Binlog::Put(const std::string &item) {
Status s;
/* Check to roll log file */
uint64_t filesize = queue_->Filesize();
if (filesize > kBinlogSize) {
//log_info("roll file filesize %llu kBinlogSize %llu\n", filesize, kBinlogSize);
delete queue_;
queue_ = NULL;
pronum_++;
std::string profile = NewFileName(filename, pronum_);
slash::NewWritableFile(profile, &queue_);
version_->set_pro_offset(0);
version_->set_pronum(pronum_);
version_->StableSave();
version_->debug();
InitLogFile();
}
s = Produce(Slice(item.data(), item.size()));
if (s.ok()) {
version_->plus_item_num();
version_->StableSave();
}
return s;
}
Status Binlog::Put(const char* item, int len) {
Status s;
/* Check to roll log file */
uint64_t filesize = queue_->Filesize();
if (filesize > kBinlogSize) {
//log_info("roll file filesize %llu kBinlogSize %llu\n", filesize, kBinlogSize);
delete queue_;
queue_ = NULL;
pronum_++;
std::string profile = NewFileName(filename, pronum_);
slash::NewWritableFile(profile, &queue_);
version_->set_pro_offset(0);
version_->set_pronum(pronum_);
version_->StableSave();
version_->debug();
InitLogFile();
}
s = Produce(Slice(item, len));
if (s.ok()) {
version_->plus_item_num();
version_->StableSave();
}
return s;
}
Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n) {
Status s;
assert(n <= 0xffffff);
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
char buf[kHeaderSize];
buf[0] = static_cast<char>(n & 0xff);
buf[1] = static_cast<char>((n & 0xff00) >> 8);
buf[2] = static_cast<char>(n >> 16);
buf[3] = static_cast<char>(t);
s = queue_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = queue_->Append(Slice(ptr, n));
if (s.ok()) {
s = queue_->Flush();
}
}
block_offset_ += static_cast<int>(kHeaderSize + n);
// log_info("block_offset %d", (kHeaderSize + n));
version_->rise_pro_offset((uint64_t)(kHeaderSize + n));
version_->StableSave();
return s;
}
Status Binlog::Produce(const Slice &item) {
Status s;
const char *ptr = item.data();
size_t left = item.size();
bool begin = true;
do {
const int leftover = static_cast<int>(kBlockSize) - block_offset_;
assert(leftover >= 0);
if (static_cast<size_t>(leftover) < kHeaderSize) {
if (leftover > 0) {
queue_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00", leftover));
version_->rise_pro_offset(leftover);
version_->StableSave();
}
block_offset_ = 0;
}
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}
// // TODO Skip con_offset
// Status Binlog::SetConsumer(int fd, uint32_t filenum, uint64_t con_offset) {
// std::list<ConsumerItem *>::iterator it;
// for (it = consumers_.begin(); it != consumers_.end(); it++) {
// if ((*it)->fd_ == fd) {
// ConsumerItem *c = *it;
// std::string confile = NewFileName(filename, filenum);
//
// //if (slash::FileExists(confile)) {
// SequentialFile *readfile;
// Status s = slash::AppendSequentialFile(confile, &readfile);
// if (!s.ok()){
// return s;
// }
//
// mutex_.Lock();
// delete c->readfile_;
// c->readfile_ = readfile;
//
// delete c->consumer_;
// c->consumer_ = new Consumer(c->readfile_, c->h_, 0, filenum);
// int ret = c->consumer_->trim();
// mutex_.Unlock();
//
// if (ret != 0) {
// return Status::InvalidArgument("invalid offset");
// }
// return Status::OK();
// //} else {
// // return Status::InvalidArgument();
// //}
// }
// }
// return Status::NotFound("");
// }
//
//
// Status Binlog::GetConsumerStatus(int fd, uint32_t *filenum, uint64_t *con_offset) {
// std::list<ConsumerItem *>::iterator it;
// for (it = consumers_.begin(); it != consumers_.end(); it++) {
// if ((*it)->fd_ == fd) {
// *filenum = (*it)->consumer_->filenum();
// *con_offset = (*it)->consumer_->con_offset();
//
// return Status::OK();
// }
// }
// return Status::NotFound("");
// }
Status Binlog::AppendBlank(slash::WritableFile *file, uint64_t len) {
if (len < kHeaderSize) {
return Status::OK();
}
uint64_t pos = 0;
std::string blank(kBlockSize, ' ');
for (; pos + kBlockSize < len; pos += kBlockSize) {
file->Append(Slice(blank.data(), blank.size()));
}
// Append a msg which occupy the remain part of the last block
uint32_t n = (uint32_t) ((len % kBlockSize) - kHeaderSize);
char buf[kBlockSize];
buf[0] = static_cast<char>(n & 0xff);
buf[1] = static_cast<char>((n & 0xff00) >> 8);
buf[2] = static_cast<char>(n >> 16);
buf[3] = static_cast<char>(kFullType);
Status s = file->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = file->Append(Slice(blank.data(), n));
if (s.ok()) {
s = file->Flush();
}
}
return s;
}
Status Binlog::SetProducerStatus(uint32_t pronum, uint64_t pro_offset) {
slash::MutexLock l(&mutex_);
// offset smaller than the first header
if (pro_offset < 4) {
pro_offset = 0;
}
if (queue_ != NULL) {
delete queue_;
}
std::string init_profile = NewFileName(filename, 0);
if (slash::FileExists(init_profile)) {
slash::DeleteFile(init_profile);
}
std::string profile = NewFileName(filename, pronum);
if (slash::FileExists(profile)) {
slash::DeleteFile(profile);
}
slash::NewWritableFile(profile, &queue_);
Binlog::AppendBlank(queue_, pro_offset);
pronum_ = pronum;
version_->set_pronum(pronum);
version_->set_pro_offset(pro_offset);
version_->StableSave();
InitLogFile();
return Status::OK();
}
#ifndef PIKA_BINLOG_H_
#define PIKA_BINLOG_H_
#include <cstdio>
#include <list>
#include <deque>
#include <pthread.h>
#ifndef __STDC_FORMAT_MACROS
# define __STDC_FORMAT_MACROS
# include <inttypes.h>
#endif
#include "env.h"
//#include "port.h"
#include "pika_define.h"
#include "slash_status.h"
#include "slash_mutex.h"
using slash::Status;
using slash::Slice;
std::string NewFileName(const std::string name, const uint32_t current);
class Version;
class Binlog
{
public:
Binlog(const char* Binlog_path);
~Binlog();
void Lock() { mutex_.Lock(); }
void Unlock() { mutex_.Unlock(); }
Status Put(const std::string &item);
Status Put(const char* item, int len);
Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset);
/*
* Set Producer pronum and pro_offset with lock
*/
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);
// set the filenum and con_offset of the consumer which has the given ip and port;
// return NotFound when can not find the consumer with the given ip and port;
// return InvalidArgument when the filenum and con_offset are invalid;
Status SetConsumer(int fd, uint32_t filenum, uint64_t con_offset);
// no lock
Status GetConsumerStatus(int fd, uint32_t* filenum, uint64_t* con_offset);
static Status AppendBlank(slash::WritableFile *file, uint64_t len);
slash::WritableFile *queue() { return queue_; }
//slash::WritableFile *writefile() { return writefile_; }
std::string filename;
Version* version_;
/*
* Produce
*/
Status Produce(const Slice &item);
private:
void InitLogFile();
Status EmitPhysicalRecord(RecordType t, const char *ptr, size_t n);
uint32_t consumer_num_;
uint64_t item_num_;
slash::WritableFile *queue_;
slash::RWFile *versionfile_;
slash::Mutex mutex_;
uint32_t pronum_;
int32_t retry_;
int block_offset_;
char* pool_;
bool exit_all_consume_;
const std::string binlog_path_;
// No copying allowed
Binlog(const Binlog&);
void operator=(const Binlog&);
};
class Version {
public:
Version(slash::RWFile *save);
~Version();
// Status Recovery(WritableFile *save);
Status StableSave();
Status Init();
uint64_t pro_offset() {
slash::RWLock(&rwlock_, false);
return pro_offset_;
}
void set_pro_offset(uint64_t pro_offset) {
slash::RWLock(&rwlock_, true);
pro_offset_ = pro_offset;
}
void rise_pro_offset(uint64_t r) {
slash::RWLock(&rwlock_, true);
pro_offset_ += r;
}
uint64_t con_offset() {
slash::RWLock(&rwlock_, false);
return con_offset_;
}
void set_con_offset(uint64_t con_offset) {
slash::RWLock(&rwlock_, true);
con_offset_ = con_offset;
}
void rise_con_offset(uint64_t r) {
slash::RWLock(&rwlock_, true);
con_offset_ += r;
}
uint32_t item_num() {
slash::RWLock(&rwlock_, false);
return item_num_;
}
void set_item_num(uint32_t item_num) {
slash::RWLock(&rwlock_, true);
item_num_ = item_num;
}
void plus_item_num() {
slash::RWLock(&rwlock_, true);
item_num_++;
}
void minus_item_num() {
slash::RWLock(&rwlock_, true);
item_num_--;
}
uint32_t pronum() {
slash::RWLock(&rwlock_, false);
return pronum_;
}
void set_pronum(uint32_t pronum) {
slash::RWLock(&rwlock_, true);
pronum_ = pronum;
}
uint32_t connum() {
slash::RWLock(&rwlock_, false);
return connum_;
}
void set_connum(uint32_t connum) {
slash::RWLock(&rwlock_, true);
connum_ = connum;
}
void debug() {
slash::RWLock(&rwlock_, false);
printf ("Current pro_offset %lu con_offset %lu itemnum %u pronum %u connum %u",
pro_offset_, con_offset_, item_num_, pronum_, connum_);
}
private:
uint64_t pro_offset_;
uint64_t con_offset_;
uint32_t item_num_;
uint32_t pronum_;
uint32_t connum_;
slash::RWFile *save_;
pthread_rwlock_t rwlock_;
// port::Mutex mutex_;
// No copying allowed;
Version(const Version&);
void operator=(const Version&);
};
#endif
#include "pika_binlog_sender_thread.h"
#include <glog/logging.h>
#include <poll.h>
#include "pika_server.h"
#include "pika_define.h"
#include "pika_binlog_sender_thread.h"
#include "pika_master_conn.h"
using slash::Status;
using slash::Slice;
extern PikaServer* g_pika_server;
PikaBinlogSenderThread::PikaBinlogSenderThread(std::string &ip, int port, slash::SequentialFile *queue, uint32_t filenum, uint64_t con_offset) :
con_offset_(con_offset),
filenum_(filenum),
initial_offset_(0),
end_of_buffer_offset_(kBlockSize),
queue_(queue),
backing_store_(new char[kBlockSize]),
buffer_(),
ip_(ip),
port_(port),
should_exit_(false) {
last_record_offset_ = con_offset % kBlockSize;
}
PikaBinlogSenderThread::~PikaBinlogSenderThread() {
delete [] backing_store_;
}
int PikaBinlogSenderThread::trim() {
slash::Status s;
uint64_t start_block = (con_offset_ / kBlockSize) * kBlockSize;
s = queue_->Skip((con_offset_ / kBlockSize) * kBlockSize);
uint64_t block_offset = con_offset_ % kBlockSize;
uint64_t ret = 0;
uint64_t res = 0;
bool is_error = false;
while (true) {
if (res >= block_offset) {
con_offset_ = start_block + res;
break;
}
ret = get_next(is_error);
if (is_error == true) {
return -1;
}
res += ret;
}
last_record_offset_ = con_offset_ % kBlockSize;
return 0;
}
uint64_t PikaBinlogSenderThread::get_next(bool &is_error) {
uint64_t offset = 0;
slash::Status s;
is_error = false;
while (true) {
buffer_.clear();
s = queue_->Read(kHeaderSize, &buffer_, backing_store_);
if (!s.ok()) {
is_error = true;
}
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
const unsigned int type = header[3];
const uint32_t length = a | (b << 8) | (c << 16);
if (type == kFullType) {
s = queue_->Read(length, &buffer_, backing_store_);
offset += kHeaderSize + length;
break;
} else if (type == kFirstType) {
s = queue_->Read(length, &buffer_, backing_store_);
offset += kHeaderSize + length;
} else if (type == kMiddleType) {
s = queue_->Read(length, &buffer_, backing_store_);
offset += kHeaderSize + length;
} else if (type == kLastType) {
s = queue_->Read(length, &buffer_, backing_store_);
offset += kHeaderSize + length;
break;
} else {
is_error = true;
break;
}
}
return offset;
}
unsigned int PikaBinlogSenderThread::ReadPhysicalRecord(slash::Slice *result) {
slash::Status s;
if (end_of_buffer_offset_ - last_record_offset_ <= kHeaderSize) {
queue_->Skip(end_of_buffer_offset_ - last_record_offset_);
con_offset_ += (end_of_buffer_offset_ - last_record_offset_);
last_record_offset_ = 0;
}
buffer_.clear();
s = queue_->Read(kHeaderSize, &buffer_, backing_store_);
if (s.IsEndFile()) {
return kEof;
} else if (!s.ok()) {
return kBadRecord;
}
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
const unsigned int type = header[3];
const uint32_t length = a | (b << 8) | (c << 16);
if (type == kZeroType || length == 0) {
buffer_.clear();
return kOldRecord;
}
buffer_.clear();
//std::cout<<"2 --> con_offset_: "<<con_offset_<<" last_record_offset_: "<<last_record_offset_<<std::endl;
s = queue_->Read(length, &buffer_, backing_store_);
*result = slash::Slice(buffer_.data(), buffer_.size());
last_record_offset_ += kHeaderSize + length;
if (s.ok()) {
con_offset_ += (kHeaderSize + length);
}
return type;
}
Status PikaBinlogSenderThread::Consume(std::string &scratch) {
Status s;
if (last_record_offset_ < initial_offset_) {
return slash::Status::IOError("last_record_offset exceed");
}
slash::Slice fragment;
while (true) {
const unsigned int record_type = ReadPhysicalRecord(&fragment);
switch (record_type) {
case kFullType:
scratch = std::string(fragment.data(), fragment.size());
s = Status::OK();
break;
case kFirstType:
scratch.assign(fragment.data(), fragment.size());
s = Status::NotFound("Middle Status");
break;
case kMiddleType:
scratch.append(fragment.data(), fragment.size());
s = Status::NotFound("Middle Status");
break;
case kLastType:
scratch.append(fragment.data(), fragment.size());
s = Status::OK();
break;
case kEof:
return Status::EndFile("Eof");
case kBadRecord:
return Status::Corruption("Data Corruption");
case kOldRecord:
return Status::EndFile("Eof");
default:
return Status::Corruption("Unknow reason");
}
// TODO:do handler here
if (s.ok()) {
break;
}
}
//DLOG(INFO) << "Binlog Sender consumer a msg: " << scratch;
return Status::OK();
}
bool PikaBinlogSenderThread::Init() {
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd_ == -1) {
LOG(WARNING) << "BinlogSender socket error: " << strerror(errno);
return false;
}
int flags = fcntl(sockfd_, F_GETFL, 0);
fcntl(sockfd_, F_SETFL, flags | O_NONBLOCK);
int yes = 1;
if (setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) {
LOG(WARNING) << "BinlogSender setsockopt SO_REUSEADDR error: " << strerror(errno);
return false;
}
if (setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1) {
LOG(WARNING) << "BinlogSender setsockopt SO_KEEPALIVE: error: " << strerror(errno);
return false;
}
return true;
}
bool PikaBinlogSenderThread::Connect() {
struct sockaddr_in s_addr;
memset(&s_addr, 0, sizeof(s_addr));
s_addr.sin_family = AF_INET;
s_addr.sin_addr.s_addr = inet_addr(ip_.c_str());
s_addr.sin_port = htons(port_);
if (-1 == connect(sockfd_, (struct sockaddr*)(&s_addr), sizeof(s_addr))) {
if (errno == EINPROGRESS) {
struct pollfd wfd[1];
wfd[0].fd = sockfd_;
wfd[0].events = POLLOUT;
int res;
if ((res = poll(wfd, 1, 500)) == -1) {
LOG(WARNING) << "BinlogSender Connect, poll error: " << strerror(errno);
return false;
} else if (res == 0) {
LOG(WARNING) << "BinlogSender Connect, timeout";
return false;
}
int err = 0;
socklen_t errlen = sizeof(err);
if (getsockopt(sockfd_, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
LOG(WARNING) << "BinlogSender Connect, getsockopt error";
return false;
}
if (err) {
errno = err;
LOG(WARNING) << "BinlogSender Connect, error: " << strerror(errno);
return false;
}
}
}
return true;
}
bool PikaBinlogSenderThread::Send(const std::string &msg) {
// length to small
char wbuf[1024];
int wbuf_len = 17;
int wbuf_pos = 0;
int nwritten = 0;
memcpy(wbuf, msg.data(), msg.size());
while (1) {
while (wbuf_len > 0) {
nwritten = write(sockfd_, wbuf + wbuf_pos, wbuf_len - wbuf_pos);
if (nwritten < 0) {
break;
}
wbuf_pos += nwritten;
if (wbuf_pos == wbuf_len) {
wbuf_len = 0;
}
}
if (nwritten == -1) {
if (errno == EAGAIN) {
continue;
} else {
LOG(WARNING) << "BinlogSender Send, error: " << strerror(errno);
return false;
}
}
if (wbuf_len == 0) {
return true;
}
}
}
bool PikaBinlogSenderThread::Recv() {
char rbuf[256];
int rbuf_pos = 0;
int nread = 0;
while (1) {
nread = read(sockfd_, rbuf + rbuf_pos, 1);
if (nread == -1) {
if (errno == EAGAIN) {
continue;
} else {
LOG(WARNING) << "BinlogSender Recv error: " << strerror(errno);
return false;
}
} else if (nread == 0) {
LOG(WARNING) << "BinlogSender slave close the connection";
return false;
}
if (rbuf[rbuf_pos] == '\n') {
rbuf[rbuf_pos] = '\0';
rbuf_pos--;
if (rbuf_pos >= 0 && rbuf[rbuf_pos] == '\r') {
rbuf[rbuf_pos] = '\0';
rbuf_pos--;
}
break;
}
rbuf_pos++;
}
DLOG(INFO) << "BinlogSender Reply from slave after : " << std::string(rbuf, rbuf_pos+1);
if (rbuf[0] == '+') {
return true;
} else {
return false;
}
}
Status PikaBinlogSenderThread::Parse() {
std::string scratch("");
Status s;
Version* version = g_pika_server->logger->version_;
while (!IsExit()) {
if (filenum_ == version->pronum() && con_offset_ == version->pro_offset()) {
DLOG(INFO) << "BinlogSender Parse no new msg";
usleep(10000);
continue;
}
scratch = "";
s = Consume(scratch);
//DLOG(INFO) << "BinlogSender Parse a msg return: " << s.ToString();
if (s.IsEndFile()) {
std::string confile = NewFileName(g_pika_server->logger->filename, filenum_ + 1);
// Roll to next File
if (slash::FileExists(confile)) {
DLOG(INFO) << "BinlogSender roll to new binlog" << confile;
delete queue_;
queue_ = NULL;
slash::NewSequentialFile(confile, &(queue_));
filenum_++;
con_offset_ = 0;
initial_offset_ = 0;
end_of_buffer_offset_ = kBlockSize;
last_record_offset_ = con_offset_ % kBlockSize;
} else {
usleep(10000);
}
} else if (s.ok()) {
DLOG(INFO) << "BinlogSender Parse ok, filenum = " << filenum_ << ", con_offset = " << con_offset_;
DLOG(INFO) << "BinlogSender Parse a msg" << scratch;
if (Send(scratch) && Recv()) {
return s;
} else {
return Status::Corruption("Send or Recv error");
}
} else if (s.IsCorruption()) {
return s;
}
}
return s;
}
void* PikaBinlogSenderThread::ThreadMain() {
Status s;
// 1. Connect to slave
while (!IsExit()) {
DLOG(INFO) << "BinlogSender start Connect";
if (Init()) {
if (Connect()) {
DLOG(INFO) << "BinlogSender Connect slave(" << ip_ << ":" << port_ << ") ok";
do {
s = Parse();
} while (s.ok());
} else {
close(sockfd_);
}
}
sleep(5);
}
return NULL;
// pthread_exit(NULL);
}
#include <glog/logging.h> #include <glog/logging.h>
#include "pika_server.h"
#include "pika_client_conn.h" #include "pika_client_conn.h"
extern PikaServer* g_pika_server;
PikaClientConn::PikaClientConn(int fd, std::string ip_port, pink::Thread* thread) : PikaClientConn::PikaClientConn(int fd, std::string ip_port, pink::Thread* thread) :
RedisConn(fd, ip_port) { RedisConn(fd, ip_port) {
pika_thread_ = reinterpret_cast<PikaWorkerThread*>(thread); pika_thread_ = reinterpret_cast<PikaWorkerThread*>(thread);
...@@ -11,6 +15,43 @@ PikaClientConn::~PikaClientConn() { ...@@ -11,6 +15,43 @@ PikaClientConn::~PikaClientConn() {
int PikaClientConn::DealMessage() { int PikaClientConn::DealMessage() {
PlusConnQuerynum(); PlusConnQuerynum();
/* Sync cmd
* 1 logger->Lock()
* 2 cmd->Do
* 3 AppendLog
* 4 logger->Unlock
*/
//logger->Lock();
// TEST trysync
if (argv_[0] == "trysync") {
DLOG(INFO) << "recieve \"trysync\"";
// Test BinlogSender
SlaveItem slave;
slave.ip_port = "127.0.0.1:9922";
slave.port = 9922;
Status s = g_pika_server->AddBinlogSender(slave, 0, 0);
if (s.ok()) {
DLOG(INFO) << "AddBinlogSender ok";
} else {
DLOG(INFO) << "AddBinlogSender failed, " << s.ToString();
}
} else if (argv_[0] == "slaveof") {
DLOG(INFO) << "recieve \"slaveof\"";
DLOG(INFO) << "SetMaster " << g_pika_server->SetMaster("127.0.0.1", 9821);
} else {
// TEST Put
for (int i = 0; i < 10; i++) {
DLOG(INFO) << "Logger Put a msg:" << i << ";";
g_pika_server->logger->Put(std::string("*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$1\r\n1\r\n"));
}
}
memcpy(wbuf_ + wbuf_len_, "+OK\r\n", 5); memcpy(wbuf_ + wbuf_len_, "+OK\r\n", 5);
wbuf_len_ += 5; wbuf_len_ += 5;
return 0; return 0;
......
...@@ -52,6 +52,8 @@ void PikaHeartbeatThread::CronHandle() { ...@@ -52,6 +52,8 @@ void PikaHeartbeatThread::CronHandle() {
if ((iter->stage = SLAVE_ITEM_STAGE_ONE && now.tv_sec - iter->create_time.tv_sec > 30) if ((iter->stage = SLAVE_ITEM_STAGE_ONE && now.tv_sec - iter->create_time.tv_sec > 30)
|| (iter->stage == SLAVE_ITEM_STAGE_TWO && !FindSlave(iter->hb_fd))) { || (iter->stage == SLAVE_ITEM_STAGE_TWO && !FindSlave(iter->hb_fd))) {
//pthread_kill(iter->tid); //pthread_kill(iter->tid);
// Kill BinlogSender
iter = g_pika_server->slaves_.erase(iter); iter = g_pika_server->slaves_.erase(iter);
continue; continue;
} }
......
#include <glog/logging.h> #include <glog/logging.h>
#include "env.h"
#include "pika_server.h" #include "pika_server.h"
PikaServer::PikaServer(int port) : PikaServer::PikaServer(int port) :
...@@ -17,6 +19,15 @@ PikaServer::PikaServer(int port) : ...@@ -17,6 +19,15 @@ PikaServer::PikaServer(int port) :
pika_binlog_receiver_thread_ = new PikaBinlogReceiverThread(port_ + 100); pika_binlog_receiver_thread_ = new PikaBinlogReceiverThread(port_ + 100);
pika_heartbeat_thread_ = new PikaHeartbeatThread(port_ + 200, 1000); pika_heartbeat_thread_ = new PikaHeartbeatThread(port_ + 200, 1000);
pika_trysync_thread_ = new PikaTrysyncThread(); pika_trysync_thread_ = new PikaTrysyncThread();
pthread_rwlock_init(&state_protector_, NULL);
logger = new Binlog("./log");
}
PikaServer::~PikaServer() {
pthread_rwlock_destroy(&state_protector_);
//delete logger;
} }
void PikaServer::Start() { void PikaServer::Start() {
...@@ -26,7 +37,7 @@ void PikaServer::Start() { ...@@ -26,7 +37,7 @@ void PikaServer::Start() {
pika_trysync_thread_->StartThread(); pika_trysync_thread_->StartThread();
SetMaster("127.0.0.1", 9211); //SetMaster("127.0.0.1", 9221);
mutex_.Lock(); mutex_.Lock();
mutex_.Lock(); mutex_.Lock();
...@@ -37,9 +48,25 @@ void PikaServer::Start() { ...@@ -37,9 +48,25 @@ void PikaServer::Start() {
void PikaServer::DeleteSlave(int fd) { void PikaServer::DeleteSlave(int fd) {
slash::MutexLock l(&slave_mutex_); slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin(); std::vector<SlaveItem>::iterator iter = slaves_.begin();
while (iter != slaves_.end()) { while (iter != slaves_.end()) {
if (iter->hb_fd == fd) { if (iter->hb_fd == fd) {
//pthread_kill(iter->tid); //pthread_kill(iter->tid);
// Remove BinlogSender first
std::vector<PikaBinlogSenderThread *>::iterator sender = binlog_sender_threads_.begin() + (iter - slaves_.begin());
(*sender)->SetExit();
int err = pthread_join(iter->sender_tid, NULL);
if (err != 0) {
std::string msg = "can't join thread " + std::string(strerror(err));
LOG(WARNING) << msg;
//return Status::Corruption(msg);
}
delete (*sender);
binlog_sender_threads_.erase(sender);
slaves_.erase(iter); slaves_.erase(iter);
break; break;
} }
...@@ -103,3 +130,51 @@ void PikaServer::PlusMasterConnection() { ...@@ -103,3 +130,51 @@ void PikaServer::PlusMasterConnection() {
} }
} }
} }
/*
* BinlogSender
*/
Status PikaServer::AddBinlogSender(SlaveItem &slave, uint32_t filenum, uint64_t con_offset) {
if (con_offset > kBinlogSize) {
return Status::InvalidArgument("AddBinlogSender invalid offset");
}
slash::SequentialFile *readfile;
std::string confile = NewFileName(logger->filename, filenum);
if (!slash::NewSequentialFile(confile, &readfile).ok()) {
return Status::IOError("AddBinlogSender new sequtialfile");
}
std::string slave_ip = slave.ip_port.substr(0, slave.ip_port.find(':'));
PikaBinlogSenderThread* sender = new PikaBinlogSenderThread(slave_ip, slave.port, readfile, con_offset, filenum);
if (sender->trim() == 0) {
sender->StartThread();
pthread_t tid = sender->thread_id();
DLOG(INFO) << "AddBinlogSender ok, tid is " << tid;
// Add sender
slash::MutexLock l(&slave_mutex_);
binlog_sender_threads_.push_back(sender);
return Status::OK();
} else {
DLOG(INFO) << "AddBinlogSender failed";
return Status::NotFound("AddBinlogSender bad sender");
}
}
Status PikaServer::GetSmallestValidLog(uint32_t* max) {
slash::MutexLock l(&slave_mutex_);
std::vector<PikaBinlogSenderThread *>::iterator iter;
*max = logger->version_->pronum();
for (iter = binlog_sender_threads_.begin(); iter != binlog_sender_threads_.end(); iter++) {
int tmp = (*iter)->filenum();
if (tmp < *max) {
*max = tmp;
}
}
return Status::OK();
}
Subproject commit 663e8cb902e4f70deb5669c9a9e7f3a4db729471 Subproject commit 7400ec188aa71d32c7bf1d26c690fe9ef9b88c82
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册