Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
hanoi2005
redis
提交
40d224a9
R
redis
项目概览
hanoi2005
/
redis
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
redis
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
40d224a9
编写于
4月 19, 2009
作者:
A
antirez
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
SUNION, SUNIONSTORE, Initial work on non blocking replication
上级
6bea3d5f
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
223 addition
and
14 deletion
+223
-14
Changelog
Changelog
+1
-0
redis-cli.c
redis-cli.c
+2
-0
redis.c
redis.c
+206
-13
test-redis.tcl
test-redis.tcl
+14
-1
未找到文件。
Changelog
浏览文件 @
40d224a9
2009-04-10 Redis 0.091 released
2009-04-10 SINTER/SINTERSTORE/SLEMENTS fix: misisng keys are now not errors, but just like empty sets
2009-04-09 doc changes
2009-04-08 TODO changes, minor change to default redis.conf
...
...
redis-cli.c
浏览文件 @
40d224a9
...
...
@@ -80,6 +80,8 @@ static struct redisCommand cmdTable[] = {
{
"scard"
,
2
,
REDIS_CMD_INLINE
},
{
"sinter"
,
-
2
,
REDIS_CMD_INLINE
},
{
"sinterstore"
,
-
3
,
REDIS_CMD_INLINE
},
{
"sunion"
,
-
2
,
REDIS_CMD_INLINE
},
{
"sunionstore"
,
-
3
,
REDIS_CMD_INLINE
},
{
"smembers"
,
2
,
REDIS_CMD_INLINE
},
{
"incrby"
,
3
,
REDIS_CMD_INLINE
},
{
"decrby"
,
3
,
REDIS_CMD_INLINE
},
...
...
redis.c
浏览文件 @
40d224a9
...
...
@@ -126,11 +126,20 @@
#define REDIS_MASTER 4
/* This client is a master server */
#define REDIS_MONITOR 8
/* This client is a slave monitor, see MONITOR */
/* S
erver replication stat
e */
/* S
lave replication state - slave sid
e */
#define REDIS_REPL_NONE 0
/* No active replication */
#define REDIS_REPL_CONNECT 1
/* Must connect to master */
#define REDIS_REPL_CONNECTED 2
/* Connected to master */
/* Slave replication state - from the point of view of master
* Note that in SEND_BULK and ONLINE state the slave receives new updates
* in its output queue. In the WAIT_BGSAVE state instead the server is waiting
* to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3
/* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4
/* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5
/* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6
/* bulk DB already transmitted, receive updates */
/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1
...
...
@@ -176,13 +185,17 @@ typedef struct redisClient {
sds
querybuf
;
robj
*
argv
[
REDIS_MAX_ARGS
];
int
argc
;
int
bulklen
;
/* bulk read len. -1 if not in bulk read mode */
int
bulklen
;
/* bulk read len. -1 if not in bulk read mode */
list
*
reply
;
int
sentlen
;
time_t
lastinteraction
;
/* time of the last interaction, used for timeout */
int
flags
;
/* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
int
slaveseldb
;
/* slave selected db, if this client is a slave */
int
authenticated
;
/* when requirepass is non-NULL */
int
flags
;
/* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
int
slaveseldb
;
/* slave selected db, if this client is a slave */
int
authenticated
;
/* when requirepass is non-NULL */
int
replstate
;
/* replication state if this is a slave */
int
repldbfd
;
/* replication DB file descriptor */
int
repldboff
;
/* replication DB file offset */
off_t
repldbsize
;
/* replication DB file size */
}
redisClient
;
struct
saveparam
{
...
...
@@ -229,7 +242,7 @@ struct redisServer {
int
isslave
;
char
*
masterhost
;
int
masterport
;
redisClient
*
master
;
redisClient
*
master
;
/* client that is master for this slave */
int
replstate
;
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
...
...
@@ -331,6 +344,8 @@ static void sismemberCommand(redisClient *c);
static
void
scardCommand
(
redisClient
*
c
);
static
void
sinterCommand
(
redisClient
*
c
);
static
void
sinterstoreCommand
(
redisClient
*
c
);
static
void
sunionCommand
(
redisClient
*
c
);
static
void
sunionstoreCommand
(
redisClient
*
c
);
static
void
syncCommand
(
redisClient
*
c
);
static
void
flushdbCommand
(
redisClient
*
c
);
static
void
flushallCommand
(
redisClient
*
c
);
...
...
@@ -370,6 +385,8 @@ static struct redisCommand cmdTable[] = {
{
"scard"
,
scardCommand
,
2
,
REDIS_CMD_INLINE
},
{
"sinter"
,
sinterCommand
,
-
2
,
REDIS_CMD_INLINE
},
{
"sinterstore"
,
sinterstoreCommand
,
-
3
,
REDIS_CMD_INLINE
},
{
"sunion"
,
sunionCommand
,
-
2
,
REDIS_CMD_INLINE
},
{
"sunionstore"
,
sunionstoreCommand
,
-
3
,
REDIS_CMD_INLINE
},
{
"smembers"
,
sinterCommand
,
2
,
REDIS_CMD_INLINE
},
{
"incrby"
,
incrbyCommand
,
3
,
REDIS_CMD_INLINE
},
{
"decrby"
,
decrbyCommand
,
3
,
REDIS_CMD_INLINE
},
...
...
@@ -1232,8 +1249,20 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
}
outv
[
outc
++
]
=
shared
.
crlf
;
/* Increment all the refcounts at start and decrement at end in order to
* be sure to free objects if there is no slave in a replication state
* able to be feed with commands */
for
(
j
=
0
;
j
<
outc
;
j
++
)
incrRefCount
(
outv
[
j
]);
while
(
ln
)
{
redisClient
*
slave
=
ln
->
value
;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if
(
slave
->
replstate
==
REDIS_REPL_WAIT_BGSAVE_START
)
{
ln
=
ln
->
next
;
continue
;
}
/* Feed all the other slaves, MONITORs and so on */
if
(
slave
->
slaveseldb
!=
dictid
)
{
robj
*
selectcmd
;
...
...
@@ -1260,6 +1289,7 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
for
(
j
=
0
;
j
<
outc
;
j
++
)
addReply
(
slave
,
outv
[
j
]);
ln
=
ln
->
next
;
}
for
(
j
=
0
;
j
<
outc
;
j
++
)
decrRefCount
(
outv
[
j
]);
}
static
void
readQueryFromClient
(
aeEventLoop
*
el
,
int
fd
,
void
*
privdata
,
int
mask
)
{
...
...
@@ -1363,6 +1393,11 @@ static int selectDb(redisClient *c, int id) {
return
REDIS_OK
;
}
static
void
*
dupClientReplyValue
(
void
*
o
)
{
incrRefCount
((
robj
*
)
o
);
return
0
;
}
static
redisClient
*
createClient
(
int
fd
)
{
redisClient
*
c
=
zmalloc
(
sizeof
(
*
c
));
...
...
@@ -1378,8 +1413,10 @@ static redisClient *createClient(int fd) {
c
->
flags
=
0
;
c
->
lastinteraction
=
time
(
NULL
);
c
->
authenticated
=
0
;
c
->
replstate
=
REDIS_REPL_NONE
;
if
((
c
->
reply
=
listCreate
())
==
NULL
)
oom
(
"listCreate"
);
listSetFreeMethod
(
c
->
reply
,
decrRefCount
);
listSetDupMethod
(
c
->
reply
,
dupClientReplyValue
);
if
(
aeCreateFileEvent
(
server
.
el
,
c
->
fd
,
AE_READABLE
,
readQueryFromClient
,
c
,
NULL
)
==
AE_ERR
)
{
freeClient
(
c
);
...
...
@@ -2861,7 +2898,6 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
deleteKey
(
c
->
db
,
dstkey
);
dictAdd
(
c
->
db
->
dict
,
dstkey
,
dstset
);
incrRefCount
(
dstkey
);
server
.
dirty
++
;
}
/* Iterate all the elements of the first (smallest) set, and test
...
...
@@ -2886,15 +2922,16 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
}
else
{
dictAdd
(
dstset
->
ptr
,
ele
,
NULL
);
incrRefCount
(
ele
);
server
.
dirty
++
;
}
}
dictReleaseIterator
(
di
);
if
(
!
dstkey
)
if
(
!
dstkey
)
{
lenobj
->
ptr
=
sdscatprintf
(
sdsempty
(),
"*%d
\r\n
"
,
cardinality
);
else
}
else
{
addReply
(
c
,
shared
.
ok
);
server
.
dirty
++
;
}
zfree
(
dv
);
}
...
...
@@ -2906,6 +2943,100 @@ static void sinterstoreCommand(redisClient *c) {
sinterGenericCommand
(
c
,
c
->
argv
+
2
,
c
->
argc
-
2
,
c
->
argv
[
1
]);
}
static
void
sunionGenericCommand
(
redisClient
*
c
,
robj
**
setskeys
,
int
setsnum
,
robj
*
dstkey
)
{
dict
**
dv
=
zmalloc
(
sizeof
(
dict
*
)
*
setsnum
);
dictIterator
*
di
;
dictEntry
*
de
;
robj
*
lenobj
=
NULL
,
*
dstset
=
NULL
;
int
j
,
cardinality
=
0
;
if
(
!
dv
)
oom
(
"sunionCommand"
);
for
(
j
=
0
;
j
<
setsnum
;
j
++
)
{
robj
*
setobj
;
setobj
=
dstkey
?
lookupKeyWrite
(
c
->
db
,
setskeys
[
j
])
:
lookupKeyRead
(
c
->
db
,
setskeys
[
j
]);
if
(
!
setobj
)
{
dv
[
j
]
=
NULL
;
continue
;
}
if
(
setobj
->
type
!=
REDIS_SET
)
{
zfree
(
dv
);
addReply
(
c
,
shared
.
wrongtypeerr
);
return
;
}
dv
[
j
]
=
setobj
->
ptr
;
}
/* We need a temp set object to store our union. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE operation) then
* this set object will be the resulting object to set into the target key*/
dstset
=
createSetObject
();
/* The first thing we should output is the total number of elements...
* since this is a multi-bulk write, but at this stage we don't know
* the intersection set size, so we use a trick, append an empty object
* to the output list and save the pointer to later modify it with the
* right length */
if
(
!
dstkey
)
{
lenobj
=
createObject
(
REDIS_STRING
,
NULL
);
addReply
(
c
,
lenobj
);
decrRefCount
(
lenobj
);
}
else
{
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
deleteKey
(
c
->
db
,
dstkey
);
dictAdd
(
c
->
db
->
dict
,
dstkey
,
dstset
);
incrRefCount
(
dstkey
);
server
.
dirty
++
;
}
/* Iterate all the elements of all the sets, add every element a single
* time to the result set */
for
(
j
=
0
;
j
<
setsnum
;
j
++
)
{
if
(
!
dv
[
j
])
continue
;
/* non existing keys are like empty sets */
di
=
dictGetIterator
(
dv
[
j
]);
if
(
!
di
)
oom
(
"dictGetIterator"
);
while
((
de
=
dictNext
(
di
))
!=
NULL
)
{
robj
*
ele
;
/* dictAdd will not add the same element multiple times */
ele
=
dictGetEntryKey
(
de
);
if
(
dictAdd
(
dstset
->
ptr
,
ele
,
NULL
)
==
DICT_OK
)
{
incrRefCount
(
ele
);
if
(
!
dstkey
)
{
addReplySds
(
c
,
sdscatprintf
(
sdsempty
(),
"$%d
\r\n
"
,
sdslen
(
ele
->
ptr
)));
addReply
(
c
,
ele
);
addReply
(
c
,
shared
.
crlf
);
cardinality
++
;
}
}
}
dictReleaseIterator
(
di
);
}
if
(
!
dstkey
)
{
lenobj
->
ptr
=
sdscatprintf
(
sdsempty
(),
"*%d
\r\n
"
,
cardinality
);
decrRefCount
(
dstset
);
}
else
{
addReply
(
c
,
shared
.
ok
);
server
.
dirty
++
;
}
zfree
(
dv
);
}
static
void
sunionCommand
(
redisClient
*
c
)
{
sunionGenericCommand
(
c
,
c
->
argv
+
1
,
c
->
argc
-
1
,
NULL
);
}
static
void
sunionstoreCommand
(
redisClient
*
c
)
{
sunionGenericCommand
(
c
,
c
->
argv
+
2
,
c
->
argc
-
2
,
c
->
argv
[
1
]);
}
static
void
flushdbCommand
(
redisClient
*
c
)
{
dictEmpty
(
c
->
db
->
dict
);
dictEmpty
(
c
->
db
->
expires
);
...
...
@@ -3404,6 +3535,67 @@ static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
}
static
void
syncCommand
(
redisClient
*
c
)
{
/* ignore SYNC if aleady slave or in monitor mode */
if
(
c
->
flags
&
REDIS_SLAVE
)
return
;
/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands. We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
if
(
listLength
(
c
->
reply
)
!=
0
)
{
addReplySds
(
c
,
sdsnew
(
"-ERR SYNC is invalid with pending input
\r\n
"
));
return
;
}
redisLog
(
REDIS_NOTICE
,
"Slave ask for synchronization"
);
/* Here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
if
(
server
.
bgsaveinprogress
)
{
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save */
redisClient
*
slave
;
listNode
*
ln
;
ln
=
server
.
slaves
->
head
;
while
(
ln
)
{
slave
=
ln
->
value
;
if
(
slave
->
replstate
==
REDIS_REPL_WAIT_BGSAVE_END
)
break
;
ln
=
ln
->
next
;
}
if
(
ln
)
{
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
listRelease
(
c
->
reply
);
c
->
reply
=
listDup
(
slave
->
reply
);
if
(
!
c
->
reply
)
oom
(
"listDup copying slave reply list"
);
c
->
replstate
=
REDIS_REPL_WAIT_BGSAVE_END
;
redisLog
(
REDIS_NOTICE
,
"Waiting for end of BGSAVE for SYNC"
);
}
else
{
/* No way, we need to wait for the next BGSAVE in order to
* register differences */
c
->
replstate
=
REDIS_REPL_WAIT_BGSAVE_START
;
redisLog
(
REDIS_NOTICE
,
"Waiting for next BGSAVE for SYNC"
);
}
}
else
{
/* Ok we don't have a BGSAVE in progress, let's start one */
redisLog
(
REDIS_NOTICE
,
"Starting BGSAVE for SYNC"
);
if
(
rdbSaveBackground
(
server
.
dbfilename
)
!=
REDIS_OK
)
{
redisLog
(
REDIS_NOTICE
,
"Replication failed, can't BGSAVE"
);
addReplySds
(
c
,
sdsnew
(
"-ERR Unalbe to perform background save
\r\n
"
));
return
;
}
c
->
replstate
=
REDIS_REPL_WAIT_BGSAVE_END
;
}
c
->
flags
|=
REDIS_SLAVE
;
c
->
slaveseldb
=
0
;
if
(
!
listAddNodeTail
(
server
.
slaves
,
c
))
oom
(
"listAddNodeTail"
);
redisLog
(
REDIS_NOTICE
,
"Synchronization with slave succeeded"
);
return
;
}
#if 0
static void _syncCommand(redisClient *c) {
struct stat sb;
int fd = -1, len;
time_t start = time(NULL);
...
...
@@ -3412,7 +3604,7 @@ static void syncCommand(redisClient *c) {
/* ignore SYNC if aleady slave or in monitor mode */
if (c->flags & REDIS_SLAVE) return;
redisLog
(
REDIS_NOTICE
,
"Slave ask for syncronization"
);
redisLog(REDIS_NOTICE,"Slave ask for sync
h
ronization");
if (flushClientOutput(c) == REDIS_ERR ||
rdbSave(server.dbfilename) != REDIS_OK)
goto closeconn;
...
...
@@ -3438,15 +3630,16 @@ static void syncCommand(redisClient *c) {
c->flags |= REDIS_SLAVE;
c->slaveseldb = 0;
if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
redisLog
(
REDIS_NOTICE
,
"Syncronization with slave succeeded"
);
redisLog(REDIS_NOTICE,"Sync
h
ronization with slave succeeded");
return;
closeconn:
if (fd != -1) close(fd);
c->flags |= REDIS_CLOSE;
redisLog
(
REDIS_WARNING
,
"Syncronization with slave failed"
);
redisLog(REDIS_WARNING,"Sync
h
ronization with slave failed");
return;
}
#endif
static
int
syncWithMaster
(
void
)
{
char
buf
[
1024
],
tmpfile
[
256
];
...
...
test-redis.tcl
浏览文件 @
40d224a9
...
...
@@ -424,12 +424,21 @@ proc main {server port} {
}
lsort
[
$r
sinter set1 set2
]
}
{
995 996 997 998 999
}
test
{
SUNION with two sets
}
{
lsort
[
$r
sunion set1 set2
]
}
[
lsort -uniq
"
[
$r
smembers set1
]
[
$r
smembers set2
]
"
]
test
{
SINTERSTORE with two sets
}
{
$r sinterstore setres set1 set2
lsort
[
$r
smembers setres
]
}
{
995 996 997 998 999
}
test
{
SUNIONSTORE with two sets
}
{
$r sunionstore setres set1 set2
lsort
[
$r
smembers setres
]
}
[
lsort -uniq
"
[
$r
smembers set1
]
[
$r
smembers set2
]
"
]
test
{
SINTER against three sets
}
{
$r sadd set3 999
$r sadd set3 995
...
...
@@ -442,7 +451,11 @@ proc main {server port} {
$r sinterstore setres set1 set2 set3
lsort
[
$r
smembers setres
]
}
{
995 999
}
test
{
SUNION with non existing keys
}
{
lsort
[
$r
sunion nokey1 set1 set2 nokey2
]
}
[
lsort -uniq
"
[
$r
smembers set1
]
[
$r
smembers set2
]
"
]
test
{
SAVE - make sure there are all the types as values
}
{
$r lpush mysavelist hello
$r lpush mysavelist world
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录