提交 96db964d 编写于 作者: K KernelMaker

add ssdb_to_pika

上级 66e5a165
CXX = g++
CXXFLAGS = -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DOS_LINUX -Wall -W -DDEBUG -g -O2 -D__XDEBUG__ -fPIC -Wno-unused-function -std=c++11
OBJECT = ssdb_to_pika
SRC_DIR = .
OUTPUT = .
INCLUDE_PATH = -I./ \
-I../../third/nemo/output/include \
-I../../third/nemo/3rdparty/nemo-rocksdb/output/include \
-I../../third/nemo/3rdparty/nemo-rocksdb/rocksdb \
-I../../third/nemo/3rdparty/nemo-rocksdb/rocksdb/include
STATIC_LIBS = ./libssdb-client.a \
../../third/nemo/output/lib/libnemo.a \
../../third/nemo/output/lib/libnemodb.a \
../../third/nemo/output/lib/librocksdb.a
LIBS = -lsnappy \
-lrt \
-lz \
-lbz2 \
-lpthread
.PHONY: all clean
BASE_OBJS := $(wildcard $(SRC_DIR)/*.cc)
BASE_OBJS += $(wildcard $(SRC_DIR)/*.c)
BASE_OBJS += $(wildcard $(SRC_DIR)/*.cpp)
OBJS = $(patsubst %.cc,%.o,$(BASE_OBJS))
all: $(OBJECT)
@echo "Success, go, go, go..."
$(OBJECT): $(OBJS)
$(CXX) $(CXXFLAGS) -o $@ $(OBJS) $(INCLUDE_PATH) $(STATIC_LIBS) $(LIBS)
$(OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)
clean:
rm -rf $(SRC_DIR)/*.o
rm -rf $(OBJECT)
#ifndef SSDB_API_CPP
#define SSDB_API_CPP
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <string>
#include <vector>
#include <map>
namespace ssdb{
/**
* Client requests' return status.
*/
class Status{
public:
/**
* Returns <code>true</code> if the request succeeded.
*/
bool ok(){
return code_ == "ok";
}
/**
* Returns <code>true</code> if the requested key is not found. When this method
* returns <code>true</code>, ok() will always returns <code>false</code>.
*/
bool not_found(){
return code_ == "not_found";
}
/**
* Returns <code>true</code> if error occurs during the request.
* It might be a server error, or a client error.
*/
bool error(){
return code_ != "ok";
}
/**
* The response code.
*/
std::string code(){
return code_;
}
Status(){}
Status(const std::string &code){
code_ = code;
}
Status(const std::vector<std::string> *resp){
if(resp && resp->size() > 0){
code_ = resp->at(0);
}else{
code_ = "error";
}
}
private:
std::string code_;
};
/**
* The SSDB client used to connect to SSDB server.
*/
class Client{
public:
static Client* connect(const char *ip, int port);
static Client* connect(const std::string &ip, int port);
Client(){};
virtual ~Client(){};
/// @name Free hand methods
/// All these methods return NULL if error; vector<std::string> if response ready,
/// the first element is response code.
/// @{
virtual const std::vector<std::string>* request(const std::vector<std::string> &req) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd, const std::string &s2) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd, const std::string &s2, const std::string &s3) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd, const std::string &s2, const std::string &s3, const std::string &s4) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd, const std::string &s2, const std::string &s3, const std::string &s4, const std::string &s5) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd, const std::string &s2, const std::string &s3, const std::string &s4, const std::string &s5, const std::string &s6) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd, const std::vector<std::string> &s2) = 0;
virtual const std::vector<std::string>* request(const std::string &cmd, const std::string &s2, const std::vector<std::string> &s3) = 0;
/// @}
virtual Status dbsize(int64_t *ret) = 0;
virtual Status get_kv_range(std::string *start, std::string *end) = 0;
virtual Status set_kv_range(const std::string &start, const std::string &end) = 0;
/// @name KV methods
/// @{
virtual Status get(const std::string &key, std::string *val) = 0;
virtual Status set(const std::string &key, const std::string &val) = 0;
/**
* Set the value of the key, with a time to live.
*/
virtual Status setx(const std::string &key, const std::string &val, int ttl) = 0;
virtual Status del(const std::string &key) = 0;
virtual Status incr(const std::string &key, int64_t incrby, int64_t *ret) = 0;
/**
* @param key_start Empty string means no limit.
* @param key_end Empty string means no limit.
*/
virtual Status keys(const std::string &key_start, const std::string &key_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-value pairs.
* The two elements at ret[n] and ret[n+1] form a key-value pair, n=0,2,4,...
*/
virtual Status scan(const std::string &key_start, const std::string &key_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-value pairs.
* The two elements at ret[n] and ret[n+1] form a key-value pair, n=0,2,4,...
*/
virtual Status rscan(const std::string &key_start, const std::string &key_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-value pairs.
* The two elements at ret[n] and ret[n+1] form a key-value pair, n=0,2,4,...
*/
virtual Status multi_get(const std::vector<std::string> &keys, std::vector<std::string> *vals) = 0;
virtual Status multi_set(const std::map<std::string, std::string> &kvs) = 0;
virtual Status multi_del(const std::vector<std::string> &keys) = 0;
/// @}
/// @name Map(Hash) methods
/// @{
virtual Status hget(const std::string &name, const std::string &key, std::string *val) = 0;
virtual Status hset(const std::string &name, const std::string &key, const std::string &val) = 0;
virtual Status hdel(const std::string &name, const std::string &key) = 0;
virtual Status hincr(const std::string &name, const std::string &key, int64_t incrby, int64_t *ret) = 0;
virtual Status hsize(const std::string &name, int64_t *ret) = 0;
/**
* Delete all of the keys in a hashmap, return the number of keys deleted.
*/
virtual Status hclear(const std::string &name, int64_t *ret=NULL) = 0;
/**
* @param key_start Empty string means no limit.
* @param key_end Empty string means no limit.
*/
virtual Status hkeys(const std::string &name,
const std::string &key_start, const std::string &key_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-value pairs.
* The two elements at ret[n] and ret[n+1] form a key-value pair, n=0,2,4,...
*/
virtual Status hscan(const std::string &name,
const std::string &key_start, const std::string &key_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-value pairs.
* The two elements at ret[n] and ret[n+1] form a key-value pair, n=0,2,4,...
*/
virtual Status hrscan(const std::string &name,
const std::string &key_start, const std::string &key_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-value pairs.
* The two elements at ret[n] and ret[n+1] form a key-value pair, n=0,2,4,...
*/
virtual Status multi_hget(const std::string &name, const std::vector<std::string> &keys,
std::vector<std::string> *ret) = 0;
virtual Status multi_hset(const std::string &name, const std::map<std::string, std::string> &kvs) = 0;
virtual Status multi_hdel(const std::string &name, const std::vector<std::string> &keys) = 0;
/// @}
/// @name Zset methods
/// @{
virtual Status zget(const std::string &name, const std::string &key, int64_t *ret) = 0;
virtual Status zset(const std::string &name, const std::string &key, int64_t score) = 0;
virtual Status zdel(const std::string &name, const std::string &key) = 0;
virtual Status zincr(const std::string &name, const std::string &key, int64_t incrby, int64_t *ret) = 0;
virtual Status zsize(const std::string &name, int64_t *ret) = 0;
/**
* Delete all of the keys in a zset, return the number of keys deleted.
*/
virtual Status zclear(const std::string &name, int64_t *ret=NULL) = 0;
/**
* <b>Important! This method may be extremly SLOW!</b>
*/
virtual Status zrank(const std::string &name, const std::string &key, int64_t *ret) = 0;
/**
* <b>Important! This method may be extremly SLOW!</b>
*/
virtual Status zrrank(const std::string &name, const std::string &key, int64_t *ret) = 0;
/**
* <b>Important! This method is SLOW for large offset!</b>
*/
virtual Status zrange(const std::string &name,
uint64_t offset, uint64_t limit,
std::vector<std::string> *ret) = 0;
/**
* <b>Important! This method is SLOW for large offset!</b>
*/
virtual Status zrrange(const std::string &name,
uint64_t offset, uint64_t limit,
std::vector<std::string> *ret) = 0;
/**
* @param score_start NULL means no limit.
* @param score_end NULL means no limit.
*/
virtual Status zkeys(const std::string &name, const std::string &key_start,
int64_t *score_start, int64_t *score_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-score pairs.
* The two elements at ret[n] and ret[n+1] form a key-score pair, n=0,2,4,...
*/
virtual Status zscan(const std::string &name, const std::string &key_start,
int64_t *score_start, int64_t *score_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-score pairs.
* The two elements at ret[n] and ret[n+1] form a key-score pair, n=0,2,4,...
*/
virtual Status zrscan(const std::string &name, const std::string &key_start,
int64_t *score_start, int64_t *score_end,
uint64_t limit, std::vector<std::string> *ret) = 0;
/**
* Return key-value pairs.
* The two elements at ret[n] and ret[n+1] form a key-value pair, n=0,2,4,...
*/
virtual Status multi_zget(const std::string &name, const std::vector<std::string> &keys,
std::vector<std::string> *scores) = 0;
virtual Status multi_zset(const std::string &name, const std::map<std::string, int64_t> &kss) = 0;
virtual Status multi_zdel(const std::string &name, const std::vector<std::string> &keys) = 0;
/// @}
virtual Status qpush(const std::string &name, const std::string &item, int64_t *ret_size=NULL) = 0;
virtual Status qpush(const std::string &name, const std::vector<std::string> &items, int64_t *ret_size=NULL) = 0;
virtual Status qpop(const std::string &name, std::string *ret) = 0;
virtual Status qpop(const std::string &name, int64_t limit, std::vector<std::string> *ret) = 0;
virtual Status qslice(const std::string &name, int64_t begin, int64_t end, std::vector<std::string> *ret) = 0;
virtual Status qrange(const std::string &name, int64_t begin, int64_t limit, std::vector<std::string> *ret) = 0;
virtual Status qclear(const std::string &name, int64_t *ret=NULL) = 0;
private:
// No copying allowed
Client(const Client&);
void operator=(const Client&);
};
}; // namespace ssdb
#endif
#include <iostream>
#include <thread>
#include "SSDB_client.h"
#include "nemo.h"
const int kBatchLen = 1000;
const int kSplitNum = 1000000;
void MigrateKv(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db,
const std::string& start = "", const std::string& end = "") {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Kv client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string> *resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Kv client auth error" << std::endl;
delete client;
return;
}
std::cout << "kv client start to migrate, from " << start << " to " << end << std::endl;
std::vector<std::string> kvs;
ssdb::Status status_ssdb;
nemo::Status status_nemo;
std::string prev_start = start;
while (true) {
kvs.clear();
status_ssdb = client->scan(prev_start, end, kBatchLen, &kvs);
if(!status_ssdb.ok() && kvs.size() % 2 != 0) {
std::cout << "kv client scan error" << std::endl;
break;
}
if (kvs.size() == 0) {
break;
}
for (auto iter = kvs.begin(); iter != kvs.end(); iter+=2) {
// std::cout << "set " << *iter << " " << *(iter+1) << std::endl;
status_nemo = db->Set(*iter, *(iter+1));
if (!status_nemo.ok()) {
std::cout << "Kv client set error, key: " << *iter << std::endl;
}
}
prev_start = kvs.back();
}
}
void MigrateHash(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db,
std::vector<std::string> keys) {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Hash client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string> *resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Hash client auth error" << std::endl;
delete client;
return;
}
std::cout << "Hash client start to migrate, from " << keys.front() << " to " << keys.back() << std::endl;
std::vector<std::string> fvs;
ssdb::Status status_ssdb;
nemo::Status status_nemo;
std::string prev_start_field = "";
for (auto iter = keys.begin(); iter != keys.end(); iter++) {
while (true) {
fvs.clear();
status_ssdb = client->hscan(*iter, prev_start_field, "", kBatchLen, &fvs);
if (!status_ssdb.ok() || fvs.size() % 2 != 0) {
std::cout << "Hash client hscan error" << std::endl;
delete client;
return;
}
if (fvs.empty()) {
break;
}
for (auto it = fvs.begin(); it != fvs.end(); it+=2) {
// std::cout << "hset " << *iter << " " << *it << " " << *(it+1) << std::endl;
status_nemo = db->HSet(*iter, *it, *(it+1));
if (!status_nemo.ok()) {
std::cout << "Hash client hset error, key: " << *iter << std::endl;
delete client;
return;
}
}
prev_start_field = fvs.back();
}
}
}
void MigrateQueue(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db,
std::vector<std::string> keys) {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Queue client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string> *resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Queue client auth error" << std::endl;
delete client;
return;
}
std::cout << "Queue client start to migrate, from " << keys.front() << " to " << keys.back() << std::endl;
std::vector<std::string> fs;
ssdb::Status status_ssdb;
nemo::Status status_nemo;
std::string prev_start_field = "";
int64_t start = 0;
int64_t len = 0;
for (auto iter = keys.begin(); iter != keys.end(); iter++) {
start = 0;
while (true) {
fs.clear();
status_ssdb = client->qrange(*iter, start, kBatchLen, &fs);
if (!status_ssdb.ok()) {
std::cout << "Queue client range error" << std::endl;
delete client;
return;
}
if (fs.empty()) {
break;
}
for (auto it = fs.begin(); it != fs.end(); it++) {
// std::cout << "rpush " << *iter << " " << *it << " " << std::endl;
status_nemo = db->RPush(*iter, *it, &len);
if (!status_nemo.ok()) {
std::cout << "Queue client rpush error, key: " << *iter << std::endl;
delete client;
return;
}
}
start += fs.size();
}
}
}
void MigrateZset(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db,
std::vector<std::string> keys) {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Zset client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string> *resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Zset client auth error" << std::endl;
delete client;
return;
}
std::cout << "Zset client start to migrate, from " << keys.front() << " to " << keys.back() << std::endl;
std::vector<std::string> sms;
ssdb::Status status_ssdb;
nemo::Status status_nemo;
std::string prev_start_member = "";
int64_t zadd_res;
for (auto iter = keys.begin(); iter != keys.end(); iter++) {
while (true) {
sms.clear();
status_ssdb = client->zscan(*iter, prev_start_member, NULL, NULL, kBatchLen, &sms);
if (!status_ssdb.ok() || sms.size() % 2 != 0) {
std::cout << "Zset client zscan error" << std::endl;
delete client;
return;
}
if (sms.empty()) {
break;
}
for (auto it = sms.begin(); it != sms.end(); it+=2) {
// std::cout << "zadd " << *iter << " " << *it << " " << *(it+1) << std::endl;
status_nemo = db->ZAdd(*iter, stod(*(it+1)), *it, &zadd_res);
if (!status_nemo.ok()) {
std::cout << "Zadd client zadd error, key: " << *iter << std::endl;
delete client;
return;
}
}
prev_start_member = sms[sms.size() - 2];
}
}
}
void DoKv(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db) {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Kv center client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string> *resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Kv client auth error" << std::endl;
delete client;
return;
}
std::string start = "";
std::string end = "";
ssdb::Status status_ssdb;
std::vector<std::string> keys;
std::thread *threads[100];
int thread_num = 0;
std::string prev_start = "";
while (true) {
keys.clear();
end = "";
status_ssdb = client->keys(start, end, kSplitNum, &keys);
if (!status_ssdb.ok()) {
std::cout << "Kv center client keys error" << std::endl;
delete client;
break;
}
if (keys.empty()) {
std::cout << "Kv center client keys done, thread_num: " << thread_num << std::endl;
delete client;
break;
}
threads[thread_num] = new std::thread(MigrateKv, ip, port,
password, db, prev_start, keys.back());
thread_num++;
start = prev_start = keys.back();
}
for (int i = 0; i < thread_num; i++) {
threads[i]->join();
delete threads[i];
}
std::cout << "Kv migrate done" << std::endl;
}
void DoHash(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db) {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Kv center client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string>* resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Kv client auth error" << std::endl;
delete client;
return;
}
std::string start = "";
std::string end = "";
ssdb::Status status_ssdb;
std::thread *threads[100];
int thread_num = 0;
std::string prev_start = "";
std::vector<std::string> keys;
while (true) {
keys.clear();
end = "";
resp = NULL;
resp = client->request("hlist", start,
end, std::to_string(kSplitNum));
if (!resp || resp->front() != "ok") {
std::cout << "Hash center client keys error" << std::endl;
delete client;
break;
}
keys.assign(resp->begin() + 1, resp->end());
if (keys.empty()) {
std::cout << "Hash center client keys done, thread_num: " << thread_num << std::endl;
delete client;
break;
}
threads[thread_num] = new std::thread(MigrateHash, ip, port,
password, db, keys);
thread_num++;
start = prev_start = resp->back();
}
for (int i = 0; i< thread_num; i++) {
threads[i]->join();
delete threads[i];
}
std::cout << "Hash migrate done" << std::endl;
}
void DoZset(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db) {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Kv center client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string>* resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Kv client auth error" << std::endl;
delete client;
return;
}
std::string start = "";
std::string end = "";
ssdb::Status status_ssdb;
std::thread *threads[100];
int thread_num = 0;
std::string prev_start = "";
std::vector<std::string> keys;
while (true) {
keys.clear();
end = "";
resp = NULL;
resp = client->request("zlist", start,
end, std::to_string(kSplitNum));
if (!resp || resp->front() != "ok") {
std::cout << "Zset center client keys error" << std::endl;
delete client;
break;
}
keys.assign(resp->begin() + 1, resp->end());
if (keys.empty()) {
std::cout << "Zset center client keys done, thread_num: " << thread_num << std::endl;
delete client;
break;
}
threads[thread_num] = new std::thread(MigrateZset, ip, port,
password, db, keys);
thread_num++;
start = prev_start = resp->back();
}
for (int i = 0; i< thread_num; i++) {
threads[i]->join();
delete threads[i];
}
std::cout << "Zset migrate done" << std::endl;
}
void DoQueue(const std::string& ip, const int port,
const std::string& password, nemo::Nemo* db) {
ssdb::Client *client = ssdb::Client::connect(ip, port);
if (client == NULL) {
std::cout << "Queue center client failed to connect to ssdb" << std::endl;
return;
}
const std::vector<std::string>* resp = client->request("auth", password);
if (!resp || resp->empty() || resp->front() != "ok") {
std::cout << "Queue client auth error" << std::endl;
delete client;
return;
}
std::string start = "";
std::string end = "";
ssdb::Status status_ssdb;
std::thread *threads[100];
int thread_num = 0;
std::string prev_start = "";
std::vector<std::string> keys;
while (true) {
keys.clear();
end = "";
resp = NULL;
resp = client->request("qlist", start,
end, std::to_string(kSplitNum));
if (!resp || resp->front() != "ok") {
std::cout << "Queue center client keys error" << std::endl;
delete client;
break;
}
keys.assign(resp->begin() + 1, resp->end());
if (keys.empty()) {
std::cout << "Queue center client keys done, thread_num: " << thread_num << std::endl;
delete client;
break;
}
threads[thread_num] = new std::thread(MigrateQueue, ip, port,
password, db, keys);
thread_num++;
start = prev_start = resp->back();
}
for (int i = 0; i< thread_num; i++) {
threads[i]->join();
delete threads[i];
}
std::cout << "Queue migrate done" << std::endl;
}
void Usage() {
std::cout << "usage:" << std::endl
<< "./ssdb2pika ssdb_server_ip ssdb_server_port ssdb_server_passwd pika_db_path"
<< std::endl;
}
int main(int argc, char** argv) {
if (argc != 5) {
Usage();
return -1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
std::string password = argv[3];
std::string nemo_path = argv[4];
nemo::Options option;
option.write_buffer_size = 268435456; //256M
option.target_file_size_base = 20971520; //20M
option.max_background_flushes = 4;
option.max_background_compactions = 4;
nemo::Nemo* db = new nemo::Nemo(nemo_path, option);
std::thread thread_kv = std::thread(DoKv, ip, port, password, db);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::thread thread_hash = std::thread(DoHash, ip, port, password, db);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::thread thread_zset = std::thread(DoZset, ip, port, password, db);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::thread thread_queue = std::thread(DoQueue, ip, port, password, db);
thread_kv.join();
thread_hash.join();
thread_zset.join();
thread_queue.join();
delete db;
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册