提交 f2ffca7e 编写于 作者: M Manju Rajashekhar

Merge pull request #191 from Almasty/master

Memcache binary protocol support
......@@ -5,7 +5,7 @@ m4_define([NC_PATCH], 0)
m4_define([NC_BUGS], [manj@cs.stanford.edu])
# Initialize autoconf
AC_PREREQ([2.64])
AC_PREREQ([2.63])
AC_INIT([nutcracker], [NC_MAJOR.NC_MINOR.NC_PATCH], [NC_BUGS])
AC_CONFIG_SRCDIR([src/nc.c])
AC_CONFIG_AUX_DIR([config])
......
......@@ -149,6 +149,31 @@ client_close(struct context *ctx, struct conn *conn)
req_put(msg);
}
for (msg = TAILQ_FIRST(&conn->imsg_q); msg != NULL; msg = nmsg) {
nmsg = TAILQ_NEXT(msg, c_tqe);
/* dequeue the message (request) from client inq */
conn->dequeue_inq(ctx, conn, msg);
if (msg->swallow || msg->noreply) {
log_debug(LOG_INFO, "close c %d swallow req %"PRIu64" len %"PRIu32
" type %d", conn->sd, msg->id, msg->mlen, msg->type);
req_put(msg);
} else {
ASSERT(msg->owner == conn);
msg->done = 1;
msg->error = 1;
msg->err = conn->err;
conn->enqueue_outq(ctx, conn, msg);
log_debug(LOG_INFO, "close c %d schedule error for req %"PRIu64" "
"len %"PRIu32" type %d%c %s", conn->sd, msg->id,
msg->mlen, msg->type, conn->err ? ':' : ' ',
conn->err ? strerror(conn->err) : " ");
}
}
ASSERT(conn->smsg == NULL);
ASSERT(TAILQ_EMPTY(&conn->imsg_q));
......@@ -187,3 +212,30 @@ client_close(struct context *ctx, struct conn *conn)
conn_put(conn);
}
void
client_timeout(struct context *ctx, struct conn *conn)
{
rstatus_t status;
struct msg *msg, *nmsg, *pmsg;
ASSERT(conn->client && !conn->proxy);
for (msg = TAILQ_FIRST(&conn->imsg_q); msg != NULL; msg = nmsg) {
nmsg = TAILQ_NEXT(msg, c_tqe);
conn->dequeue_inq(ctx, conn, msg);
msg->done = 1;
msg->error = 1;
msg->err = ETIMEDOUT;
conn->enqueue_outq(ctx, conn, msg);
if (req_done(conn, TAILQ_FIRST(&conn->omsg_q))) {
status = event_add_out(ctx->evb, conn);
if (status != NC_OK) {
conn->err = errno;
}
}
}
}
......@@ -24,5 +24,6 @@ bool client_active(struct conn *conn);
void client_ref(struct conn *conn, void *owner);
void client_unref(struct conn *conn);
void client_close(struct context *ctx, struct conn *conn);
void client_timeout(struct context *ctx, struct conn *conn);
#endif
......@@ -41,6 +41,13 @@ static struct string dist_strings[] = {
};
#undef DEFINE_ACTION
#define DEFINE_ACTION(_proto, _name) string(#_name),
static struct string proto_strings[] = {
PROTOCOL_CODEC( DEFINE_ACTION )
null_string
};
#undef DEFINE_ACTION
static struct command conf_commands[] = {
{ string("listen"),
conf_set_listen,
......@@ -71,8 +78,12 @@ static struct command conf_commands[] = {
offsetof(struct conf_pool, client_connections) },
{ string("redis"),
conf_set_bool,
offsetof(struct conf_pool, redis) },
conf_set_protocol,
offsetof(struct conf_pool, protocol) },
{ string("protocol"),
conf_set_protocol,
offsetof(struct conf_pool, protocol) },
{ string("preconnect"),
conf_set_bool,
......@@ -183,7 +194,7 @@ conf_pool_init(struct conf_pool *cp, struct string *name)
cp->client_connections = CONF_UNSET_NUM;
cp->redis = CONF_UNSET_NUM;
cp->protocol = CONF_UNSET_PROTOCOL;
cp->preconnect = CONF_UNSET_NUM;
cp->auto_eject_hosts = CONF_UNSET_NUM;
cp->server_connections = CONF_UNSET_NUM;
......@@ -248,6 +259,7 @@ conf_pool_each_transform(void *elem, void *data)
TAILQ_INIT(&sp->c_conn_q);
array_null(&sp->server);
array_null(&sp->impacted_s_conn);
sp->ncontinuum = 0;
sp->nserver_continuum = 0;
sp->continuum = NULL;
......@@ -267,7 +279,7 @@ conf_pool_each_transform(void *elem, void *data)
sp->dist_type = cp->distribution;
sp->hash_tag = cp->hash_tag;
sp->redis = cp->redis ? 1 : 0;
sp->protocol = cp->protocol;
sp->timeout = cp->timeout;
sp->backlog = cp->backlog;
......@@ -279,8 +291,14 @@ conf_pool_each_transform(void *elem, void *data)
sp->auto_eject_hosts = cp->auto_eject_hosts ? 1 : 0;
sp->preconnect = cp->preconnect ? 1 : 0;
status = array_init(&sp->impacted_s_conn, array_n(&cp->server), sizeof(struct conn *));
if (status != NC_OK) {
return status;
}
status = server_init(&sp->server, &cp->server, sp);
if (status != NC_OK) {
array_deinit(&sp->impacted_s_conn);
return status;
}
......@@ -319,7 +337,7 @@ conf_dump(struct conf *cf)
log_debug(LOG_VVERB, " distribution: %d", cp->distribution);
log_debug(LOG_VVERB, " client_connections: %d",
cp->client_connections);
log_debug(LOG_VVERB, " redis: %d", cp->redis);
log_debug(LOG_VVERB, " protocol: %d", cp->protocol);
log_debug(LOG_VVERB, " preconnect: %d", cp->preconnect);
log_debug(LOG_VVERB, " auto_eject_hosts: %d", cp->auto_eject_hosts);
log_debug(LOG_VVERB, " server_connections: %d",
......@@ -1207,8 +1225,8 @@ conf_validate_pool(struct conf *cf, struct conf_pool *cp)
cp->client_connections = CONF_DEFAULT_CLIENT_CONNECTIONS;
if (cp->redis == CONF_UNSET_NUM) {
cp->redis = CONF_DEFAULT_REDIS;
if (cp->protocol == CONF_UNSET_PROTOCOL) {
cp->protocol = CONF_DEFAULT_PROTOCOL;
}
if (cp->preconnect == CONF_UNSET_NUM) {
......@@ -1743,3 +1761,55 @@ conf_set_hashtag(struct conf *cf, struct command *cmd, void *conf)
return CONF_OK;
}
char *
conf_set_protocol(struct conf *cf, struct command *cmd, void *conf)
{
rstatus_t status;
uint8_t *p;
protocol_type_t *pp, proto;
struct string *value, *proto_str, true_str, false_str;
p = conf;
pp = (protocol_type_t *)(p + cmd->offset);
value = array_top(&cf->arg);
proto = CONF_UNSET_PROTOCOL;
for (proto_str = proto_strings; proto_str->len != 0; proto_str++) {
if (string_compare(value, proto_str) != 0) {
continue;
}
proto = proto_str - proto_strings;
break;
}
/* support of redis: directive on top of protocol: one */
string_set_text(&true_str, "true");
string_set_text(&false_str, "false");
if (string_compare(value, &true_str) == 0) {
proto = REDIS;
} else if (string_compare(value, &false_str) == 0) {
proto = MEMCACHE_ASCII;
}
if (*pp != CONF_UNSET_PROTOCOL) {
if (*pp != proto) {
return "is a duplicate. Check for incompatible \"redis:\" and "
"\"protocol:\" directives";
} else {
return CONF_OK;
}
}
*pp = proto;
if (*pp != CONF_UNSET_PROTOCOL) {
return CONF_OK;
}
return "is not a valid protocol";
}
......@@ -25,6 +25,7 @@
#include <nc_core.h>
#include <hashkit/nc_hashkit.h>
#include <proto/nc_proto.h>
#define CONF_OK (void *) NULL
#define CONF_ERROR (void *) "has an invalid value"
......@@ -40,13 +41,14 @@
#define CONF_UNSET_PTR NULL
#define CONF_UNSET_HASH (hash_type_t) -1
#define CONF_UNSET_DIST (dist_type_t) -1
#define CONF_UNSET_PROTOCOL (protocol_type_t) -1
#define CONF_DEFAULT_HASH HASH_FNV1A_64
#define CONF_DEFAULT_DIST DIST_KETAMA
#define CONF_DEFAULT_PROTOCOL MEMCACHE_ASCII
#define CONF_DEFAULT_TIMEOUT -1
#define CONF_DEFAULT_LISTEN_BACKLOG 512
#define CONF_DEFAULT_CLIENT_CONNECTIONS 0
#define CONF_DEFAULT_REDIS false
#define CONF_DEFAULT_PRECONNECT false
#define CONF_DEFAULT_AUTO_EJECT_HOSTS false
#define CONF_DEFAULT_SERVER_RETRY_TIMEOUT 30 * 1000 /* in msec */
......@@ -77,10 +79,10 @@ struct conf_pool {
hash_type_t hash; /* hash: */
struct string hash_tag; /* hash_tag: */
dist_type_t distribution; /* distribution: */
protocol_type_t protocol; /* protocol: or redis: */
int timeout; /* timeout: */
int backlog; /* backlog: */
int client_connections; /* client_connections: */
int redis; /* redis: */
int preconnect; /* preconnect: */
int auto_eject_hosts; /* auto_eject_hosts: */
int server_connections; /* server_connections: */
......@@ -124,6 +126,7 @@ char *conf_set_bool(struct conf *cf, struct command *cmd, void *conf);
char *conf_set_hash(struct conf *cf, struct command *cmd, void *conf);
char *conf_set_distribution(struct conf *cf, struct command *cmd, void *conf);
char *conf_set_hashtag(struct conf *cf, struct command *cmd, void *conf);
char *conf_set_protocol(struct conf *cf, struct command *cmd, void *conf);
rstatus_t conf_server_each_transform(void *elem, void *data);
rstatus_t conf_pool_each_transform(void *elem, void *data);
......
......@@ -152,13 +152,13 @@ _conn_get(void)
conn->connected = 0;
conn->eof = 0;
conn->done = 0;
conn->redis = 0;
conn->protocol = -1;
return conn;
}
struct conn *
conn_get(void *owner, bool client, bool redis)
conn_get(void *owner, bool client, int protocol)
{
struct conn *conn;
......@@ -168,7 +168,7 @@ conn_get(void *owner, bool client, bool redis)
}
/* connection either handles redis or memcache messages */
conn->redis = redis ? 1 : 0;
conn->protocol = protocol;
conn->client = client ? 1 : 0;
......@@ -191,8 +191,8 @@ conn_get(void *owner, bool client, bool redis)
conn->ref = client_ref;
conn->unref = client_unref;
conn->enqueue_inq = NULL;
conn->dequeue_inq = NULL;
conn->enqueue_inq = req_client_enqueue_imsgq;
conn->dequeue_inq = req_client_dequeue_imsgq;
conn->enqueue_outq = req_client_enqueue_omsgq;
conn->dequeue_outq = req_client_dequeue_omsgq;
} else {
......@@ -238,7 +238,7 @@ conn_get_proxy(void *owner)
return NULL;
}
conn->redis = pool->redis;
conn->protocol = pool->protocol;
conn->proxy = 1;
......
......@@ -70,6 +70,8 @@ struct conn {
size_t recv_bytes; /* received (read) bytes */
size_t send_bytes; /* sent (written) bytes */
int protocol; /* protocol (redis or memcache) */
uint32_t events; /* connection io events */
err_t err; /* connection errno */
unsigned recv_active:1; /* recv active? */
......@@ -83,13 +85,12 @@ struct conn {
unsigned connected:1; /* connected? */
unsigned eof:1; /* eof? aka passive close? */
unsigned done:1; /* done? aka close? */
unsigned redis:1; /* redis? */
};
TAILQ_HEAD(conn_tqh, conn);
struct context *conn_to_ctx(struct conn *conn);
struct conn *conn_get(void *owner, bool client, bool redis);
struct conn *conn_get(void *owner, bool client, int protocol);
struct conn *conn_get_proxy(void *owner);
void conn_put(struct conn *conn);
ssize_t conn_recv(struct conn *conn, void *buf, size_t size);
......
......@@ -19,6 +19,7 @@
#include <unistd.h>
#include <nc_core.h>
#include <nc_conf.h>
#include <nc_client.h>
#include <nc_server.h>
#include <nc_proxy.h>
......@@ -266,7 +267,11 @@ core_timeout(struct context *ctx)
msg_tmo_delete(msg);
conn->err = ETIMEDOUT;
core_close(ctx, conn);
if (conn->client && !conn->proxy) {
client_timeout(ctx, conn);
} else {
core_close(ctx, conn);
}
}
}
......
......@@ -150,7 +150,7 @@ msg_tmo_insert(struct msg *msg, struct conn *conn)
ASSERT(msg->request);
ASSERT(!msg->quit && !msg->noreply);
timeout = server_timeout(conn);
timeout = server_pool_timeout(conn);
if (timeout <= 0) {
return;
}
......@@ -255,13 +255,15 @@ done:
msg->first_fragment = 0;
msg->last_fragment = 0;
msg->swallow = 0;
msg->redis = 0;
msg->protocol = -1;
msg->bufferable = 0;
msg->broadcastable = 0;
return msg;
}
struct msg *
msg_get(struct conn *conn, bool request, bool redis)
msg_get(struct conn *conn, bool request, int protocol)
{
struct msg *msg;
......@@ -272,9 +274,10 @@ msg_get(struct conn *conn, bool request, bool redis)
msg->owner = conn;
msg->request = request ? 1 : 0;
msg->redis = redis ? 1 : 0;
msg->protocol = protocol;
if (redis) {
switch (protocol) {
case REDIS:
if (request) {
msg->parser = redis_parse_req;
} else {
......@@ -284,7 +287,9 @@ msg_get(struct conn *conn, bool request, bool redis)
msg->post_splitcopy = redis_post_splitcopy;
msg->pre_coalesce = redis_pre_coalesce;
msg->post_coalesce = redis_post_coalesce;
} else {
break;
case MEMCACHE_ASCII:
if (request) {
msg->parser = memcache_parse_req;
} else {
......@@ -294,6 +299,23 @@ msg_get(struct conn *conn, bool request, bool redis)
msg->post_splitcopy = memcache_post_splitcopy;
msg->pre_coalesce = memcache_pre_coalesce;
msg->post_coalesce = memcache_post_coalesce;
break;
case MEMCACHE_BINARY:
if (request) {
msg->parser = mcdbin_parse_req;
} else {
msg->parser = mcdbin_parse_rsp;
}
msg->pre_splitcopy = mcdbin_pre_splitcopy;
msg->post_splitcopy = mcdbin_post_splitcopy;
msg->pre_coalesce = mcdbin_pre_coalesce;
msg->post_coalesce = mcdbin_post_coalesce;
break;
case PROTOCOL_SENTINEL:
default:
NOT_REACHED();
}
log_debug(LOG_VVERB, "get msg %p id %"PRIu64" request %d owner sd %d",
......@@ -303,32 +325,41 @@ msg_get(struct conn *conn, bool request, bool redis)
}
struct msg *
msg_get_error(bool redis, err_t err)
msg_get_error(int protocol, err_t err)
{
struct msg *msg;
struct mbuf *mbuf;
int n;
struct msg *msg, *mmsg;
char *errstr = err ? strerror(err) : "unknown";
char *protstr = redis ? "-ERR" : "SERVER_ERROR";
mmsg = NULL;
msg = _msg_get();
if (msg == NULL) {
return NULL;
}
msg->state = 0;
msg->type = MSG_RSP_MC_SERVER_ERROR;
switch (protocol) {
case REDIS:
mmsg = redis_generate_error(msg, err);
break;
case MEMCACHE_ASCII:
mmsg = memcache_generate_error(msg, err);
break;
case MEMCACHE_BINARY:
mmsg = mcdbin_generate_error(msg, err);
break;
mbuf = mbuf_get();
if (mbuf == NULL) {
case PROTOCOL_SENTINEL:
default:
mmsg = NULL;
NOT_REACHED();
}
if (mmsg == NULL) {
msg_put(msg);
return NULL;
}
mbuf_insert(&msg->mhdr, mbuf);
n = nc_scnprintf(mbuf->last, mbuf_size(mbuf), "%s %s"CRLF, protstr, errstr);
mbuf->last += n;
msg->mlen = (uint32_t)n;
ASSERT(mmsg == msg);
log_debug(LOG_VVERB, "get msg %p id %"PRIu64" len %"PRIu32" error '%s'",
msg, msg->id, msg->mlen, errstr);
......@@ -336,6 +367,44 @@ msg_get_error(bool redis, err_t err)
return msg;
}
struct msg *
msg_get_terminator(struct conn *conn, bool request, int protocol)
{
struct msg *msg, *mmsg;
mmsg = NULL;
msg = msg_get(conn, request, protocol);
switch (protocol) {
case REDIS:
mmsg = redis_get_terminator(msg);
break;
case MEMCACHE_ASCII:
mmsg = memcache_get_terminator(msg);
break;
case MEMCACHE_BINARY:
mmsg = mcdbin_get_terminator(msg);
break;
case PROTOCOL_SENTINEL:
default:
mmsg = NULL;
NOT_REACHED();
}
if (mmsg == NULL) {
msg_put(msg);
return NULL;
}
ASSERT(mmsg == msg);
log_debug(LOG_VERB, "get chain-terminator msg %p id %"PRIu64" len %"PRIu32,
msg, msg->id, msg->mlen);
return msg;
}
static void
msg_free(struct msg *msg)
{
......@@ -436,7 +505,7 @@ msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
return NC_ENOMEM;
}
nmsg = msg_get(msg->owner, msg->request, conn->redis);
nmsg = msg_get(msg->owner, msg->request, conn->protocol);
if (nmsg == NULL) {
mbuf_put(nbuf);
return NC_ENOMEM;
......@@ -474,7 +543,7 @@ msg_fragment(struct context *ctx, struct conn *conn, struct msg *msg)
return status;
}
nmsg = msg_get(msg->owner, msg->request, msg->redis);
nmsg = msg_get(msg->owner, msg->request, msg->protocol);
if (nmsg == NULL) {
mbuf_put(nbuf);
return NC_ENOMEM;
......@@ -527,7 +596,7 @@ msg_fragment(struct context *ctx, struct conn *conn, struct msg *msg)
*
*/
if (msg->frag_id == 0) {
msg->frag_id = ++frag_id;
msg->frag_id = get_next_frag_id();
msg->first_fragment = 1;
msg->nfrag = 1;
msg->frag_owner = msg;
......@@ -826,3 +895,9 @@ msg_send(struct context *ctx, struct conn *conn)
return NC_OK;
}
uint64_t
get_next_frag_id(void)
{
return ++frag_id;
}
......@@ -46,6 +46,8 @@ typedef enum msg_type {
MSG_REQ_MC_INCR, /* memcache arithmetic request */
MSG_REQ_MC_DECR,
MSG_REQ_MC_QUIT, /* memcache quit request */
MSG_REQ_MC_NOOP,
MSG_REQ_MC_GET_AND_TOUCH,
MSG_RSP_MC_NUM, /* memcache arithmetic response */
MSG_RSP_MC_STORED, /* memcache cas and storage response */
MSG_RSP_MC_NOT_STORED,
......@@ -57,6 +59,7 @@ typedef enum msg_type {
MSG_RSP_MC_ERROR, /* memcache error responses */
MSG_RSP_MC_CLIENT_ERROR,
MSG_RSP_MC_SERVER_ERROR,
MSG_RSP_MC_NOOP,
MSG_REQ_REDIS_DEL, /* redis commands - keys */
MSG_REQ_REDIS_EXISTS,
MSG_REQ_REDIS_EXPIRE,
......@@ -200,6 +203,8 @@ struct msg {
uint32_t nfrag; /* # fragment */
uint64_t frag_id; /* id of fragmented message */
int protocol; /* protocol (redis or memcache) */
err_t err; /* errno on error? */
unsigned error:1; /* error? */
unsigned ferror:1; /* one or more fragments are in error? */
......@@ -211,7 +216,8 @@ struct msg {
unsigned first_fragment:1;/* first fragment? */
unsigned last_fragment:1; /* last fragment? */
unsigned swallow:1; /* swallow response? */
unsigned redis:1; /* redis? */
unsigned bufferable:1; /* can be buffered */
unsigned broadcastable:1; /* can be broascasted */
};
TAILQ_HEAD(msg_tqh, msg);
......@@ -222,9 +228,10 @@ void msg_tmo_delete(struct msg *msg);
void msg_init(void);
void msg_deinit(void);
struct msg *msg_get(struct conn *conn, bool request, bool redis);
struct msg *msg_get(struct conn *conn, bool request, int protocol);
void msg_put(struct msg *msg);
struct msg *msg_get_error(bool redis, err_t err);
struct msg *msg_get_error(int protocol, err_t err);
struct msg *msg_get_terminator(struct conn *conn, bool request, int protocol);
void msg_dump(struct msg *msg);
bool msg_empty(struct msg *msg);
rstatus_t msg_recv(struct context *ctx, struct conn *conn);
......@@ -234,7 +241,9 @@ struct msg *req_get(struct conn *conn);
void req_put(struct msg *msg);
bool req_done(struct conn *conn, struct msg *msg);
bool req_error(struct conn *conn, struct msg *msg);
void req_client_enqueue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_server_enqueue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_client_dequeue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_server_dequeue_imsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_client_enqueue_omsgq(struct context *ctx, struct conn *conn, struct msg *msg);
void req_server_enqueue_omsgq(struct context *ctx, struct conn *conn, struct msg *msg);
......@@ -252,4 +261,6 @@ void rsp_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, stru
struct msg *rsp_send_next(struct context *ctx, struct conn *conn);
void rsp_send_done(struct context *ctx, struct conn *conn, struct msg *msg);
uint64_t get_next_frag_id(void);
#endif
......@@ -199,9 +199,9 @@ proxy_each_init(void *elem, void *data)
return status;
}
log_debug(LOG_NOTICE, "p %d listening on '%.*s' in %s pool %"PRIu32" '%.*s'"
log_debug(LOG_NOTICE, "p %d listening on '%.*s' in %d pool %"PRIu32" '%.*s'"
" with %"PRIu32" servers", p->sd, pool->addrstr.len,
pool->addrstr.data, pool->redis ? "redis" : "memcache",
pool->addrstr.data, pool->protocol,
pool->idx, pool->name.len, pool->name.data,
array_n(&pool->server));
......@@ -294,7 +294,7 @@ proxy_accept(struct context *ctx, struct conn *p)
break;
}
c = conn_get(p->owner, true, p->redis);
c = conn_get(p->owner, true, p->protocol);
if (c == NULL) {
log_error("get conn for c %d from p