提交 feb6c288 编写于 作者: J jonathan pickett

fix for issue #97: Added a modified sample to demonstrate large async GET...

fix for issue #97: Added a modified sample to demonstrate large async GET operation. Also discovered that reply reading in async.c would stop if the reply data exceeded the buffer allocated for the read. redisAeAddRead() would not reschedule a read if already in read mode. To fix this is added redisAeForceAddRead().
上级 9f871c4e
......@@ -69,6 +69,15 @@ static void redisAeAddRead(void *privdata) {
}
}
#ifdef _WIN32
static void redisAeForceAddRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
e->reading = 1;
aeCreateFileEvent(loop, e->fd, AE_READABLE, redisAeReadEvent, e);
}
#endif
static void redisAeDelRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
......@@ -120,6 +129,9 @@ static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
/* Register functions to start/stop listening for events */
ac->ev.addRead = redisAeAddRead;
#ifdef _WIN32
ac->ev.forceAddRead = redisAeForceAddRead;
#endif
ac->ev.delRead = redisAeDelRead;
ac->ev.addWrite = redisAeAddWrite;
ac->ev.delWrite = redisAeDelWrite;
......
......@@ -49,6 +49,11 @@
#define _EL_ADD_READ(ctx) do { \
if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
} while(0)
#ifdef _WIN32
#define _EL_FORCE_ADD_READ(ctx) do { \
if ((ctx)->ev.forceAddRead) (ctx)->ev.forceAddRead((ctx)->ev.data); \
} while (0)
#endif
#define _EL_DEL_READ(ctx) do { \
if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
} while(0)
......@@ -126,6 +131,9 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
ac->ev.data = NULL;
ac->ev.addRead = NULL;
#ifdef _WIN32
ac->ev.forceAddRead = NULL;
#endif
ac->ev.delRead = NULL;
ac->ev.addWrite = NULL;
ac->ev.delWrite = NULL;
......@@ -501,7 +509,11 @@ void redisAsyncHandleRead(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
} else {
/* Always re-schedule reads */
#ifdef _WIN32
_EL_FORCE_ADD_READ(ac);
#else
_EL_ADD_READ(ac);
#endif
redisProcessCallbacks(ac);
}
}
......
......@@ -76,6 +76,9 @@ typedef struct redisAsyncContext {
/* Hooks that are called when the library expects to start
* reading/writing. These functions should be idempotent. */
void (*addRead)(void *privdata);
#ifdef _WIN32
void (*forceAddRead)(void *privdata);
#endif
void (*delRead)(void *privdata);
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
......
......@@ -56,7 +56,7 @@
</PrecompiledHeaderFile>
<PrecompiledHeaderOutputFile>
</PrecompiledHeaderOutputFile>
<AdditionalIncludeDirectories>..\..\deps\hiredis;..\..\src</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>..\..\..\deps\hiredis\;..\..\deps\hiredis;..\..\src</AdditionalIncludeDirectories>
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
</ClCompile>
<Link>
......@@ -77,7 +77,7 @@
</PrecompiledHeaderFile>
<PrecompiledHeaderOutputFile>
</PrecompiledHeaderOutputFile>
<AdditionalIncludeDirectories>..\..\deps\hiredis;..\..\src</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>..\..\..\deps\hiredis\;..\..\deps\hiredis;..\..\src</AdditionalIncludeDirectories>
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
</ClCompile>
<Link>
......@@ -88,10 +88,10 @@
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="..\..\..\deps\hiredis\example-ae.c" />
<ClCompile Include="..\..\..\src\adlist.c" />
<ClCompile Include="..\..\..\src\ae.c" />
<ClCompile Include="..\..\..\src\zmalloc.c" />
<ClCompile Include="example-ae-win.c" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\src\ae.h" />
......
#ifdef _WIN32
#include "..\..\src\Win32_Interop\win32fixes.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include "hiredis.h"
#include "async.h"
#include "adapters\ae.h"
/* Put event loop in the global scope, so it can be explicitly stopped */
static aeEventLoop *loop;
static int getCallbackCalls = 0;
void getCallbackContinue(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) return;
getCallbackCalls++;
printf("getCallback called %d times.\n", getCallbackCalls);
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
}
void getCallbackEnd(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) return;
getCallbackCalls++;
printf("getCallback called %d times.\n", getCallbackCalls);
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
redisAsyncDisconnect(c);
}
void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Disconnected...\n");
aeStop(loop);
}
int main (int argc, char **argv) {
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#endif
#ifdef _WIN32
/* For Win32_IOCP the event loop must be created before the async connect */
loop = aeCreateEventLoop(1024 * 10);
#endif
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return 1;
}
#ifndef _WIN32
loop = aeCreateEventLoop(1024*10);
#endif
redisAeAttach(loop, c);
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncCommand(c, NULL, NULL, "SET key %s", argv[argc-1], strlen(argv[argc-1]));
for (int i = 0; i < 20000; i++) {
printf("calling get %i times\n", i);
redisAsyncCommand(c, getCallbackContinue, "0", "GET key");
}
redisAsyncCommand(c, getCallbackEnd, "0", "GET key");
aeMain(loop);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册