提交 c4a1aa2b 编写于 作者: L linyuanjin

add unix socket support

上级 95a44a94
......@@ -245,7 +245,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
error_handler_(this, immutable_db_options_, &mutex_),
atomic_flush_install_cv_(&mutex_),
bytedance_tags_("dbname=" + dbname),
console_runner_(env_, immutable_db_options_.info_log.get()) {
console_runner_(dbname, env_, immutable_db_options_.info_log.get()) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
......
......@@ -20,6 +20,7 @@ constexpr unsigned int kTCPKeepAlive = 300;
constexpr unsigned int kTimeout = 360;
constexpr unsigned int kReadLength = 4096;
constexpr unsigned int kMaxInputBuffer = 10485760;
constexpr unsigned int kUnixSocketPerm = 700;
static void ReleaseOrMarkClient(int fd, Client *c, EventLoop<Client> *el) {
if (c->ref_count == 0) {
......@@ -135,7 +136,8 @@ static void ServerCron(long *last_cron_time, long curr_time,
}
}
int ServerMain(ServerRunner *runner, Env *env, Logger *log) {
int ServerMain(ServerRunner *runner, const std::string &path, Env *env,
Logger *log) {
const int el_fd = EventLoop<Client>::Open();
if (el_fd < 0) {
ROCKS_LOG_ERROR(log, "Failed creating the event loop. Error message: '%s'",
......@@ -151,12 +153,25 @@ int ServerMain(ServerRunner *runner, Env *env, Logger *log) {
}
char err[ANET_ERR_LEN];
const int ac_fd =
anetTcpServer(err, kPort, const_cast<char *>(kBindAddr), kBacklog);
if (ac_fd < 0) {
ROCKS_LOG_ERROR(log, "Failed creating the TCP server. Error message: '%s'",
err);
return 1;
int ac_fd;
if (path.empty()) { // currently, it's just for debug
ac_fd = anetTcpServer(err, kPort, const_cast<char *>(kBindAddr), kBacklog);
if (ac_fd < 0) {
ROCKS_LOG_ERROR(
log, "Failed creating the TCP server. Error message: '%s'", err);
return 1;
}
} else {
std::string sock_path = path + "/console.sock";
unlink(sock_path.c_str()); /* don't care if this fails */
ac_fd = anetUnixServer(err, (char *)sock_path.c_str(), kUnixSocketPerm,
kBacklog);
if (ac_fd < 0) {
ROCKS_LOG_ERROR(
log, "Failed creating the Unix socket server. Error message: '%s'",
err);
return 1;
}
}
anetNonBlock(nullptr, ac_fd);
......
......@@ -16,7 +16,8 @@ using namespace gujia;
struct ServerRunner;
int ServerMain(ServerRunner* runner, rocksdb::Env* env, rocksdb::Logger* log);
int ServerMain(ServerRunner* runner, const std::string& path, rocksdb::Env* env,
rocksdb::Logger* log);
struct Client {
RespMachine resp;
......@@ -31,8 +32,10 @@ struct Client {
};
struct ServerRunner {
ServerRunner(rocksdb::Env* env, rocksdb::Logger* log) {
std::thread job([this, env, log]() { ServerMain(this, env, log); });
ServerRunner(const std::string& path, rocksdb::Env* env,
rocksdb::Logger* log) {
auto p = &path;
std::thread job([this, p, env, log]() { ServerMain(this, *p, env, log); });
job.detach();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册