提交 8bbc0768 编写于 作者: A antirez

BITOP command 10x speed improvement.

This commit adds a fast-path to the BITOP that can be used for all the
bytes from 0 to the minimal length of the string, and if there are
at max 16 input keys.

Often the intersected bitmaps are roughly the same size, so this
optimization can provide a 10x speed boost to most real world usages
of the command.

Bytes are processed four full words at a time, in loops specialized
for the specific BITOP sub-command, without the need to check for
length issues with the inputs (since we run this algorithm only as far
as there is data from all the keys at the same time).

The remaining part of the string is intersected in the usual way using
the slow but generic algorith.

It is possible to do better than this with inputs that are not roughly
the same size, sorting the input keys by length, by initializing the
result string in a smarter way, and noticing that the final part of the
output string composed of only data from the longest string does not
need any proecessing since AND, OR and XOR against an empty string does
not alter the output (zero in the first case, and the original string in
the other two cases).

More implementations will be implemented later likely, but this should
be enough to release Redis 2.6-RC4 with bitops merged in.

Note: this commit also adds better testing for BITOP NOT command, that
is currently the faster and hard to optimize further since it just
flips the bits of a single input string.
上级 f0d962c0
......@@ -162,6 +162,7 @@ void bitopCommand(redisClient *c) {
robj **objects; /* Array of soruce objects. */
unsigned char **src; /* Array of source strings pointers. */
long *len, maxlen = 0; /* Array of length of src strings, and max len. */
long minlen = 0; /* Min len among the input keys. */
unsigned char *res = NULL; /* Resulting string. */
/* Parse the operation name. */
......@@ -213,6 +214,7 @@ void bitopCommand(redisClient *c) {
src[j] = objects[j]->ptr;
len[j] = sdslen(objects[j]->ptr);
if (len[j] > maxlen) maxlen = len[j];
if (j == 0 || len[j] < minlen) minlen = len[j];
}
/* Compute the bit operation, if at least one string is not empty. */
......@@ -221,7 +223,73 @@ void bitopCommand(redisClient *c) {
unsigned char output, byte;
long i;
for (j = 0; j < maxlen; j++) {
/* Fast path: as far as we have data for all the input bitmaps we
* can take a fast path that performs much better than the
* vanilla algorithm. */
j = 0;
if (minlen && numkeys <= 16) {
unsigned long *lp[16];
unsigned long *lres = (unsigned long*) res;
/* Note: sds pointer is always aligned to 8 byte boundary. */
memcpy(lp,src,sizeof(unsigned long*)*numkeys);
memcpy(res,src[0],minlen);
/* Different branches per different operations for speed (sorry). */
if (op == BITOP_AND) {
while(minlen >= sizeof(unsigned long)*4) {
for (i = 1; i < numkeys; i++) {
lres[0] &= lp[i][0];
lres[1] &= lp[i][1];
lres[2] &= lp[i][2];
lres[3] &= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
} else if (op == BITOP_OR) {
while(minlen >= sizeof(unsigned long)*4) {
for (i = 1; i < numkeys; i++) {
lres[0] |= lp[i][0];
lres[1] |= lp[i][1];
lres[2] |= lp[i][2];
lres[3] |= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
} else if (op == BITOP_XOR) {
while(minlen >= sizeof(unsigned long)*4) {
for (i = 1; i < numkeys; i++) {
lres[0] ^= lp[i][0];
lres[1] ^= lp[i][1];
lres[2] ^= lp[i][2];
lres[3] ^= lp[i][3];
lp[i]+=4;
}
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
} else if (op == BITOP_NOT) {
while(minlen >= sizeof(unsigned long)*4) {
lres[0] = ~lres[0];
lres[1] = ~lres[1];
lres[2] = ~lres[2];
lres[3] = ~lres[3];
lres+=4;
j += sizeof(unsigned long)*4;
minlen -= sizeof(unsigned long)*4;
}
}
}
/* j is set to the next byte to process by the previous loop. */
for (; j < maxlen; j++) {
output = (len[0] <= j) ? 0 : src[0][j];
if (op == BITOP_NOT) output = ~output;
for (i = 1; i < numkeys; i++) {
......
......@@ -24,13 +24,13 @@ proc simulate_bit_op {op args} {
set out {}
for {set x 0} {$x < $maxlen} {incr x} {
set bit [string range $b(0) $x $x]
if {$op eq {not}} {set bit [expr {!$bit}]}
for {set j 1} {$j < $count} {incr j} {
set bit2 [string range $b($j) $x $x]
switch $op {
and {set bit [expr {$bit & $bit2}]}
or {set bit [expr {$bit | $bit2}]}
xor {set bit [expr {$bit ^ $bit2}]}
not {set bit [expr {!$bit}]}
}
}
append out $bit
......@@ -135,6 +135,16 @@ start_server {tags {"bitops"}} {
}
}
test {BITOP NOT fuzzing} {
for {set i 0} {$i < 10} {incr i} {
r flushall
set str [randstring 0 1000]
r set str $str
r bitop not target str
assert_equal [r get target] [simulate_bit_op not $str]
}
}
test {BITOP with integer encoded source objects} {
r set a 1
r set b 2
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册