提交 8dd32632 编写于 作者: A antirez

Cluster: use a number of gossip sections proportional to cluster size.

Otherwise it is impossible to receive the majority of failure reports in
the node_timeout*2 window in larger clusters.

Still, with a 200-node cluster, 20 gossip sections are a very reasonable
amount of bytes to send.

A side effect of this change is also faster cluster node joins for large
clusters, because the cluster layout takes less time to propagate.
上级 5031c239
...@@ -2037,7 +2037,8 @@ void clusterBroadcastMessage(void *buf, size_t len) { ...@@ -2037,7 +2037,8 @@ void clusterBroadcastMessage(void *buf, size_t len) {
dictReleaseIterator(di); dictReleaseIterator(di);
} }
/* Build the message header */ /* Build the message header. hdr must point to a buffer at least
* sizeof(clusterMsg) in bytes. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) { void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
int totlen = 0; int totlen = 0;
uint64_t offset; uint64_t offset;
...@@ -2098,40 +2099,60 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { ...@@ -2098,40 +2099,60 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
/* Send a PING or PONG packet to the specified node, making sure to add enough /* Send a PING or PONG packet to the specified node, making sure to add enough
* gossip informations. */ * gossip informations. */
void clusterSendPing(clusterLink *link, int type) { void clusterSendPing(clusterLink *link, int type) {
unsigned char buf[sizeof(clusterMsg)+sizeof(clusterMsgDataGossip)*3]; unsigned char *buf;
clusterMsg *hdr = (clusterMsg*) buf; clusterMsg *hdr;
int gossipcount = 0, totlen; int gossipcount = 0; /* Number of gossip sections added so far. */
/* freshnodes is the number of nodes we can still use to populate the int wanted; /* Number of gossip sections we want to append if possible. */
* gossip section of the ping packet. Basically we start with the nodes int totlen; /* Total packet length. */
* we have in memory minus two (ourself and the node we are sending the /* freshnodes is the max number of nodes we can hope to append at all:
* message to). Every time we add a node we decrement the counter, so when * nodes available minus two (ourself and the node we are sending the
* it will drop to <= zero we know there is no more gossip info we can * message to). However practically there may be less valid nodes since
* send. */ * nodes in handshake state, disconnected, are not considered. */
int freshnodes = dictSize(server.cluster->nodes)-2; int freshnodes = dictSize(server.cluster->nodes)-2;
/* How many gossip sections we want to add? 1/10 of the available nodes
* and anyway at least 3. */
wanted = freshnodes/10;
if (wanted < 3) wanted = 3;
/* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
* later according to the number of gossip sections we really were able
* to put inside the packet. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*wanted);
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
buf = zcalloc(totlen);
hdr = (clusterMsg*) buf;
/* Populate the header. */
if (link->node && type == CLUSTERMSG_TYPE_PING) if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime(); link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type); clusterBuildMessageHdr(hdr,type);
/* Populate the gossip fields */ /* Populate the gossip fields */
while(freshnodes > 0 && gossipcount < 3) { int maxiterations = wanted+10;
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
dictEntry *de = dictGetRandomKey(server.cluster->nodes); dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de); clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip; clusterMsgDataGossip *gossip;
int j; int j;
/* Don't include this node: the whole packet header is about us
* already, so we just gossip about other nodes. */
if (this == myself) continue;
/* In the gossip section don't include: /* In the gossip section don't include:
* 1) Myself. * 1) Nodes in HANDSHAKE state.
* 2) Nodes in HANDSHAKE state.
* 3) Nodes with the NOADDR flag set. * 3) Nodes with the NOADDR flag set.
* 4) Disconnected nodes if they don't have configured slots. * 4) Disconnected nodes if they don't have configured slots.
*/ */
if (this == myself || if (this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0)) (this->link == NULL && this->numslots == 0))
{ {
freshnodes--; /* otherwise we may loop forever. */ freshnodes--; /* Technically not correct, but saves CPU. */
continue; continue;
} }
/* Check if we already added this node */ /* Check if we already added this node */
...@@ -2152,11 +2173,15 @@ void clusterSendPing(clusterLink *link, int type) { ...@@ -2152,11 +2173,15 @@ void clusterSendPing(clusterLink *link, int type) {
gossip->flags = htons(this->flags); gossip->flags = htons(this->flags);
gossipcount++; gossipcount++;
} }
/* Ready to send... fix the totlen field and queue the message in the
* output buffer. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount); totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount); hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen); hdr->totlen = htonl(totlen);
clusterSendMessage(link,buf,totlen); clusterSendMessage(link,buf,totlen);
zfree(buf);
} }
/* Send a PONG packet to every connected node that's not in handshake state /* Send a PONG packet to every connected node that's not in handshake state
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册