提交 66652786 编写于 作者: W wenduo

binlog_sender support range input and store failed data now

上级 01ff5e2a
......@@ -15,14 +15,9 @@
* Binlog
*/
Binlog::Binlog(const std::string& binlog_path, const int file_size) :
consumer_num_(0),
item_num_(0),
version_(NULL),
queue_(NULL),
versionfile_(NULL),
pro_num_(0),
pool_(NULL),
exit_all_consume_(false),
binlog_path_(binlog_path),
file_size_(file_size) {
......@@ -41,12 +36,6 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) :
if (!slash::FileExists(manifest)) {
DLOG(INFO) << "Binlog: Manifest file not exist";
profile = NewFileName(filename, pro_num_);
s = slash::NewWritableFile(profile, &queue_);
if (!s.ok()) {
LOG(WARNING) << "Binlog: new " << filename << " " << s.ToString();
}
s = slash::NewRWFile(manifest, &versionfile_);
if (!s.ok()) {
LOG(WARNING) << "Binlog: new versionfile error " << s.ToString();
......@@ -68,26 +57,15 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) :
LOG(WARNING) << "Binlog: open versionfile error";
}
profile = NewFileName(filename, pro_num_);
slash::AppendWritableFile(profile, &queue_, version_->pro_offset_);
}
InitLogFile();
}
/*
 * Destructor: deletes the objects held through the raw pointer members
 * version_, versionfile_ and queue_, in that order.
 */
Binlog::~Binlog() {
delete version_;
delete versionfile_;
delete queue_;
}
/*
 * Initialise block_offset_ from the log file that queue_ refers to:
 * the offset is the current file size modulo kBlockSize.
 * queue_ must already be set when this is called.
 */
