提交 ce353c24 编写于 作者: I Igor Canadi

Nuke tools/shell

Summary: We don't use or build this code

Test Plan: builds

Reviewers: dhruba

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17979
上级 86ae8203
#include <boost/shared_ptr.hpp>
#include "DBClientProxy.h"
#include "thrift/lib/cpp/protocol/TBinaryProtocol.h"
#include "thrift/lib/cpp/transport/TSocket.h"
#include "thrift/lib/cpp/transport/TTransportUtils.h"
using namespace std;
using namespace boost;
using namespace Tleveldb;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
namespace rocksdb {
DBClientProxy::DBClientProxy(const string & host, int port) :
host_(host),
port_(port),
dbToHandle_(),
dbClient_() {
}
DBClientProxy::~DBClientProxy() {
cleanUp();
}
void DBClientProxy::connect(void) {
cleanUp();
printf("Connecting to %s:%d\n", host_.c_str(), port_);
try {
boost::shared_ptr<TSocket> socket(new TSocket(host_, port_));
boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
boost::shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
dbClient_.reset(new DBClient(protocol));
transport->open();
} catch (const std::exception & e) {
dbClient_.reset();
throw;
}
}
void DBClientProxy::cleanUp(void) {
if(dbClient_.get()) {
for(map<string, DBHandle>::iterator itor = dbToHandle_.begin();
itor != dbToHandle_.end();
++itor) {
dbClient_->Close(itor->second, itor->first);
}
dbClient_.reset();
}
dbToHandle_.clear();
}
void DBClientProxy::open(const string & db) {
if(!dbClient_.get()) {
printf("please connect() first\n");
return;
}
// printf("opening database : %s\n", db.c_str());
// we use default DBOptions here
DBOptions opt;
DBHandle handle;
try {
dbClient_->Open(handle, db, opt);
} catch (const LeveldbException & e) {
printf("%s\n", e.message.c_str());
if(kIOError == e.errorCode) {
printf("no such database : %s\n", db.c_str());
return;
}else {
printf("Unknown error : %d\n", e.errorCode);
return;
}
}
dbToHandle_[db] = handle;
}
bool DBClientProxy::create(const string & db) {
if(!dbClient_.get()) {
printf("please connect() first\n");
return false;
}
printf("creating database : %s\n", db.c_str());
DBOptions opt;
opt.create_if_missing = true;
opt.error_if_exists = true;
DBHandle handle;
try {
dbClient_->Open(handle, db, opt);
}catch (const LeveldbException & e) {
printf("%s\n", e.message.c_str());
printf("error code : %d\n", e.errorCode);
if(kNotFound == e.errorCode) {
printf("no such database : %s\n", db.c_str());
return false;;
} else {
printf("Unknown error : %d\n", e.errorCode);
return false;
}
}
dbToHandle_[db] = handle;
return true;
}
map<string, DBHandle>::iterator
DBClientProxy::getHandle(const string & db) {
map<string, DBHandle>::iterator itor = dbToHandle_.find(db);
if(dbToHandle_.end() == itor) {
open(db);
itor = dbToHandle_.find(db);
}
return itor;
}
bool DBClientProxy::get(const string & db,
const string & key,
string & value) {
if(!dbClient_.get()) {
printf("please connect() first\n");
return false;
}
map<string, DBHandle>::iterator itor = getHandle(db);
if(dbToHandle_.end() == itor) {
return false;
}
ResultItem ret;
Slice k;
k.data = key;
k.size = key.size();
// we use default values of options here
ReadOptions opt;
dbClient_->Get(ret,
itor->second,
k,
opt);
if(kOk == ret.status) {
value = ret.value.data;
return true;
} else if(kNotFound == ret.status) {
printf("no such key : %s\n", key.c_str());
return false;
} else {
printf("get data error : %d\n", ret.status);
return false;
}
}
bool DBClientProxy::put(const string & db,
const string & key,
const string & value) {
if(!dbClient_.get()) {
printf("please connect() first\n");
return false;
}
map<string, DBHandle>::iterator itor = getHandle(db);
if(dbToHandle_.end() == itor) {
return false;
}
kv temp;
temp.key.data = key;
temp.key.size = key.size();
temp.value.data = value;
temp.value.size = value.size();
WriteOptions opt;
opt.sync = true;
Code code;
code = dbClient_->Put(itor->second,
temp,
opt);
if(kOk == code) {
// printf("set value finished\n");
return true;
} else {
printf("put data error : %d\n", code);
return false;
}
}
bool DBClientProxy::scan(const string & db,
const string & start_key,
const string & end_key,
const string & limit,
vector<pair<string, string> > & kvs) {
if(!dbClient_.get()) {
printf("please connect() first\n");
return false;
}
int limitInt = -1;
limitInt = atoi(limit.c_str());
if(limitInt <= 0) {
printf("Error while parse limit : %s\n", limit.c_str());
return false;
}
if(start_key > end_key) {
printf("empty range.\n");
return false;
}
map<string, DBHandle>::iterator itor = getHandle(db);
if(dbToHandle_.end() == itor) {
return false;
}
ResultIterator ret;
// we use the default values of options here
ReadOptions opt;
Slice k;
k.data = start_key;
k.size = start_key.size();
dbClient_->NewIterator(ret,
itor->second,
opt,
seekToKey,
k);
Iterator it;
if(kOk == ret.status) {
it = ret.iterator;
} else {
printf("get iterator error : %d\n", ret.status);
return false;
}
int idx = 0;
string ck = start_key;
while(idx < limitInt && ck < end_key) {
ResultPair retPair;
dbClient_->GetNext(retPair, itor->second, it);
if(kOk == retPair.status) {
++idx;
ck = retPair.keyvalue.key.data;
if (ck < end_key) {
kvs.push_back(make_pair(retPair.keyvalue.key.data,
retPair.keyvalue.value.data));
}
} else if(kEnd == retPair.status) {
printf("not enough values\n");
return true;
} else {
printf("GetNext() error : %d\n", retPair.status);
return false;
}
}
return true;
}
} // namespace
#ifndef TOOLS_SHELL_DBCLIENTPROXY
#define TOOLS_SHELL_DBCLIENTPROXY
#include <vector>
#include <map>
#include <string>
#include <boost/utility.hpp>
#include <boost/shared_ptr.hpp>
#include "DB.h"
/*
* class DBClientProxy maintains:
* 1. a connection to rocksdb service
* 2. a map from db names to opened db handles
*
* it's client codes' responsibility to catch all possible exceptions.
*/
namespace rocksdb {
class DBClientProxy : private boost::noncopyable {
public:
// connect to host_:port_
void connect(void);
// return true on success, false otherwise
bool get(const std::string & db,
const std::string & key,
std::string & value);
// return true on success, false otherwise
bool put(const std::string & db,
const std::string & key,
const std::string & value);
// return true on success, false otherwise
bool scan(const std::string & db,
const std::string & start_key,
const std::string & end_key,
const std::string & limit,
std::vector<std::pair<std::string, std::string> > & kvs);
// return true on success, false otherwise
bool create(const std::string & db);
DBClientProxy(const std::string & host, int port);
~DBClientProxy();
private:
// some internal help functions
void cleanUp(void);
void open(const std::string & db);
std::map<std::string, Trocksdb::DBHandle>::iterator getHandle(const std::string & db);
const std::string host_;
const int port_;
std::map<std::string, Trocksdb::DBHandle> dbToHandle_;
boost::shared_ptr<Trocksdb::DBClient> dbClient_;
};
} // namespace
#endif
#include "ShellContext.h"
int main(int argc, char ** argv) {
ShellContext c(argc, argv);
c.run();
}
#include <iostream>
#include <boost/shared_ptr.hpp>
#include "ShellContext.h"
#include "ShellState.h"
#include "thrift/lib/cpp/protocol/TBinaryProtocol.h"
#include "thrift/lib/cpp/transport/TSocket.h"
#include "thrift/lib/cpp/transport/TTransportUtils.h"
using namespace std;
using namespace boost;
using namespace Tleveldb;
using namespace rocksdb;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
void ShellContext::changeState(ShellState * pState) {
pShellState_ = pState;
}
void ShellContext::stop(void) {
exit_ = true;
}
bool ShellContext::ParseInput(void) {
if(argc_ != 3) {
printf("leveldb_shell host port\n");
return false;
}
port_ = atoi(argv_[2]);
if(port_ <= 0) {
printf("Error while parse port : %s\n", argv_[2]);
return false;
}
clientProxy_.reset(new DBClientProxy(argv_[1], port_));
if(!clientProxy_.get()) {
return false;
} else {
return true;
}
}
void ShellContext::connect(void) {
clientProxy_->connect();
}
void ShellContext::create(const string & db) {
if (clientProxy_->create(db)) {
printf("%s created\n", db.c_str());
}
}
void ShellContext::get(const string & db,
const string & key) {
string v;
if (clientProxy_->get(db, key, v)) {
printf("%s\n", v.c_str());
}
}
void ShellContext::put(const string & db,
const string & key,
const string & value) {
if (clientProxy_->put(db, key, value)) {
printf("(%s, %s) has been set\n", key.c_str(), value.c_str());
}
}
void ShellContext::scan(const string & db,
const string & start_key,
const string & end_key,
const string & limit) {
vector<pair<string, string> > kvs;
if (clientProxy_->scan(db, start_key, end_key, limit, kvs)) {
for(unsigned int i = 0; i < kvs.size(); ++i) {
printf("%d (%s, %s)\n", i, kvs[i].first.c_str(), kvs[i].second.c_str());
}
}
}
void ShellContext::run(void) {
while(!exit_) {
pShellState_->run(this);
}
}
ShellContext::ShellContext(int argc, char ** argv) :
pShellState_(ShellStateStart::getInstance()),
exit_(false),
argc_(argc),
argv_(argv),
port_(-1),
clientProxy_() {
}
#ifndef TOOLS_SHELL_SHELLCONTEXT
#define TOOLS_SHELL_SHELLCONTEXT
#include <map>
#include <string>
#include <boost/utility.hpp>
#include <boost/shared_ptr.hpp>
#include "DB.h"
#include "DBClientProxy.h"
class ShellState;
class ShellContext : private boost::noncopyable {
public:
void changeState(ShellState * pState);
void stop(void);
bool ParseInput(void);
void connect(void);
void get(const std::string & db,
const std::string & key);
void put(const std::string & db,
const std::string & key,
const std::string & value);
void scan(const std::string & db,
const std::string & start_key,
const std::string & end_key,
const std::string & limit);
void create(const std::string & db);
void run(void);
ShellContext(int argc, char ** argv);
private:
ShellState * pShellState_;
bool exit_;
int argc_;
char ** argv_;
int port_;
boost::shared_ptr<rocksdb::DBClientProxy> clientProxy_;
};
#endif
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include "ShellState.h"
#include "ShellContext.h"
#include "transport/TTransportException.h"
using namespace std;
using namespace apache::thrift::transport;
const char * PMT = ">> ";
void ShellStateStart::run(ShellContext * c) {
if(!c->ParseInput()) {
c->changeState(ShellStateStop::getInstance());
} else {
c->changeState(ShellStateConnecting::getInstance());
}
}
void ShellStateStop::run(ShellContext * c) {
c->stop();
}
void ShellStateConnecting::run(ShellContext * c) {
try {
c->connect();
} catch (const TTransportException & e) {
cout << e.what() << endl;
c->changeState(ShellStateStop::getInstance());
return;
}
c->changeState(ShellStateConnected::getInstance());
}
void ShellStateConnected::unknownCmd(void) {
cout << "Unknown command!" << endl;
cout << "Use help to list all available commands" << endl;
}
void ShellStateConnected::helpMsg(void) {
cout << "Currently supported commands:" << endl;
cout << "create db" << endl;
cout << "get db key" << endl;
cout << "scan db start_key end_key limit" << endl;
cout << "put db key value" << endl;
cout << "exit/quit" << endl;
}
void ShellStateConnected::handleConError(ShellContext * c) {
cout << "Connection down" << endl;
cout << "Reconnect ? (y/n) :" << endl;
string s;
while(getline(cin, s)) {
if("y" == s) {
c->changeState(ShellStateConnecting::getInstance());
break;
} else if("n" == s) {
c->changeState(ShellStateStop::getInstance());
break;
} else {
cout << "Reconnect ? (y/n) :" << endl;
}
}
}
void ShellStateConnected::run(ShellContext * c) {
string line;
cout << PMT;
getline(cin, line);
istringstream is(line);
vector<string> params;
string param;
while(is >> param) {
params.push_back(param);
}
// empty input line
if(params.empty())
return;
if("quit" == params[0] || "exit" == params[0]) {
c->changeState(ShellStateStop::getInstance());
} else if("get" == params[0]) {
if(params.size() == 3) {
try {
c->get(params[1], params[2]);
} catch (const TTransportException & e) {
cout << e.what() << endl;
handleConError(c);
}
} else {
unknownCmd();
}
} else if("create" == params[0]) {
if(params.size() == 2) {
try {
c->create(params[1]);
} catch (const TTransportException & e) {
cout << e.what() << endl;
handleConError(c);
}
} else {
unknownCmd();
}
}else if("put" == params[0]) {
if(params.size() == 4) {
try {
c->put(params[1], params[2], params[3]);
} catch (const TTransportException & e) {
cout << e.what() << endl;
handleConError(c);
}
} else {
unknownCmd();
}
} else if("scan" == params[0]) {
if(params.size() == 5) {
try {
c->scan(params[1], params[2], params[3], params[4]);
} catch (const TTransportException & e) {
cout << e.what() << endl;
handleConError(c);
}
} else {
unknownCmd();
}
} else if("help" == params[0]) {
helpMsg();
} else {
unknownCmd();
}
}
#ifndef TOOLS_SHELL_SHELLSTATE
#define TOOLS_SHELL_SHELLSTATE
class ShellContext;
/*
* Currently, there are four types of state in total
* 1. start state: the first state the program enters
* 2. connecting state: the program try to connect to a rocksdb server, whose
* previous states could be "start" or "connected" states
* 3. connected states: the program has already connected to a server, and is
* processing user commands
* 4. stop state: the last state the program enters, do some cleaning up things
*/
class ShellState {
public:
virtual void run(ShellContext *) = 0;
virtual ~ShellState() {}
};
class ShellStateStart : public ShellState {
public:
static ShellStateStart * getInstance(void) {
static ShellStateStart instance;
return &instance;
}
virtual void run(ShellContext *);
private:
ShellStateStart() {}
virtual ~ShellStateStart() {}
};
class ShellStateStop : public ShellState {
public:
static ShellStateStop * getInstance(void) {
static ShellStateStop instance;
return &instance;
}
virtual void run(ShellContext *);
private:
ShellStateStop() {}
virtual ~ShellStateStop() {}
};
class ShellStateConnecting : public ShellState {
public:
static ShellStateConnecting * getInstance(void) {
static ShellStateConnecting instance;
return &instance;
}
virtual void run(ShellContext *);
private:
ShellStateConnecting() {}
virtual ~ShellStateConnecting() {}
};
class ShellStateConnected : public ShellState {
public:
static ShellStateConnected * getInstance(void) {
static ShellStateConnected instance;
return &instance;
}
virtual void run(ShellContext *);
private:
ShellStateConnected() {}
virtual ~ShellStateConnected() {}
void unknownCmd();
void handleConError(ShellContext *);
void helpMsg();
};
#endif
/**
* Tests for DBClientProxy class for leveldb
* @author Bo Liu (newpoo.liu@gmail.com)
* Copyright 2012 Facebook
*/
#include <algorithm>
#include <vector>
#include <string>
#include <protocol/TBinaryProtocol.h>
#include <transport/TSocket.h>
#include <transport/TBufferTransports.h>
#include <util/testharness.h>
#include <DB.h>
#include <AssocService.h>
#include <leveldb_types.h>
#include "server_options.h"
#include "../DBClientProxy.h"
using namespace rocksdb;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using boost::shared_ptr;
using namespace Tleveldb;
using namespace std;
extern "C" void startServer(int argc, char**argv);
extern "C" void stopServer(int port);
extern ServerOptions server_options;
static const string db1("db1");
static void testDBClientProxy(DBClientProxy & dbcp) {
bool flag;
const int NOK = 100;
const int BUFSIZE = 16;
int testcase = 0;
vector<string> keys, values;
vector<pair<string, string> > kvs, correctKvs;
string k, v;
for(int i = 0; i < NOK; ++i) {
char bufKey[BUFSIZE];
char bufValue[BUFSIZE];
snprintf(bufKey, BUFSIZE, "key%d", i);
snprintf(bufValue, BUFSIZE, "value%d", i);
keys.push_back(bufKey);
values.push_back(bufValue);
correctKvs.push_back((make_pair(string(bufKey), string(bufValue))));
}
sort(correctKvs.begin(), correctKvs.end());
// can not do get(), put(), scan() or create() before connected.
flag = dbcp.get(db1, keys[0], v);
ASSERT_TRUE(false == flag);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
flag = dbcp.put(db1, keys[0], keys[1]);
ASSERT_TRUE(false == flag);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
flag = dbcp.scan(db1, "a", "w", "100", kvs);
ASSERT_TRUE(false == flag);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
flag = dbcp.create(db1);
ASSERT_TRUE(false == flag);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
dbcp.connect();
// create a database
flag = dbcp.create(db1);
ASSERT_TRUE(true == flag);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
// no such key
flag = dbcp.get(db1, keys[0], v);
ASSERT_TRUE(false == flag);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
// scan() success with empty returned key-value pairs
kvs.clear();
flag = dbcp.scan(db1, "a", "w", "100", kvs);
ASSERT_TRUE(true == flag);
ASSERT_TRUE(kvs.empty());
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
// put()
for(int i = 0; i < NOK; ++i) {
flag = dbcp.put(db1, keys[i], values[i]);
ASSERT_TRUE(true == flag);
}
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
// scan all of key-value pairs
kvs.clear();
flag = dbcp.scan(db1, "a", "w", "100", kvs);
ASSERT_TRUE(true == flag);
ASSERT_TRUE(kvs == correctKvs);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
// scan the first 20 key-value pairs
{
kvs.clear();
flag = dbcp.scan(db1, "a", "w", "20", kvs);
ASSERT_TRUE(true == flag);
vector<pair<string, string> > tkvs(correctKvs.begin(), correctKvs.begin() + 20);
ASSERT_TRUE(kvs == tkvs);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
}
// scan key[10] to key[50]
{
kvs.clear();
flag = dbcp.scan(db1, correctKvs[10].first, correctKvs[50].first, "100", kvs);
ASSERT_TRUE(true == flag);
vector<pair<string, string> > tkvs(correctKvs.begin() + 10, correctKvs.begin() + 50);
ASSERT_TRUE(kvs == tkvs);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
}
// scan "key10" to "key40" by limit constraint
{
kvs.clear();
flag = dbcp.scan(db1, correctKvs[10].first.c_str(), "w", "30", kvs);
ASSERT_TRUE(true == flag);
vector<pair<string, string> > tkvs(correctKvs.begin() + 10, correctKvs.begin() + 40);
ASSERT_TRUE(kvs == tkvs);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
}
// get()
flag = dbcp.get(db1, "unknownKey", v);
ASSERT_TRUE(false == flag);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
flag = dbcp.get(db1, keys[0], v);
ASSERT_TRUE(true == flag);
ASSERT_TRUE(v == values[0]);
printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase);
}
static void cleanupDir(std::string dir) {
// remove old data, if any
char* cleanup = new char[100];
snprintf(cleanup, 100, "rm -rf %s", dir.c_str());
system(cleanup);
}
int main(int argc, char **argv) {
// create a server
startServer(argc, argv);
printf("Server thread created.\n");
// give some time to the server to initialize itself
while (server_options.getPort() == 0) {
sleep(1);
}
cleanupDir(server_options.getDataDirectory(db1));
DBClientProxy dbcp("localhost", server_options.getPort());
testDBClientProxy(dbcp);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册