提交 d6704c9b 编写于 作者: A antirez

hiredis library updated.

This version of hiredis merges modifications of the Redis fork with
latest changes in the hiredis repository.

The same version was pushed on the hiredis repository and will probably
merged into the master branch in short time.
上级 cada7f96
......@@ -73,7 +73,7 @@ convert it to the protocol used to communicate with Redis.
One or more spaces separates arguments, so you can use the specifiers
anywhere in an argument:
reply = redisCommand("SET key:%s %s", myid, value);
reply = redisCommand(context, "SET key:%s %s", myid, value);
### Using replies
......@@ -320,6 +320,10 @@ The reply parsing API consists of the following functions:
int redisReaderFeed(redisReader *reader, const char *buf, size_t len);
int redisReaderGetReply(redisReader *reader, void **reply);
The same set of functions are used internally by hiredis when creating a
normal Redis context, the above API just exposes it to the user for a direct
usage.
### Usage
The function `redisReaderCreate` creates a `redisReader` structure that holds a
......@@ -346,6 +350,29 @@ immediately after creating the `redisReader`.
For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c)
uses customized reply object functions to create Ruby objects.
### Reader max buffer
Both when using the Reader API directly or when using it indirectly via a
normal Redis context, the redisReader structure uses a buffer in order to
accumulate data from the server.
Usually this buffer is destroyed when it is empty and is larger than 16
kb in order to avoid wasting memory in unused buffers
However when working with very big payloads destroying the buffer may slow
down performances considerably, so it is possible to modify the max size of
an idle buffer changing the value of the `maxbuf` field of the reader structure
to the desired value. The special value of 0 means that there is no maximum
value for an idle buffer, so the buffer will never get freed.
For instance if you have a normal Redis context you can set the maximum idle
buffer to zero (unlimited) just with:
context->reader->maxbuf = 0;
This should be done only in order to maximize performances when working with
large payloads. The context should be set back to `REDIS_READER_MAX_BUF` again
as soon as possible in order to prevent allocation of useless memory.
## AUTHORS
Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
......
......@@ -372,6 +372,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
return;
}
/* If monitor mode, repush callback */
if(c->flags & REDIS_MONITORING) {
__redisPushCallback(&ac->replies,&cb);
}
/* When the connection is not being disconnected, simply stop
* trying to get replies and wait for the next loop tick. */
......@@ -381,22 +386,31 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
/* Even if the context is subscribed, pending regular callbacks will
* get a reply before pub/sub messages arrive. */
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/* A spontaneous reply in a not-subscribed context can only be the
* error reply that is sent when a new connection exceeds the
* maximum number of allowed connections on the server side. This
* is seen as an error instead of a regular reply because the
* server closes the connection after sending it. To prevent the
* error from being overwritten by an EOF error the connection is
* closed here. See issue #43. */
if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) {
/*
* A spontaneous reply in a not-subscribed context can be the error
* reply that is sent when a new connection exceeds the maximum
* number of allowed connections on the server side.
*
* This is seen as an error instead of a regular reply because the
* server closes the connection after sending it.
*
* To prevent the error from being overwritten by an EOF error the
* connection is closed here. See issue #43.
*
* Another possibility is that the server is loading its dataset.
* In this case we also want to close the connection, and have the
* user wait until the server is ready to take our request.
*/
if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
c->err = REDIS_ERR_OTHER;
snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
__redisAsyncDisconnect(ac);
return;
}
/* No more regular callbacks and no errors, the context *must* be subscribed. */
assert(c->flags & REDIS_SUBSCRIBED);
__redisGetSubscribeCallback(ac,reply,&cb);
/* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
if(c->flags & REDIS_SUBSCRIBED)
__redisGetSubscribeCallback(ac,reply,&cb);
}
if (cb.fn != NULL) {
......@@ -557,6 +571,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
/* (P)UNSUBSCRIBE does not have its own response: every channel or
* pattern that is unsubscribed will receive a message. This means we
* should not append a callback function for this command. */
} else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
/* Set monitor flag and push callback */
c->flags |= REDIS_MONITORING;
__redisPushCallback(&ac->replies,&cb);
} else {
if (c->flags & REDIS_SUBSCRIBED)
/* This will likely result in an error reply, but it needs to be
......
......@@ -446,7 +446,7 @@ static int processMultiBulkItem(redisReader *r) {
long elements;
int root = 0;
/* Set error for nested multi bulks with depth > 2 */
/* Set error for nested multi bulks with depth > 7 */
if (r->ridx == 8) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"No support for nested multi bulk replies with depth > 7");
......@@ -564,6 +564,7 @@ redisReader *redisReaderCreate(void) {
r->errstr[0] = '\0';
r->fn = &defaultFunctions;
r->buf = sdsempty();
r->maxbuf = REDIS_READER_MAX_BUF;
if (r->buf == NULL) {
free(r);
return NULL;
......@@ -590,9 +591,8 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
/* Copy the provided buffer. */
if (buf != NULL && len >= 1) {
#if 0
/* Destroy internal buffer when it is empty and is quite large. */
if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
sdsfree(r->buf);
r->buf = sdsempty();
r->pos = 0;
......@@ -600,7 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
/* r->buf should not be NULL since we just free'd a larger one. */
assert(r->buf != NULL);
}
#endif
newbuf = sdscatlen(r->buf,buf,len);
if (newbuf == NULL) {
......
......@@ -76,6 +76,9 @@
/* Flag that is set when the async context has one or more subscriptions. */
#define REDIS_SUBSCRIBED 0x20
/* Flag that is set when monitor mode is active */
#define REDIS_MONITORING 0x40
#define REDIS_REPLY_STRING 1
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
......@@ -83,6 +86,8 @@
#define REDIS_REPLY_STATUS 5
#define REDIS_REPLY_ERROR 6
#define REDIS_READER_MAX_BUF (1024*16) /* Default max unused reader buffer. */
#ifdef __cplusplus
extern "C" {
#endif
......@@ -122,6 +127,7 @@ typedef struct redisReader {
char *buf; /* Read buffer */
size_t pos; /* Buffer cursor */
size_t len; /* Buffer length */
size_t maxbuf; /* Max length of unused buffer */
redisReadTask rstack[9];
int ridx; /* Index of current read task */
......
......@@ -45,6 +45,8 @@
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <poll.h>
#include <limits.h>
#include "net.h"
#include "sds.h"
......@@ -121,28 +123,38 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) {
return REDIS_OK;
}
#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
struct timeval to;
struct timeval *toptr = NULL;
fd_set wfd;
struct pollfd wfd[1];
long msec;
msec = -1;
wfd[0].fd = fd;
wfd[0].events = POLLOUT;
/* Only use timeout when not NULL. */
if (timeout != NULL) {
to = *timeout;
toptr = &to;
if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
close(fd);
return REDIS_ERR;
}
msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);
if (msec < 0 || msec > INT_MAX) {
msec = INT_MAX;
}
}
if (errno == EINPROGRESS) {
FD_ZERO(&wfd);
FD_SET(fd, &wfd);
int res;
if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)");
if ((res = poll(wfd, 1, msec)) == -1) {
__redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
close(fd);
return REDIS_ERR;
}
if (!FD_ISSET(fd, &wfd)) {
} else if (res == 0) {
errno = ETIMEDOUT;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册