void Binlog::InitLogFile() {
  assert(queue_ != NULL);
  const uint64_t bytes_in_file = queue_->Filesize();
  block_offset_ = bytes_in_file % kBlockSize;
}
Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset) {
slash::RWLock(&(version_->rwlock_), false);
......
......@@ -46,7 +46,6 @@ class Binlog
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);
slash::WritableFile *queue() { return queue_; }
uint64_t file_size() {
......@@ -56,15 +55,11 @@ class Binlog
std::string filename;
void InitLogFile();
uint32_t consumer_num_;
uint64_t item_num_;
Version* version_;
slash::WritableFile *queue_;
slash::RWFile *versionfile_;
slash::Mutex mutex_;
......@@ -73,8 +68,6 @@ class Binlog
int block_offset_;
char* pool_;
bool exit_all_consume_;
const std::string binlog_path_;
uint64_t file_size_;
......
......@@ -153,6 +153,7 @@ Status BinlogConsumer::LoadNextFile(){
last_record_offset_ = con_offset_ % kBlockSize;
return Status::OK();
} else {
DLOG(INFO) << "Can't find binlog file " << confile;
return Status::Corruption("no binlog file exist to jump to");
}
}
......
......@@ -217,12 +217,8 @@ int main(int argc, char *argv[]) {
std::cout << "testing new binlog.............................................. " << std::endl;
Binlog* new_logger = new Binlog(output_path, INPUT_FILESIZE);
std::cout << "test result:new binlog can be loaded " << std::endl;
delete binlog_consumer;
delete binlog_producer;
delete old_logger;
delete new_logger;
return 0;
}
......@@ -4,6 +4,7 @@
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>
#include <string>
#include "binlog.h"
#include "binlog_consumer.h"
......@@ -29,25 +30,52 @@ static void Usage()
"\t-s -- start time , default: '2001-00-00 00:59:01' \n"
"\t-e -- end time , default: '2100-01-30 24:00:01' \n"
" example: ./binlog_sender -n /data2/wangwenduo/newlog/ -t new -i 127.0.0.1 -p 10221 -s '2001-10-11 11:11:11' -e '2020-12-11 11:11:11' -f 526,527 \n"
" example2: ./binlog_sender -n /data2/wangwenduo/newlog/ -t new -i 127.0.0.1 -p 10221 -s '2001-10-11 11:11:11' -e '2020-12-11 11:11:11' -f 514-530 \n"
);
}
int GetFiles(std::string& files_str, std::string& pattern, std::vector<int>& files) {
std::string::size_type pos;
std::string file;
files_str += pattern;
int str_size = files_str.size();
for(int i = 0; i < str_size; i++) {
pos = files_str.find(pattern,i);
if (pos != (unsigned int)str_size) {
file = files_str.substr(i, pos - i);
files.push_back(atoi(file.c_str()));
i = pos + pattern.size() - 1;
}
}
return files.size();
/*
* io functions
*/
/*
 * Report whether `pattern` occurs anywhere inside `base`.
 */
bool Exists(std::string& base, std::string pattern) {
  return base.find(pattern) != std::string::npos;
}
/*
 * Parse the binlog file selection string (the -f argument) into the list of
 * file numbers to process.
 *
 * Accepted forms:
 *   "526"       a single file number (digits only)
 *   "526,527"   a comma separated list; empty entries are ignored
 *   "514-530"   an inclusive range, start-end
 *
 * Returns a tuple of (number of files, file numbers).
 * If the string matches none of the forms, or yields no file at all (for
 * example an inverted range such as "530-514"), an error is printed to stderr
 * and the process exits with -1, so callers can rely on a non-empty list.
 */
std::tuple<int,std::vector<int>> GetFileList(std::string files_str) {
  std::vector<int> file_vec;
  if (files_str.find(',') != std::string::npos) {
    // Walk the string token by token. The end of the string terminates the
    // last token, so the final entry is not lost.
    std::string::size_type token_begin = 0;
    while (token_begin < files_str.size()) {
      std::string::size_type token_end = files_str.find(',', token_begin);
      if (token_end == std::string::npos) {
        token_end = files_str.size();
      }
      if (token_end > token_begin) {
        std::string file = files_str.substr(token_begin, token_end - token_begin);
        file_vec.push_back(atoi(file.c_str()));
      }
      token_begin = token_end + 1;
    }
  } else if (files_str.find('-') != std::string::npos) {
    const std::string::size_type dash = files_str.find('-');
    const int start_file = atoi(files_str.substr(0, dash).c_str());
    const int end_file = atoi(files_str.substr(dash + 1).c_str());
    for (int i = start_file; i <= end_file; i++) {
      file_vec.push_back(i);
    }
  } else if (!files_str.empty() &&
             files_str.find_first_not_of("0123456789") == std::string::npos) {
    // A plain number selects exactly one file (this covers the default "0").
    file_vec.push_back(atoi(files_str.c_str()));
  }
  if (file_vec.empty()) {
    fprintf (stderr, "wrong input file string:%s \n", files_str.c_str());
    exit(-1);
  }
  int file_num = file_vec.size();
  return std::make_tuple(file_num, file_vec);
}
bool CheckSequential(std::vector<int>& seq) {
......@@ -64,20 +92,38 @@ bool CheckSequential(std::vector<int>& seq) {
}
/*
* Signal Handle functions
*/
/*
 * Signal handler (installed for SIGPIPE by SignalAssign): report the signal
 * number on stderr and terminate the process with status -1.
 *
 * Only async-signal-safe calls (write, _exit) are used. fprintf, strsignal
 * and exit are not async-signal-safe and must not be called from a handler,
 * so the message is assembled by hand and shows the numeric signal value.
 * Note that _exit does not flush stdio buffers or run atexit handlers.
 */
void LastWord(int sig) {
  static const char kPrefix[] = "receive signal ";
  char msg[sizeof(kPrefix) + 16];
  size_t len = 0;
  for (; kPrefix[len] != '\0'; ++len) {
    msg[len] = kPrefix[len];
  }

  // Render the signal number in decimal, least significant digit first.
  char digits[12];
  size_t ndigits = 0;
  unsigned int value = sig > 0 ? static_cast<unsigned int>(sig) : 0u;
  do {
    digits[ndigits++] = static_cast<char>('0' + value % 10);
    value /= 10;
  } while (value != 0);
  while (ndigits > 0) {
    msg[len++] = digits[--ndigits];
  }
  msg[len++] = '\n';

  // Best effort: nothing useful can be done here if the write fails.
  const ssize_t written = write(STDERR_FILENO, msg, len);
  (void)written;
  _exit(-1);
}
/*
 * Install the process signal handlers: SIGPIPE is routed to LastWord.
 */
void SignalAssign() {
signal(SIGPIPE,LastWord);
}
int main(int argc, char *argv[]) {
if (argc < 2) {
Usage();
exit(-1);
}
SignalAssign();
/*
* parse args
*/
std::string passwd;
std::string input_path = "./old_log/";
std::string ip = "127.0.0.1";
int port = 6279;
std::string log_type = "old";
std::string files_str = "0";
std::string start_time_str = "2001-00-00 00:59:01";
std::string end_time_str = "2100-01-30 24:00:01";
std::string start_time_str = "2001-01-01 00:01:01";
std::string end_time_str = "2100-01-01 00:00:01";
// for correct inputs , we use these flags to generate warning to user
bool use_passwd = false;
......@@ -148,30 +194,31 @@ int main(int argc, char *argv[]) {
if (default_log_type) {
fprintf (stderr, "Warning: use default log type:%s \n", log_type.c_str());
}
if (default_start_time) {
fprintf (stderr, "Warning: use default start time:%s \n", start_time_str.c_str());
}
if (default_end_time) {
fprintf (stderr, "Warning: use default end time:%s \n", end_time_str.c_str());
}
if (log_type != "old" && log_type != "new") {
fprintf (stderr, "undefined log case: old or new only\n" );
exit(-1);
}
std::vector<int> files;
std::string comma = std::string(",");
int file_num = GetFiles(files_str, comma, files);
int file_num;
std::tie(file_num,files) = GetFileList(files_str);
int start_file = files[0];
bool isSequential = CheckSequential(files);
if (!isSequential) {
std::cout << "please input sequential binlog num :" << std::endl;
fprintf (stderr, "please input sequential binlog num \n" );
exit(-1);
}
/*
* open binlog
*/
Binlog* old_logger = new Binlog(input_path, INPUT_FILESIZE);
BinlogConsumer* binlog_consumer;
if (log_type == "old") {
......@@ -183,18 +230,24 @@ int main(int argc, char *argv[]) {
Status s;
s = binlog_consumer->LoadFile(start_file);
if(!s.ok()) {
std::cout << "something wrong while loading binlog:" << s.ToString() << std::endl;
fprintf (stderr, "error while loading binlog file write2file%d : %s \n", start_file, s.ToString().c_str());
exit(-1);
}
/*
* Connect
*/
pink::RedisCli *rcli = new pink::RedisCli();
rcli->set_connect_timeout(3000);
fprintf (stderr, "Connecting...\n");
pink::Status pink_s = rcli->Connect(ip, port);
if (!pink_s.ok()) {
printf ("Connect failed, %s\n", pink_s.ToString().c_str());
fprintf (stderr, "Connect failed, %s\n", pink_s.ToString().c_str());
exit(-1);
}
fprintf (stderr, "Connected...\n");
if (use_passwd) {
fprintf (stderr, "Sending Auth...\n");
std::string auth_str;
auth_str = "*2\r\n$4\r\nauth\r\n$";
auth_str.append(std::to_string(passwd.size()));
......@@ -203,9 +256,17 @@ int main(int argc, char *argv[]) {
auth_str.append("\r\n");
pink_s = rcli->Send(&auth_str);
pink_s = rcli->Recv(NULL);
if (!pink_s.ok()) {
fprintf (stderr, "Auth failed, %s\n", pink_s.ToString().c_str());
exit(-1);
}
}
/*
* parse binlog and send to pika server
*/
fprintf (stderr, "Sending query data...\n");
std::string scratch;
scratch.reserve(1024 * 1024);
int finished_num = 0;
......@@ -219,33 +280,57 @@ int main(int argc, char *argv[]) {
timet = mktime(&tm);
tv_end = timet;
uint64_t produce_time = 0;
int fail_num = 0;
int success_num = 0;
FILE * error_fp;
error_fp = fopen("binlog_sender.error.log", "w");
setvbuf(error_fp, NULL, _IONBF, 0);
if (!error_fp) {
fprintf (stderr, "error opening binlog_sender.error.log\n");
exit(-1);
}
fprintf(error_fp, "This log contains data that failed to be sent");
while (true){
s = binlog_consumer->Parse(scratch, &produce_time);
if (s.IsEndFile()) {
std::cout << "parse binlog file:"<< NewFileName(old_logger->filename, start_file++) << " finished" << std::endl;
fprintf (stderr, "parse binlog file: %s finished\n", NewFileName(old_logger->filename, start_file++).c_str());
finished_num ++;
if (finished_num < file_num) {
s = binlog_consumer->LoadNextFile();
if (!s.ok()) {
fprintf (stderr, "error loading binlog file\n");
exit(-1);
}
} else {
break;
}
} else if (s.IsComplete()) {
std::cout << "all binlog parsed" << std::endl;
fprintf (stderr,"all binlog parsed \n");
break;
} else if (s.ok()) {
if (log_type == "new" && (produce_time < tv_start || produce_time > tv_end)) {
if (log_type == "new" && (produce_time < tv_start || produce_time > tv_end) && !(default_start_time && default_end_time)) {
continue;
}
pink_s = rcli->Send(&scratch);
pink_s = rcli->Recv(NULL);
if (pink_s.ok()) {
pink_s = rcli->Recv(NULL);
}
if (!pink_s.ok()) {
fail_num ++;
fprintf(error_fp, "No.%d send data failed, status:%s \n", fail_num, pink_s.ToString().c_str());
fprintf(error_fp, "Data:\n%s\n", scratch.c_str());
} else if (++success_num % 10000 == 0) {
fprintf(stderr, "%d query data has been sent successfully\n", success_num);
}
} else if (!s.ok()) {
std::cout << "something wrong when parsing binlog " << std::endl;
fprintf(stderr, "something wrong when parsing binlog \n");
break;
}
}
fclose (error_fp);
delete binlog_consumer;
delete old_logger;
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册