提交 17d68f9c 编写于 作者: A antirez

Merge branch 'aggregates' of git://github.com/pietern/redis

...@@ -5389,8 +5389,26 @@ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { ...@@ -5389,8 +5389,26 @@ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
return size1 - size2; return size1 - size2;
} }
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
if (aggregate == REDIS_AGGR_SUM) {
*target = *target + val;
} else if (aggregate == REDIS_AGGR_MIN) {
*target = val < *target ? val : *target;
} else if (aggregate == REDIS_AGGR_MAX) {
*target = val > *target ? val : *target;
} else {
/* safety net */
redisAssert(0 != 0);
}
}
static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
int i, j, zsetnum; int i, j, zsetnum;
int aggregate = REDIS_AGGR_SUM;
zsetopsrc *src; zsetopsrc *src;
robj *dstobj; robj *dstobj;
zset *dstzset; zset *dstzset;
...@@ -5431,19 +5449,28 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { ...@@ -5431,19 +5449,28 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
/* parse optional extra arguments */ /* parse optional extra arguments */
if (j < c->argc) { if (j < c->argc) {
int remaining = c->argc-j; int remaining = c->argc - j;
while (remaining) { while (remaining) {
if (!strcasecmp(c->argv[j]->ptr,"weights")) { if (remaining >= (zsetnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
j++; remaining--; j++; remaining--;
if (remaining < zsetnum) {
zfree(src);
addReplySds(c,sdsnew("-ERR not enough weights for ZUNION/ZINTER\r\n"));
return;
}
for (i = 0; i < zsetnum; i++, j++, remaining--) { for (i = 0; i < zsetnum; i++, j++, remaining--) {
src[i].weight = strtod(c->argv[j]->ptr, NULL); src[i].weight = strtod(c->argv[j]->ptr, NULL);
} }
} else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
j++; remaining--;
if (!strcasecmp(c->argv[j]->ptr,"sum")) {
aggregate = REDIS_AGGR_SUM;
} else if (!strcasecmp(c->argv[j]->ptr,"min")) {
aggregate = REDIS_AGGR_MIN;
} else if (!strcasecmp(c->argv[j]->ptr,"max")) {
aggregate = REDIS_AGGR_MAX;
} else {
zfree(src);
addReply(c,shared.syntaxerr);
return;
}
j++; remaining--;
} else { } else {
zfree(src); zfree(src);
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
...@@ -5452,27 +5479,28 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { ...@@ -5452,27 +5479,28 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
} }
} }
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality);
dstobj = createZsetObject(); dstobj = createZsetObject();
dstzset = dstobj->ptr; dstzset = dstobj->ptr;
if (op == REDIS_OP_INTER) { if (op == REDIS_OP_INTER) {
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality);
/* skip going over all entries if the smallest zset is NULL or empty */ /* skip going over all entries if the smallest zset is NULL or empty */
if (src[0].dict && dictSize(src[0].dict) > 0) { if (src[0].dict && dictSize(src[0].dict) > 0) {
/* precondition: as src[0].dict is non-empty and the zsets are ordered /* precondition: as src[0].dict is non-empty and the zsets are ordered
* from small to large, all src[i > 0].dict are non-empty too */ * from small to large, all src[i > 0].dict are non-empty too */
di = dictGetIterator(src[0].dict); di = dictGetIterator(src[0].dict);
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
double *score = zmalloc(sizeof(double)); double *score = zmalloc(sizeof(double)), value;
*score = 0.0; *score = src[0].weight * (*(double*)dictGetEntryVal(de));
for (j = 0; j < zsetnum; j++) { for (j = 1; j < zsetnum; j++) {
dictEntry *other = (j == 0) ? de : dictFind(src[j].dict,dictGetEntryKey(de)); dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
if (other) { if (other) {
*score = *score + src[j].weight * (*(double*)dictGetEntryVal(other)); value = src[j].weight * (*(double*)dictGetEntryVal(other));
zunionInterAggregate(score, value, aggregate);
} else { } else {
break; break;
} }
...@@ -5500,14 +5528,16 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { ...@@ -5500,14 +5528,16 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
/* skip key when already processed */ /* skip key when already processed */
if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue;
double *score = zmalloc(sizeof(double)); double *score = zmalloc(sizeof(double)), value;
*score = 0.0; *score = src[i].weight * (*(double*)dictGetEntryVal(de));
for (j = 0; j < zsetnum; j++) {
if (!src[j].dict) continue;
dictEntry *other = (i == j) ? de : dictFind(src[j].dict,dictGetEntryKey(de)); /* because the zsets are sorted by size, its only possible
* for sets at larger indices to hold this entry */
for (j = (i+1); j < zsetnum; j++) {
dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
if (other) { if (other) {
*score = *score + src[j].weight * (*(double*)dictGetEntryVal(other)); value = src[j].weight * (*(double*)dictGetEntryVal(other));
zunionInterAggregate(score, value, aggregate);
} }
} }
......
...@@ -1491,6 +1491,14 @@ proc main {server port} { ...@@ -1491,6 +1491,14 @@ proc main {server port} {
list [$r zunion zsetc 2 zseta zsetb weights 2 3] [$r zrange zsetc 0 -1 withscores] list [$r zunion zsetc 2 zseta zsetb weights 2 3] [$r zrange zsetc 0 -1 withscores]
} {4 {a 2 b 7 d 9 c 12}} } {4 {a 2 b 7 d 9 c 12}}
test {ZUNION with AGGREGATE MIN} {
list [$r zunion zsetc 2 zseta zsetb aggregate min] [$r zrange zsetc 0 -1 withscores]
} {4 {a 1 b 1 c 2 d 3}}
test {ZUNION with AGGREGATE MAX} {
list [$r zunion zsetc 2 zseta zsetb aggregate max] [$r zrange zsetc 0 -1 withscores]
} {4 {a 1 b 2 c 3 d 3}}
test {ZINTER basics} { test {ZINTER basics} {
list [$r zinter zsetc 2 zseta zsetb] [$r zrange zsetc 0 -1 withscores] list [$r zinter zsetc 2 zseta zsetb] [$r zrange zsetc 0 -1 withscores]
} {2 {b 3 c 5}} } {2 {b 3 c 5}}
...@@ -1499,6 +1507,14 @@ proc main {server port} { ...@@ -1499,6 +1507,14 @@ proc main {server port} {
list [$r zinter zsetc 2 zseta zsetb weights 2 3] [$r zrange zsetc 0 -1 withscores] list [$r zinter zsetc 2 zseta zsetb weights 2 3] [$r zrange zsetc 0 -1 withscores]
} {2 {b 7 c 12}} } {2 {b 7 c 12}}
test {ZINTER with AGGREGATE MIN} {
list [$r zinter zsetc 2 zseta zsetb aggregate min] [$r zrange zsetc 0 -1 withscores]
} {2 {b 1 c 2}}
test {ZINTER with AGGREGATE MAX} {
list [$r zinter zsetc 2 zseta zsetb aggregate max] [$r zrange zsetc 0 -1 withscores]
} {2 {b 2 c 3}}
test {SORT against sorted sets} { test {SORT against sorted sets} {
$r del zset $r del zset
$r zadd zset 1 a $r zadd zset 1 a
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册