sentinel.c 176.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
/* Redis Sentinel implementation
 *
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

31
#include "server.h"
32
#include "hiredis.h"
33 34 35 36
#ifdef USE_OPENSSL
#include "openssl/ssl.h"
#include "hiredis_ssl.h"
#endif
37 38 39 40 41
#include "async.h"

#include <ctype.h>
#include <arpa/inet.h>
#include <sys/socket.h>
42
#include <sys/wait.h>
43
#include <fcntl.h>
44

45 46
extern char **environ;

47 48 49 50
#ifdef USE_OPENSSL
extern SSL_CTX *redis_tls_ctx;
#endif

51 52 53 54 55 56 57 58 59 60 61 62 63 64
#define REDIS_SENTINEL_PORT 26379

/* ======================== Sentinel global state =========================== */

/* Address object, used to describe an ip:port pair.
 *
 * Instances are allocated by createSentinelAddr() / dupSentinelAddr() and
 * must be freed with releaseSentinelAddr(). */
typedef struct sentinelAddr {
    char *ip;   /* IP address string: created with sdsnew() and freed with
                 * sdsfree() by the helpers above. */
    int port;   /* Port; createSentinelAddr() only accepts 0-65535. */
} sentinelAddr;

/* Bit flags stored in sentinelRedisInstance.flags. The first three identify
 * the kind of instance being monitored, the remaining ones track failure
 * detection and failover progress. */
#define SRI_MASTER  (1<<0)
#define SRI_SLAVE   (1<<1)
#define SRI_SENTINEL (1<<2)
#define SRI_S_DOWN (1<<3)   /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<4)   /* Objectively down (confirmed by others). */
#define SRI_MASTER_DOWN (1<<5) /* A Sentinel with this flag set thinks that
                                   its master is down. */
#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* Failover is in progress for
                                           this master. */
#define SRI_PROMOTED (1<<7)            /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<8)     /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<9)   /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<10)     /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<11)  /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
77

78
/* Periods, timeouts and default values used by Sentinel.
 * Note: times are in milliseconds. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 2000
#define SENTINEL_DEFAULT_DOWN_AFTER 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_SLAVE_RECONF_TIMEOUT 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_ELECTION_TIMEOUT 10000
#define SENTINEL_MAX_DESYNC 1000
/* Initial value of sentinel.deny_scripts_reconfig. */
#define SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG 1
96 97 98

/* States of the failover state machine, stored in
 * sentinelRedisInstance.failover_state. */
#define SENTINEL_FAILOVER_STATE_NONE 0  /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1  /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */
105 106 107 108

/* Master link status values. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1

/* Generic flags that can be used with different functions.
 * They use higher bits to avoid colliding with the function specific
 * flags. */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT (1<<16)
#define SENTINEL_LEADER (1<<17)
#define SENTINEL_OBSERVER (1<<18)
116

117 118 119 120 121 122 123 124 125
/* Script execution flags and limits. */
/* Flags value assigned to a newly scheduled job (no flag set). */
#define SENTINEL_SCRIPT_NONE 0
/* Flag tested on queued jobs to tell the running ones apart. */
#define SENTINEL_SCRIPT_RUNNING 1
/* When the scripts queue grows past this length the oldest non-running
 * job is removed. */
#define SENTINEL_SCRIPT_MAX_QUEUE 256
#define SENTINEL_SCRIPT_MAX_RUNNING 16
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */

126 127 128 129 130
/* SENTINEL SIMULATE-FAILURE command flags.
 *
 * Bit flags kept in sentinel.simfailure_flags, which initSentinel() resets
 * to SENTINEL_SIMFAILURE_NONE. */
#define SENTINEL_SIMFAILURE_NONE 0
#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0)
#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1)

A
antirez 已提交
131 132 133
/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
 * monitoring many masters, we have different instances representing the
 * same Sentinels, one per master, and we need to share the hiredis connections
134
 * among them. Oherwise if 5 Sentinels are monitoring 100 masters we create
A
antirez 已提交
135 136 137 138 139 140 141 142 143 144 145 146 147 148
 * 500 outgoing connections instead of 5.
 *
 * So this structure represents a reference counted link in terms of the two
 * hiredis connections for commands and Pub/Sub, and the fields needed for
 * failure detection, since the ping/pong time are now local to the link: if
 * the link is available, the instance is avaialbe. This way we don't just
 * have 5 connections instead of 500, we also send 5 pings instead of 500.
 *
 * Links are shared only for Sentinels: master and slave instances have
 * a link with refcount = 1, always. */
typedef struct instanceLink {
    int refcount;          /* Number of sentinelRedisInstance owners. */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    int pending_commands;  /* Number of commands sent waiting for a reply. */
149 150 151 152 153 154 155
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
156 157 158 159 160 161 162 163 164
    mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
    mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
                                 only used to avoid sending too many pings
                                 during failure. Idle time is computed using
                                 the act_ping_time field. */
165 166 167
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
168 169
    mstime_t last_reconn_time;  /* Last reconnection attempt performed when
                                   the link was down. */
A
antirez 已提交
170 171 172 173 174 175 176 177 178
} instanceLink;

typedef struct sentinelRedisInstance {
    int flags;      /* See SRI_... defines */
    char *name;     /* Master name from the point of view of this sentinel. */
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    uint64_t config_epoch;  /* Configuration epoch. */
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
179 180
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
181
                                 we received a hello from this Sentinel
182 183 184 185 186 187 188
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */
189 190 191 192
    dict *renamed_commands;     /* Commands renamed in this instance:
                                   Sentinel will use the alternative commands
                                   mapped on this table to send things like
                                   SLAVEOF, CONFING, INFO, ... */
193

194 195 196 197 198 199 200
    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
201
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
202

203 204 205
    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
206
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
207
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
208 209
    char *auth_pass;    /* Password to use for AUTH against master & replica. */
    char *auth_user;    /* Username for ACLs AUTH against master & replica. */
210 211 212 213 214

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
215
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
216 217 218
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
219
    unsigned long long slave_repl_offset; /* Slave replication offset. */
220 221 222 223
    /* Failover */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
224 225
                           that this Sentinel voted as leader. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
226
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
227 228
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
229
    mstime_t failover_start_time;   /* Last failover attempt start time. */
230
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
231 232
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
233 234 235
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
236
    char *notification_script;
237
    char *client_reconfig_script;
238
    sds info; /* cached INFO output */
239 240 241 242
} sentinelRedisInstance;

/* Main state. */
struct sentinelState {
A
antirez 已提交
243
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
244
    uint64_t current_epoch;         /* Current epoch. */
245 246 247 248
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    int tilt;           /* Are we in TILT mode? */
249
    int running_scripts;    /* Number of scripts in execution right now. */
250 251 252 253 254 255 256
    mstime_t tilt_start_time;       /* When TITL started. */
    mstime_t previous_time;         /* Last time we ran the time handler. */
    list *scripts_queue;            /* Queue of user scripts to execute. */
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
257
    unsigned long simfailure_flags; /* Failures simulation. */
258 259
    int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
                                  paths at runtime? */
260 261
} sentinel;

262 263 264 265 266 267 268 269 270 271 272 273 274
/* A script execution job.
 *
 * Jobs are created by sentinelScheduleScriptExecution(), appended to
 * sentinel.scripts_queue, and destroyed by sentinelReleaseScriptJob(). */
typedef struct sentinelScriptJob {
    int flags;              /* Script job flags: SENTINEL_SCRIPT_* */
    int retry_num;          /* Number of times we tried to execute it. */
    char **argv;            /* Arguments to call the script: a NULL
                               terminated array of sds strings, with the
                               script path as argv[0]. */
    mstime_t start_time;    /* Script execution time if the script is running,
                               otherwise 0 if we are allowed to retry the
                               execution at any time. If the script is not
                               running and it's not 0, it means: do not run
                               before the specified time. */
    pid_t pid;              /* Script execution pid. */
} sentinelScriptJob;

275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
/* ======================= hiredis ae.c adapters =============================
 * Note: this implementation is taken from hiredis/adapters/ae.h, however
 * we have our modified copy for Sentinel in order to use our allocator
 * and to have full control over how the adapter works. */

/* Glue state between one hiredis async context and the ae event loop.
 * Allocated in redisAeAttach() and freed in redisAeCleanup(). */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* Context handed to redisAsyncHandleRead()
                                 * and redisAsyncHandleWrite(). */
    aeEventLoop *loop;          /* Loop where file events are registered. */
    int fd;                     /* Socket descriptor copied from the context
                                 * at attach time. */
    int reading, writing;       /* Non-zero while the readable / writable
                                 * file event is registered. */
} redisAeEvents;

/* ae handler for readable events: hands control to hiredis through
 * redisAsyncHandleRead(). 'privdata' is the redisAeEvents registered with the
 * event; the loop, descriptor and mask arguments are not used. */
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleRead(events->context);
}

/* ae handler for writable events: hands control to hiredis through
 * redisAsyncHandleWrite(). 'privdata' is the redisAeEvents registered with
 * the event; the loop, descriptor and mask arguments are not used. */
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisAeEvents *events = privdata;

    (void)el;
    (void)fd;
    (void)mask;
    redisAsyncHandleWrite(events->context);
}

/* hiredis 'addRead' hook: register the readable file event for the socket,
 * unless it is already registered. */
static void redisAeAddRead(void *privdata) {
    redisAeEvents *events = privdata;

    if (events->reading) return; /* Already listening for reads. */
    events->reading = 1;
    aeCreateFileEvent(events->loop,events->fd,AE_READABLE,
                      redisAeReadEvent,events);
}

/* hiredis 'delRead' hook: remove the readable file event for the socket,
 * if one is currently registered. */
static void redisAeDelRead(void *privdata) {
    redisAeEvents *events = privdata;

    if (!events->reading) return; /* Nothing to remove. */
    events->reading = 0;
    aeDeleteFileEvent(events->loop,events->fd,AE_READABLE);
}

/* hiredis 'addWrite' hook: register the writable file event for the socket,
 * unless it is already registered. */
static void redisAeAddWrite(void *privdata) {
    redisAeEvents *events = privdata;

    if (events->writing) return; /* Already listening for writes. */
    events->writing = 1;
    aeCreateFileEvent(events->loop,events->fd,AE_WRITABLE,
                      redisAeWriteEvent,events);
}

/* hiredis 'delWrite' hook: remove the writable file event for the socket,
 * if one is currently registered. */
static void redisAeDelWrite(void *privdata) {
    redisAeEvents *events = privdata;

    if (!events->writing) return; /* Nothing to remove. */
    events->writing = 0;
    aeDeleteFileEvent(events->loop,events->fd,AE_WRITABLE);
}

/* hiredis 'cleanup' hook: unregister both file events and free the
 * redisAeEvents container passed as 'privdata'. */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}

/* Attach the hiredis async context 'ac' to the ae event loop 'loop' by
 * installing the adapter hooks defined above.
 *
 * Returns C_OK on success, or C_ERR if the context already has event data
 * attached. */
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisAeEvents *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return C_ERR;

    /* Create container for context and r/w events */
    e = (redisAeEvents*)zmalloc(sizeof(*e));
    e->context = ac;
    e->loop = loop;
    e->fd = c->fd;
    e->reading = e->writing = 0;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;
    ac->ev.data = e;

    return C_OK;
}

/* ============================= Prototypes ================================= */

void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
A
antirez 已提交
379
void instanceLinkConnectionError(const redisAsyncContext *c);
380
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
381
void sentinelAbortFailover(sentinelRedisInstance *ri);
382
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
383
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
384
void sentinelScheduleScriptExecution(char *path, ...);
385
void sentinelStartFailover(sentinelRedisInstance *master);
A
antirez 已提交
386
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
387
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
388
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
389
void sentinelFlushConfig(void);
390
void sentinelGenerateInitialMonitorEvents(void);
391
int sentinelSendPing(sentinelRedisInstance *ri);
392
int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master);
393
sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid);
394
void sentinelSimFailureCrash(void);
395 396 397

/* ========================= Dictionary types =============================== */

398
uint64_t dictSdsHash(const void *key);
399
uint64_t dictSdsCaseHash(const void *key);
400
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
401
int dictSdsKeyCaseCompare(void *privdata, const void *key1, const void *key2);
402 403 404
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);

/* Value destructor for dictionaries of instances: releases the
 * sentinelRedisInstance stored as the entry value. 'privdata' is unused. */
void dictInstancesValDestructor (void *privdata, void *obj) {
    UNUSED(privdata);
    releaseSentinelRedisInstance(obj);
}

/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * also used for: sentinelRedisInstance->sentinels dictionary that maps
 * sentinels ip:port to last seen time in Pub/Sub hello message.
 *
 * This is the type of the sentinel.masters dictionary. No key dup / key
 * destructor callbacks are installed; values are released through
 * dictInstancesValDestructor(). */
dictType instancesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    dictInstancesValDestructor /* val destructor */
};

/* Instance runid (sds) -> votes (long casted to void*)
 *
 * This is useful into sentinelGetObjectiveLeader() function in order to
 * count the votes and understand who is the leader.
 *
 * No dup or destructor callbacks are installed: the values are plain
 * integers stored in the pointer slot, so there is nothing to free. */
dictType leaderVotesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};

435 436
/* Instance renamed commands table. */
dictType renamedCommandsDictType = {
437
    dictSdsCaseHash,           /* hash function */
438 439 440 441 442 443 444
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCaseCompare,     /* key compare */
    dictSdsDestructor,         /* key destructor */
    dictSdsDestructor          /* val destructor */
};

445 446
/* =========================== Initialization =============================== */

447 448 449 450 451
void sentinelCommand(client *c);
void sentinelInfoCommand(client *c);
void sentinelSetCommand(client *c);
void sentinelPublishCommand(client *c);
void sentinelRoleCommand(client *c);
452 453 454 455 456 457 458

struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
A
antirez 已提交
459
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
460
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
461
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
462 463
    {"role",sentinelRoleCommand,1,"ok-loading",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"read-only no-script",0,NULL,0,0,0,0,0},
A
antirez 已提交
464
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
465 466
    {"auth",authCommand,2,"no-auth no-script ok-loading ok-stale fast",0,NULL,0,0,0,0,0},
    {"hello",helloCommand,-2,"no-auth no-script fast",0,NULL,0,0,0,0,0}
467 468 469 470 471 472
};

/* This function overwrites a few normal Redis config defaults with Sentinel
 * specific defaults: the listening port and protected mode. */
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
    server.protected_mode = 0; /* Sentinel must be exposed. */
}

/* Perform the Sentinel mode initialization. */
void initSentinel(void) {
478
    unsigned int j;
479 480 481

    /* Remove usual Redis commands from the command table, then just add
     * the SENTINEL command. */
482
    dictEmpty(server.commands,NULL);
483 484 485 486 487
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
A
antirez 已提交
488
        serverAssert(retval == DICT_OK);
489 490 491 492 493

        /* Translate the command string flags description into an actual
         * set of flags. */
        if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR)
            serverPanic("Unsupported command flag");
494 495 496
    }

    /* Initialize various data structures. */
A
antirez 已提交
497
    sentinel.current_epoch = 0;
498 499
    sentinel.masters = dictCreate(&instancesDictType,NULL);
    sentinel.tilt = 0;
500
    sentinel.tilt_start_time = 0;
501
    sentinel.previous_time = mstime();
502 503
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
504 505
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
506
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
507
    sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
508
    memset(sentinel.myid,0,sizeof(sentinel.myid));
509 510
}

A
antirez 已提交
511 512 513
/* This function gets called when the server is in Sentinel mode, started,
 * loaded the configuration, and is ready for normal operations. */
void sentinelIsRunning(void) {
514
    int j;
A
antirez 已提交
515

516
    if (server.configfile == NULL) {
A
antirez 已提交
517
        serverLog(LL_WARNING,
518
            "Sentinel started without a config file. Exiting...");
519
        exit(1);
520
    } else if (access(server.configfile,W_OK) == -1) {
A
antirez 已提交
521
        serverLog(LL_WARNING,
522 523
            "Sentinel config file %s is not writable: %s. Exiting...",
            server.configfile,strerror(errno));
A
antirez 已提交
524 525
        exit(1);
    }
526

527 528 529
    /* If this Sentinel has yet no ID set in the configuration file, we
     * pick a random one and persist the config on disk. From now on this
     * will be this Sentinel ID across restarts. */
A
antirez 已提交
530
    for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
531 532
        if (sentinel.myid[j] != 0) break;

A
antirez 已提交
533
    if (j == CONFIG_RUN_ID_SIZE) {
J
Jack Drogon 已提交
534
        /* Pick ID and persist the config. */
A
antirez 已提交
535
        getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
536 537 538 539
        sentinelFlushConfig();
    }

    /* Log its ID to make debugging of issues simpler. */
A
antirez 已提交
540
    serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);
541

542 543 544
    /* We want to generate a +monitor event for every configured master
     * at startup. */
    sentinelGenerateInitialMonitorEvents();
A
antirez 已提交
545 546
}

547 548 549 550 551 552 553 554
/* ============================== sentinelAddr ============================== */

/* Create a sentinelAddr object and return it on success.
 * On error NULL is returned and errno is set to:
 *  ENOENT: Can't resolve the hostname.
 *  EINVAL: Invalid port number.
 */
sentinelAddr *createSentinelAddr(char *hostname, int port) {
A
antirez 已提交
555
    char ip[NET_IP_STR_LEN];
556 557
    sentinelAddr *sa;

558
    if (port < 0 || port > 65535) {
559 560 561
        errno = EINVAL;
        return NULL;
    }
562
    if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
563 564 565 566
        errno = ENOENT;
        return NULL;
    }
    sa = zmalloc(sizeof(*sa));
567
    sa->ip = sdsnew(ip);
568 569 570 571
    sa->port = port;
    return sa;
}

572 573 574 575 576 577 578 579 580 581
/* Build and return a new sentinelAddr holding the same port and a fresh
 * copy of the IP string of 'src'. */
sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
    sentinelAddr *copy = zmalloc(sizeof(*copy));

    copy->ip = sdsnew(src->ip);
    copy->port = src->port;
    return copy;
}

582 583 584 585 586 587
/* Release a Sentinel address: first its IP string, then the object itself.
 * This operation cannot fail. */
void releaseSentinelAddr(sentinelAddr *sa) {
    char *ip = sa->ip;

    sdsfree(ip);
    zfree(sa);
}

588 589 590 591 592
/* Compare two addresses: returns 1 when the ports match and the IP strings
 * are equal ignoring case, 0 otherwise. */
int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
    if (a->port != b->port) return 0;
    return strcasecmp(a->ip,b->ip) == 0;
}

593 594 595
/* =========================== Events notification ========================== */

/* Send an event to log, pub/sub, user notification script.
 *
 * 'level' is the log level for logging. Only LL_WARNING events will trigger
 * the execution of the user notification script.
 *
 * 'type' is the message type, also used as a pub/sub channel name.
 *
 * 'ri', is the redis instance target of this event if applicable, and is
 * used to obtain the path of the notification script to execute.
 *
 * The remaining arguments are printf-alike.
 * If the format specifier starts with the two characters "%@" then ri is
 * not NULL, and the message is prefixed with an instance identifier in the
 * following format:
 *
 *  <instance type> <instance name> <ip> <port>
 *
 *  If the instance type is not master, then the additional string is
 *  added to specify the originating master:
 *
 *  @ <master name> <master ip> <master port>
 *
 *  Any other specifier after "%@" is processed by printf itself.
 *
 * The whole message is built in a LOG_MAX_LEN bytes buffer; snprintf() and
 * vsnprintf() bound every write to that size.
 */
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
                   const char *fmt, ...) {
    va_list ap;
    char msg[LOG_MAX_LEN];
    robj *channel, *payload;

    /* Handle %@ */
    if (fmt[0] == '%' && fmt[1] == '@') {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         NULL : ri->master;

        if (master) {
            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port,
                master->name, master->addr->ip, master->addr->port);
        } else {
            snprintf(msg, sizeof(msg), "%s %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port);
        }
        fmt += 2;
    } else {
        msg[0] = '\0';
    }

    /* Use vsnprintf for the rest of the formatting if any. */
    if (fmt[0] != '\0') {
        size_t prefixlen = strlen(msg);

        va_start(ap, fmt);
        vsnprintf(msg+prefixlen, sizeof(msg)-prefixlen, fmt, ap);
        va_end(ap);
    }

    /* Log the message if the log level allows it to be logged. */
    if (level >= server.verbosity)
        serverLog(level,"%s %s",type,msg);

    /* Publish the message via Pub/Sub if it's not a debugging one. */
    if (level != LL_DEBUG) {
        channel = createStringObject(type,strlen(type));
        payload = createStringObject(msg,strlen(msg));
        pubsubPublishMessage(channel,payload);
        decrRefCount(channel);
        decrRefCount(payload);
    }

    /* Call the notification script if applicable. The script is always the
     * one configured on the master the instance belongs to. */
    if (level == LL_WARNING && ri != NULL) {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         ri : ri->master;
        if (master && master->notification_script) {
            sentinelScheduleScriptExecution(master->notification_script,
                type,msg,NULL);
        }
    }
}

676 677 678 679 680 681 682 683 684 685 686
/* This function is called only at startup and is used to generate a
 * +monitor event for every configured master. The same events are also
 * generated when a master to monitor is added at runtime via the
 * SENTINEL MONITOR command. */
void sentinelGenerateInitialMonitorEvents(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
A
antirez 已提交
687
        sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
688 689 690 691
    }
    dictReleaseIterator(di);
}

692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
/* ============================ script execution ============================ */

/* Free a script job: every sds string of its NULL-terminated argv vector,
 * then the vector itself, and finally the job structure. */
void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
    int idx;

    for (idx = 0; sj->argv[idx] != NULL; idx++)
        sdsfree(sj->argv[idx]);
    zfree(sj->argv);
    zfree(sj);
}

#define SENTINEL_SCRIPT_MAX_ARGS 16
/* Queue a script for later execution.
 *
 * 'path' is the executable and becomes argv[0]. The variadic arguments are
 * C strings that become argv[1], argv[2], ... and the list must end with a
 * NULL pointer. Every string is copied with sdsnew(), so the caller keeps
 * ownership of what it passed. At most SENTINEL_SCRIPT_MAX_ARGS-1 variadic
 * arguments are used; anything past that limit is ignored.
 *
 * The new job is appended to sentinel.scripts_queue. If the queue becomes
 * longer than SENTINEL_SCRIPT_MAX_QUEUE, the oldest job that is not running
 * is removed and released. */
void sentinelScheduleScriptExecution(char *path, ...) {
    va_list ap;
    char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
    int argc = 1;
    sentinelScriptJob *sj;

    va_start(ap, path);
    while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
        char *arg = va_arg(ap,char*);
        if (arg == NULL) break;
        argv[argc++] = sdsnew(arg); /* Copy the string. */
    }
    va_end(ap);
    argv[0] = sdsnew(path);
    /* Always terminate the vector. When the loop above stops because the
     * argument limit was hit (and not because it found the NULL sentinel)
     * this slot would otherwise be uninitialized, and both execve() and
     * sentinelReleaseScriptJob() rely on the NULL terminator. */
    argv[argc] = NULL;

    sj = zmalloc(sizeof(*sj));
    sj->flags = SENTINEL_SCRIPT_NONE;
    sj->retry_num = 0;
    sj->argv = zmalloc(sizeof(char*)*(argc+1));
    sj->start_time = 0;
    sj->pid = 0;
    memcpy(sj->argv,argv,sizeof(char*)*(argc+1));

    listAddNodeTail(sentinel.scripts_queue,sj);

    /* Remove the oldest non running script if we already hit the limit. */
    if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
        listNode *ln;
        listIter li;

        listRewind(sentinel.scripts_queue,&li);
        while ((ln = listNext(&li)) != NULL) {
            sentinelScriptJob *oldest = ln->value;

            if (oldest->flags & SENTINEL_SCRIPT_RUNNING) continue;
            /* The first node is the oldest as we add on tail. */
            listDelNode(sentinel.scripts_queue,ln);
            sentinelReleaseScriptJob(oldest);
            break;
        }
        serverAssert(listLength(sentinel.scripts_queue) <=
                    SENTINEL_SCRIPT_MAX_QUEUE);
    }
}

/* Search sentinel.scripts_queue for a job flagged as running whose pid
 * equals 'pid'. The list node is returned (not the job) so the caller can
 * remove it from the queue; NULL is returned when nothing matches. */
listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
    listIter iter;
    listNode *node;

    listRewind(sentinel.scripts_queue,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        sentinelScriptJob *job = node->value;

        if (!(job->flags & SENTINEL_SCRIPT_RUNNING)) continue;
        if (job->pid == pid) return node;
    }
    return NULL;
}

/* Run pending scripts if we are not already at max number of running
 * scripts. */
void sentinelRunPendingScripts(void) {
    listNode *ln;
    listIter li;
    mstime_t now = mstime();

    /* Find jobs that are not running and run them, from the top to the
     * tail of the queue, so we run older jobs first. */
    listRewind(sentinel.scripts_queue,&li);
    while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
           (ln = listNext(&li)) != NULL)
    {
        sentinelScriptJob *sj = ln->value;
        pid_t pid;

        /* Skip if already running. */
        if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;

        /* Skip if it's a retry, but not enough time has elapsed. */
        if (sj->start_time && sj->start_time > now) continue;

        sj->flags |= SENTINEL_SCRIPT_RUNNING;
        sj->start_time = mstime();
        sj->retry_num++;
        pid = fork();

        if (pid == -1) {
            /* Parent (fork error).
             * We report fork errors as signal 99, in order to unify the
             * reporting with other kind of errors. */
A
antirez 已提交
797
            sentinelEvent(LL_WARNING,"-script-error",NULL,
798 799 800 801 802 803 804 805 806 807 808
                          "%s %d %d", sj->argv[0], 99, 0);
            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
            sj->pid = 0;
        } else if (pid == 0) {
            /* Child */
            execve(sj->argv[0],sj->argv,environ);
            /* If we are here an error occurred. */
            _exit(2); /* Don't retry execution. */
        } else {
            sentinel.running_scripts++;
            sj->pid = pid;
A
antirez 已提交
809
            sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
        }
    }
}

/* Return the delay to wait before re-running a script that failed.
 *
 * The delay starts at SENTINEL_SCRIPT_RETRY_DELAY and doubles for each
 * retry after the first. For instance, if RETRY_DELAY is 30 seconds and the
 * max number of retries is 10, starting from the second attempt the delays
 * are: 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min,
 * 128 min. */
mstime_t sentinelScriptRetryDelay(int retry_num) {
    mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
    int step;

    /* One doubling for every retry beyond the first one. */
    for (step = 1; step < retry_num; step++)
        delay *= 2;
    return delay;
}

/* Check for scripts that terminated, and remove them from the queue if the
 * script terminated successfully. If instead the script was terminated by
 * a signal, or returned exit code "1", it is scheduled to run again if
 * the max number of retries did not already elapsed. */
void sentinelCollectTerminatedScripts(void) {
    int statloc;
    pid_t pid;

    while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;
        listNode *ln;
        sentinelScriptJob *sj;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
A
antirez 已提交
843
        sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
844
            (long)pid, exitcode, bysignal);
845

846 847
        ln = sentinelGetScriptListNodeByPid(pid);
        if (ln == NULL) {
A
antirez 已提交
848
            serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866
            continue;
        }
        sj = ln->value;

        /* If the script was terminated by a signal or returns an
         * exit code of "1" (that means: please retry), we reschedule it
         * if the max number of retries is not already reached. */
        if ((bysignal || exitcode == 1) &&
            sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
        {
            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
            sj->pid = 0;
            sj->start_time = mstime() +
                             sentinelScriptRetryDelay(sj->retry_num);
        } else {
            /* Otherwise let's remove the script, but log the event if the
             * execution did not terminated in the best of the ways. */
            if (bysignal || exitcode != 0) {
A
antirez 已提交
867
                sentinelEvent(LL_WARNING,"-script-error",NULL,
868 869 870 871 872
                              "%s %d %d", sj->argv[0], bysignal, exitcode);
            }
            listDelNode(sentinel.scripts_queue,ln);
            sentinelReleaseScriptJob(sj);
        }
873
        sentinel.running_scripts--;
874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890
    }
}

/* Kill scripts in timeout, they'll be collected by the
 * sentinelCollectTerminatedScripts() function. */
void sentinelKillTimedoutScripts(void) {
    listNode *ln;
    listIter li;
    mstime_t now = mstime();

    listRewind(sentinel.scripts_queue,&li);
    while ((ln = listNext(&li)) != NULL) {
        sentinelScriptJob *sj = ln->value;

        if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
            (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
        {
A
antirez 已提交
891
            sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
892 893 894 895 896 897 898
                sj->argv[0], (long)sj->pid);
            kill(sj->pid,SIGKILL);
        }
    }
}

/* Implements SENTINEL PENDING-SCRIPTS command. */
899
void sentinelPendingScriptsCommand(client *c) {
900 901 902
    listNode *ln;
    listIter li;

A
antirez 已提交
903
    addReplyArrayLen(c,listLength(sentinel.scripts_queue));
904 905 906 907 908
    listRewind(sentinel.scripts_queue,&li);
    while ((ln = listNext(&li)) != NULL) {
        sentinelScriptJob *sj = ln->value;
        int j = 0;

A
antirez 已提交
909
        addReplyMapLen(c,5);
910 911 912

        addReplyBulkCString(c,"argv");
        while (sj->argv[j]) j++;
A
antirez 已提交
913
        addReplyArrayLen(c,j);
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935
        j = 0;
        while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);

        addReplyBulkCString(c,"flags");
        addReplyBulkCString(c,
            (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");

        addReplyBulkCString(c,"pid");
        addReplyBulkLongLong(c,sj->pid);

        if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
            addReplyBulkCString(c,"run-time");
            addReplyBulkLongLong(c,mstime() - sj->start_time);
        } else {
            mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
            if (delay < 0) delay = 0;
            addReplyBulkCString(c,"run-delay");
            addReplyBulkLongLong(c,delay);
        }

        addReplyBulkCString(c,"retry-num");
        addReplyBulkLongLong(c,sj->retry_num);
936 937 938
    }
}

939 940 941 942 943
/* This function calls, if any, the client reconfiguration script with the
 * following parameters:
 *
 * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
 *
944
 * It is called every time a failover is performed.
945
 *
946
 * <state> is currently always "failover".
947 948 949
 * <role> is either "leader" or "observer".
 *
 * from/to fields are respectively master -> promoted slave addresses for
950
 * "start" and "end". */
951 952 953 954 955 956 957 958 959
void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
    char fromport[32], toport[32];

    if (master->client_reconfig_script == NULL) return;
    ll2string(fromport,sizeof(fromport),from->port);
    ll2string(toport,sizeof(toport),to->port);
    sentinelScheduleScriptExecution(master->client_reconfig_script,
        master->name,
        (role == SENTINEL_LEADER) ? "leader" : "observer",
960
        state, from->ip, fromport, to->ip, toport, NULL);
961 962
}

A
antirez 已提交
963 964 965 966 967 968 969 970 971 972 973 974 975
/* =============================== instanceLink ============================= */

/* Create a not yet connected link object. */
instanceLink *createInstanceLink(void) {
    instanceLink *link = zmalloc(sizeof(*link));

    link->refcount = 1;
    link->disconnected = 1;
    link->pending_commands = 0;
    link->cc = NULL;
    link->pc = NULL;
    link->cc_conn_time = 0;
    link->pc_conn_time = 0;
976
    link->last_reconn_time = 0;
A
antirez 已提交
977
    link->pc_last_activity = 0;
978
    /* We set the act_ping_time to "now" even if we actually don't have yet
A
antirez 已提交
979 980 981
     * a connection with the node, nor we sent a ping.
     * This is useful to detect a timeout in case we'll not be able to connect
     * with the node at all. */
982 983
    link->act_ping_time = mstime();
    link->last_ping_time = 0;
A
antirez 已提交
984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012
    link->last_avail_time = mstime();
    link->last_pong_time = mstime();
    return link;
}

/* Close the hiredis context 'c' that belongs to 'link'. A NULL context is
 * ignored. The link slot that references 'c' (commands or Pub/Sub) is
 * cleared, the link is flagged as disconnected and the context is freed. */
void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
    if (!c) return;

    if (c == link->pc) link->pc = NULL;
    if (c == link->cc) {
        /* Closing the commands connection also resets the pending count. */
        link->pending_commands = 0;
        link->cc = NULL;
    }
    link->disconnected = 1;
    c->data = NULL;
    redisAsyncFree(c);
}

/* Drop one reference to 'link'.
 *
 * When the count reaches zero both connections are closed, the link is
 * freed and NULL is returned. Otherwise the link pointer is returned.
 *
 * In the latter case, if 'ri' is not NULL and its commands connection
 * exists, every pending reply callback in link->cc that has 'ri' as
 * private data is rebound to sentinelDiscardReplyCallback, so replies are
 * not processed for an instance that no longer exists. */
instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
{
    serverAssert(link->refcount > 0);
    link->refcount--;

    if (link->refcount == 0) {
        /* Last user gone: tear everything down. */
        instanceLinkCloseConnection(link,link->cc);
        instanceLinkCloseConnection(link,link->pc);
        zfree(link);
        return NULL;
    }

    /* Other active users remain. */
    if (ri && ri->link->cc) {
        /* The hiredis async context may hold pending callbacks whose
         * 'privdata' is the instance about to be freed. We walk the hiredis
         * internal reply list directly and neutralize those entries. */
        redisCallbackList *pending = &link->cc->replies;
        redisCallback *cb;

        for (cb = pending->head; cb != NULL; cb = cb->next) {
            if (cb->privdata != ri) continue;
            cb->fn = sentinelDiscardReplyCallback;
            cb->privdata = NULL; /* Not strictly needed. */
        }
    }
    return link;
}

/* Try to make the Sentinel instance 'ri' reuse the link of another
 * instance having the same runid, registered under a different master.
 *
 * As the original notes explain, this lets multiple Sentinel objects that
 * refer to the same physical Sentinel, in the context of different
 * masters, use a single connection, send a single PING per second for
 * failure detection and so forth.
 *
 * Returns C_OK if a matching Sentinel was found and the link is now
 * shared, C_ERR otherwise (including when 'ri' has no runid yet or its
 * link already has more than one user). */
int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_SENTINEL);
    dictIterator *iter;
    dictEntry *entry;
    int retval = C_ERR;

    if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
    if (ri->link->refcount > 1) return C_ERR; /* Already shared. */

    iter = dictGetIterator(sentinel.masters);
    while ((entry = dictNext(iter)) != NULL) {
        sentinelRedisInstance *other = dictGetVal(entry);
        sentinelRedisInstance *twin;

        /* Only Sentinels listed under the other masters are of interest. */
        if (other == ri->master) continue;

        twin = getSentinelRedisInstanceByAddrAndRunID(other->sentinels,
                                                      NULL,0,ri->runid);
        if (twin == NULL) continue; /* No match. */
        if (twin == ri) continue; /* Should never happen but... safer. */

        /* Found the same Sentinel elsewhere: drop our own link and take a
         * reference to the one it already has. */
        releaseInstanceLink(ri->link,NULL);
        ri->link = twin->link;
        twin->link->refcount++;
        retval = C_OK;
        break;
    }
    dictReleaseIterator(iter);
    return retval;
}

1086 1087 1088 1089 1090 1091 1092
/* Propagate an address change of the Sentinel 'ri' to every instance with
 * the same runid found under any master.
 *
 * For each match the open connections of its link are closed. Matches
 * other than 'ri' itself also get their address replaced by a copy of
 * ri->addr. A +sentinel-address-update event is emitted when at least one
 * other instance was updated.
 *
 * Returns the number of instances whose address was updated ('ri' is not
 * counted). */
int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_SENTINEL);
    dictIterator *iter;
    dictEntry *entry;
    int updated = 0;

    iter = dictGetIterator(sentinel.masters);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *master = dictGetVal(entry);
        sentinelRedisInstance *twin =
            getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
                                                   NULL,0,ri->runid);

        /* This master does not know about this Sentinel. */
        if (!twin) continue;

        /* Disconnect the old links if connected. */
        if (twin->link->cc != NULL)
            instanceLinkCloseConnection(twin->link,twin->link->cc);
        if (twin->link->pc != NULL)
            instanceLinkCloseConnection(twin->link,twin->link->pc);

        /* 'ri' already carries the new address. */
        if (twin == ri) continue;

        /* Replace the old address with a copy of the updated one. */
        releaseSentinelAddr(twin->addr);
        twin->addr = dupSentinelAddr(ri->addr);
        updated++;
    }
    dictReleaseIterator(iter);

    if (updated) {
        sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
                    "%@ %d additional matching instances", updated);
    }
    return updated;
}

A
antirez 已提交
1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
/* This function is called when an hiredis connection reported an error.
 * We set it to NULL and mark the link as disconnected so that it will be
 * reconnected again.
 *
 * Note: we don't free the hiredis context as hiredis will do it for us
 * for async connections. */
void instanceLinkConnectionError(const redisAsyncContext *c) {
    instanceLink *link = c->data;
    int pubsub;

    if (!link) return;

    pubsub = (link->pc == c);
    if (pubsub)
        link->pc = NULL;
    else
        link->cc = NULL;
    link->disconnected = 1;
}

/* Hiredis connection established / disconnected callbacks. We need them
 * just to cleanup our link state. */
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1151
    if (status != C_OK) instanceLinkConnectionError(c);
A
antirez 已提交
1152 1153 1154
}

/* Hiredis "disconnected" callback: whatever the status, the link state is
 * cleaned up exactly as for a connection error. */
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
    instanceLinkConnectionError(c);
    UNUSED(status);
}

1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179
/* ========================== sentinelRedisInstance ========================= */

/* Create a redis instance, the following fields must be populated by the
 * caller if needed:
 * runid: set to NULL but will be populated once INFO output is received.
 * info_refresh: is set to 0 to mean that we never received INFO so far.
 *
 * If SRI_MASTER is set into initial flags the instance is added to
 * sentinel.masters table.
 *
 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
 * instance is added into master->slaves or master->sentinels table.
 *
 * If the instance is a slave or sentinel, the name parameter is ignored and
 * is created automatically as hostname:port.
 *
 * The function fails if hostname can't be resolved or port is out of range.
 * When this happens NULL is returned and errno is set accordingly to the
 * createSentinelAddr() function.
 *
 * The function may also fail and return NULL with errno set to EBUSY if
1180 1181
 * a master with the same name, a slave with the same address, or a sentinel
 * with the same ID already exists. */
A
antirez 已提交
1182

1183 1184 1185
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
1186
    dict *table = NULL;
A
antirez 已提交
1187
    char slavename[NET_PEER_ID_LEN], *sdsname;
1188

A
antirez 已提交
1189 1190
    serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    serverAssert((flags & SRI_MASTER) || master != NULL);
1191 1192 1193 1194 1195

    /* Check address validity. */
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;

1196 1197
    /* For slaves use ip:port as name. */
    if (flags & SRI_SLAVE) {
1198
        anetFormatAddr(slavename, sizeof(slavename), hostname, port);
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210
        name = slavename;
    }

    /* Make sure the entry is not duplicated. This may happen when the same
     * name for a master is used multiple times inside the configuration or
     * if we try to add multiple times a slave or sentinel with same ip/port
     * to a master. */
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    sdsname = sdsnew(name);
    if (dictFind(table,sdsname)) {
C
clark.kang 已提交
1211
        releaseSentinelAddr(addr);
1212 1213 1214 1215 1216 1217 1218 1219 1220
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    /* Create the instance object. */
    ri = zmalloc(sizeof(*ri));
    /* Note that all the instances are started in the disconnected state,
     * the event loop will take care of connecting them. */
A
antirez 已提交
1221
    ri->flags = flags;
1222 1223
    ri->name = sdsname;
    ri->runid = NULL;
A
antirez 已提交
1224
    ri->config_epoch = 0;
1225
    ri->addr = addr;
A
antirez 已提交
1226
    ri->link = createInstanceLink();
1227 1228 1229 1230 1231 1232
    ri->last_pub_time = mstime();
    ri->last_hello_time = mstime();
    ri->last_master_down_reply_time = mstime();
    ri->s_down_since_time = 0;
    ri->o_down_since_time = 0;
    ri->down_after_period = master ? master->down_after_period :
1233
                            SENTINEL_DEFAULT_DOWN_AFTER;
1234
    ri->master_link_down_time = 0;
A
antirez 已提交
1235
    ri->auth_pass = NULL;
1236
    ri->auth_user = NULL;
1237 1238 1239 1240 1241
    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
    ri->slave_reconf_sent_time = 0;
    ri->slave_master_host = NULL;
    ri->slave_master_port = 0;
    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
1242
    ri->slave_repl_offset = 0;
1243 1244 1245 1246 1247 1248
    ri->sentinels = dictCreate(&instancesDictType,NULL);
    ri->quorum = quorum;
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
    ri->master = master;
    ri->slaves = dictCreate(&instancesDictType,NULL);
    ri->info_refresh = 0;
1249
    ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL);
1250 1251 1252

    /* Failover state. */
    ri->leader = NULL;
1253
    ri->leader_epoch = 0;
1254
    ri->failover_epoch = 0;
1255 1256 1257 1258
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
    ri->failover_start_time = 0;
    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
1259
    ri->failover_delay_logged = 0;
1260
    ri->promoted_slave = NULL;
1261
    ri->notification_script = NULL;
1262
    ri->client_reconfig_script = NULL;
1263
    ri->info = NULL;
1264

1265 1266 1267
    /* Role */
    ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
    ri->role_reported_time = mstime();
1268
    ri->slave_conf_change_time = mstime();
1269

1270 1271 1272 1273 1274 1275
    /* Add into the right table. */
    dictAdd(table, ri->name, ri);
    return ri;
}

/* Release an instance together with its slaves and sentinels tables, its
 * link reference and every string or table it owns.
 *
 * The instance is NOT unlinked from the table that contains it (the main
 * masters table, or the slaves/sentinels table of its master): as the
 * original notes state, this function does not take care of that. */
void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
    int promoted_slave;

    /* Release all its slaves or sentinels if any. */
    dictRelease(ri->sentinels);
    dictRelease(ri->slaves);

    /* Drop our reference to the link, passing the instance so callbacks
     * bound to it can be neutralized. */
    releaseInstanceLink(ri->link,ri);

    /* Free other resources. */
    sdsfree(ri->name);
    sdsfree(ri->runid);
    sdsfree(ri->notification_script);
    sdsfree(ri->client_reconfig_script);
    sdsfree(ri->slave_master_host);
    sdsfree(ri->leader);
    sdsfree(ri->auth_pass);
    sdsfree(ri->auth_user);
    sdsfree(ri->info);
    releaseSentinelAddr(ri->addr);
    dictRelease(ri->renamed_commands);

    /* A promoted slave must not be left referenced by its master. */
    promoted_slave = (ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED);
    if (promoted_slave && ri->master)
        ri->master->promoted_slave = NULL;

    zfree(ri);
}

/* Look up a slave of the master 'ri' by ip and port. The key is the address
 * formatted with anetFormatAddr(). Returns whatever dictFetchValue() yields
 * for that key in ri->slaves. */
sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
                sentinelRedisInstance *ri, char *ip, int port)
{
    char addrbuf[NET_PEER_ID_LEN];
    sentinelRedisInstance *found;
    sds lookupkey;

    serverAssert(ri->flags & SRI_MASTER);
    anetFormatAddr(addrbuf,sizeof(addrbuf),ip,port);
    lookupkey = sdsnew(addrbuf);
    found = dictFetchValue(ri->slaves,lookupkey);
    sdsfree(lookupkey);
    return found;
}

/* Map the type flag of an instance to its name. The flags are tested in
 * the order master, slave, sentinel; "unknown" is returned if none is
 * set. */
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
    if (ri->flags & SRI_MASTER) return "master";
    if (ri->flags & SRI_SLAVE) return "slave";
    if (ri->flags & SRI_SENTINEL) return "sentinel";
    return "unknown";
}

1331 1332
/* Remove from 'master' every Sentinel whose runid equals 'runid'.
 *
 * If 'runid' is NULL the function returns immediately.
 *
 * As the original notes explain, this is useful on a Sentinel address
 * switch, where the old entry is removed and a new one is added for the
 * same ID but with the new address.
 *
 * Returns the number of entries removed, 0 if no Sentinel had this ID. */
int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) {
    int removed = 0;

    if (runid != NULL) {
        /* Entries are deleted while iterating, hence the safe iterator. */
        dictIterator *iter = dictGetSafeIterator(master->sentinels);
        dictEntry *entry;

        while ((entry = dictNext(iter)) != NULL) {
            sentinelRedisInstance *candidate = dictGetVal(entry);

            if (candidate->runid == NULL) continue;
            if (strcmp(candidate->runid,runid) != 0) continue;
            dictDelete(master->sentinels,candidate->name);
            removed++;
        }
        dictReleaseIterator(iter);
    }
    return removed;
}

/* Search 'instances' for the first entry matching the given runid and/or
 * ip:port. Returns the instance pointer, or NULL if nothing matches.
 *
 * 'runid' or 'ip' may be NULL (not both): the search then uses only the
 * non-NULL criterion. 'port' is compared only when 'ip' is given. An
 * instance without a runid never matches a runid search. */
sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
    dictIterator *iter;
    dictEntry *entry;
    sentinelRedisInstance *found = NULL;

    serverAssert(ip || runid);   /* User must pass at least one search param. */
    iter = dictGetIterator(instances);
    while ((entry = dictNext(iter)) != NULL) {
        sentinelRedisInstance *candidate = dictGetVal(entry);

        if (runid != NULL) {
            if (candidate->runid == NULL) continue;
            if (strcmp(candidate->runid, runid) != 0) continue;
        }
        if (ip != NULL) {
            if (strcmp(candidate->addr->ip, ip) != 0) continue;
            if (candidate->addr->port != port) continue;
        }
        found = candidate;
        break;
    }
    dictReleaseIterator(iter);
    return found;
}

1391
/* Master lookup by name */
1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431
sentinelRedisInstance *sentinelGetMasterByName(char *name) {
    sentinelRedisInstance *ri;
    sds sdsname = sdsnew(name);

    ri = dictFetchValue(sentinel.masters,sdsname);
    sdsfree(sdsname);
    return ri;
}

/* Set the bits in 'flags' on every instance stored in 'instances'. */
void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        inst->flags |= flags;
    }
    dictReleaseIterator(iter);
}

/* Clear the bits in 'flags' on every instance stored in 'instances'. */
void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        inst->flags &= ~flags;
    }
    dictReleaseIterator(iter);
}

/* Reset the state of a monitored master:
 * 1) Remove all slaves.
 * 2) Remove all sentinels.
 * 3) Remove most of the flags resulting from runtime operations.
1432 1433 1434
 * 4) Reset timers to their default value. For example after a reset it will be
 *    possible to failover again the same master ASAP, without waiting the
 *    failover timeout delay.
1435 1436 1437
 * 5) In the process of doing this undo the failover if in progress.
 * 6) Disconnect the connections with the master (will reconnect automatically).
 */
1438 1439

#define SENTINEL_RESET_NO_SENTINELS (1<<0)
1440
void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
A
antirez 已提交
1441
    serverAssert(ri->flags & SRI_MASTER);
1442 1443
    dictRelease(ri->slaves);
    ri->slaves = dictCreate(&instancesDictType,NULL);
1444 1445 1446 1447
    if (!(flags & SENTINEL_RESET_NO_SENTINELS)) {
        dictRelease(ri->sentinels);
        ri->sentinels = dictCreate(&instancesDictType,NULL);
    }
A
antirez 已提交
1448 1449 1450
    instanceLinkCloseConnection(ri->link,ri->link->cc);
    instanceLinkCloseConnection(ri->link,ri->link->pc);
    ri->flags &= SRI_MASTER;
1451 1452 1453 1454 1455 1456
    if (ri->leader) {
        sdsfree(ri->leader);
        ri->leader = NULL;
    }
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
1457
    ri->failover_start_time = 0; /* We can failover again ASAP. */
1458
    ri->promoted_slave = NULL;
1459 1460 1461 1462
    sdsfree(ri->runid);
    sdsfree(ri->slave_master_host);
    ri->runid = NULL;
    ri->slave_master_host = NULL;
1463 1464
    ri->link->act_ping_time = mstime();
    ri->link->last_ping_time = 0;
A
antirez 已提交
1465 1466
    ri->link->last_avail_time = mstime();
    ri->link->last_pong_time = mstime();
1467 1468
    ri->role_reported_time = mstime();
    ri->role_reported = SRI_MASTER;
1469
    if (flags & SENTINEL_GENERATE_EVENT)
A
antirez 已提交
1470
        sentinelEvent(LL_WARNING,"+reset-master",ri,"%@");
1471 1472 1473 1474
}

/* Call sentinelResetMaster() on every master with a name matching the specified
 * pattern. */
1475
int sentinelResetMastersByPattern(char *pattern, int flags) {
1476 1477 1478 1479 1480 1481 1482 1483 1484 1485
    dictIterator *di;
    dictEntry *de;
    int reset = 0;

    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        if (ri->name) {
            if (stringmatch(pattern,ri->name,0)) {
1486
                sentinelResetMaster(ri,flags);
1487 1488 1489 1490 1491 1492 1493 1494
                reset++;
            }
        }
    }
    dictReleaseIterator(di);
    return reset;
}

1495 1496 1497
/* Reset the specified master with sentinelResetMaster(), and also change
 * the ip:port address, but take the name of the instance unmodified.
 *
1498
 * This is used to handle the +switch-master event.
1499
 *
1500 1501
 * The function returns C_ERR if the address can't be resolved for some
 * reason. Otherwise C_OK is returned.  */
1502 1503
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
    sentinelAddr *oldaddr, *newaddr;
1504 1505 1506 1507
    sentinelAddr **slaves = NULL;
    int numslaves = 0, j;
    dictIterator *di;
    dictEntry *de;
1508 1509

    newaddr = createSentinelAddr(ip,port);
1510
    if (newaddr == NULL) return C_ERR;
1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523

    /* Make a list of slaves to add back after the reset.
     * Don't include the one having the address we are switching to. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
        slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
        slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
                                                 slave->addr->port);
    }
    dictReleaseIterator(di);
1524

1525 1526 1527 1528 1529 1530 1531 1532 1533 1534
    /* If we are switching to a different address, include the old address
     * as a slave as well, so that we'll be able to sense / reconfigure
     * the old master. */
    if (!sentinelAddrIsEqual(newaddr,master->addr)) {
        slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
        slaves[numslaves++] = createSentinelAddr(master->addr->ip,
                                                 master->addr->port);
    }

    /* Reset and switch address. */
1535
    sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
1536 1537
    oldaddr = master->addr;
    master->addr = newaddr;
1538 1539 1540
    master->o_down_since_time = 0;
    master->s_down_since_time = 0;

1541 1542 1543 1544 1545 1546 1547
    /* Add slaves back. */
    for (j = 0; j < numslaves; j++) {
        sentinelRedisInstance *slave;

        slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
                    slaves[j]->port, master->quorum, master);
        releaseSentinelAddr(slaves[j]);
A
antirez 已提交
1548
        if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
1549 1550 1551
    }
    zfree(slaves);

1552 1553 1554
    /* Release the old address at the end so we are safe even if the function
     * gets the master->addr->ip and master->addr->port as arguments. */
    releaseSentinelAddr(oldaddr);
1555
    sentinelFlushConfig();
1556
    return C_OK;
1557 1558
}

1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569
/* Return non-zero when the instance had no SDOWN or ODOWN condition in the
 * latest 'ms' milliseconds: that is, when both down timestamps are zero, or
 * when the most recent of the two is more than 'ms' milliseconds old. */
int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) {
    mstime_t latest = (ri->o_down_since_time > ri->s_down_since_time) ?
                      ri->o_down_since_time : ri->s_down_since_time;

    if (latest == 0) return 1;
    return (mstime() - latest) > ms;
}

1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587
/* Return the address that should currently be advertised for 'master': the
 * promoted slave address when a failover is in progress, a promoted slave
 * is set, and the failover state already reached
 * SENTINEL_FAILOVER_STATE_RECONF_SLAVES (or a later state); the master's own
 * address in every other case.
 *
 * At that stage the new configuration epoch is already in the master and
 * the slave acknowledged the configuration switch, so the new address is
 * the one to advertise. */
sentinelAddr *sentinelGetCurrentMasterAddress(sentinelRedisInstance *master) {
    int promoted =
        (master->flags & SRI_FAILOVER_IN_PROGRESS) &&
        master->promoted_slave &&
        master->failover_state >= SENTINEL_FAILOVER_STATE_RECONF_SLAVES;

    return promoted ? master->promoted_slave->addr : master->addr;
}

1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605
/* Copy the down_after_period value of 'master' into every instance stored
 * in its slaves and sentinels tables. */
void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) {
    /* NULL terminated list of the tables to visit. */
    dict *tables[] = {master->slaves, master->sentinels, NULL};
    dict **cursor;

    for (cursor = tables; *cursor != NULL; cursor++) {
        dictIterator *iter = dictGetIterator(*cursor);
        dictEntry *entry;

        while ((entry = dictNext(iter)) != NULL) {
            sentinelRedisInstance *instance = dictGetVal(entry);

            instance->down_after_period = master->down_after_period;
        }
        dictReleaseIterator(iter);
    }
}

1606 1607 1608 1609 1610 1611 1612
/* Return a static string naming the instance type according to its flags.
 * The flags are tested in the order master, slave, sentinel; "unknown" is
 * returned when none of them is set. The returned string must not be
 * modified or freed. */
char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
    if (ri->flags & SRI_MASTER) return "master";
    if (ri->flags & SRI_SLAVE) return "slave";
    if (ri->flags & SRI_SENTINEL) return "sentinel";
    return "unknown";
}

1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627
/* Map 'command' to the name that must actually be sent to a Redis instance.
 *
 * The commands we send from Sentinel may be renamed on the target: a common
 * case is a master with CONFIG and SLAVEOF renamed for security concerns.
 * The lookup is done in the renamed_commands table of the instance, or in
 * the one of its master when ri->master is set.
 *
 * Returns the renamed command stored in the table, or 'command' itself when
 * no renaming is registered for it. */
char *sentinelInstanceMapCommand(sentinelRedisInstance *ri, char *command) {
    sentinelRedisInstance *owner = ri->master ? ri->master : ri;
    sds key = sdsnew(command);
    char *renamed = dictFetchValue(owner->renamed_commands, key);

    sdsfree(key);
    if (renamed == NULL) return command;
    return renamed;
}

1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652
/* ============================ Config handling ============================= */
char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }
    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
        /* down-after-milliseconds <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->down_after_period = atoi(argv[2]);
        if (ri->down_after_period <= 0)
            return "negative or zero time parameter.";
1653
        sentinelPropagateDownAfterPeriod(ri);
1654 1655 1656 1657 1658 1659 1660
    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
        /* failover-timeout <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->failover_timeout = atoi(argv[2]);
        if (ri->failover_timeout <= 0)
            return "negative or zero time parameter.";
1661
    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1662 1663 1664 1665
        /* parallel-syncs <name> <milliseconds> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->parallel_syncs = atoi(argv[2]);
1666
    } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1667 1668 1669 1670 1671 1672
        /* notification-script <name> <path> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if (access(argv[2],X_OK) == -1)
            return "Notification script seems non existing or non executable.";
        ri->notification_script = sdsnew(argv[2]);
1673
    } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1674 1675 1676 1677 1678 1679 1680
        /* client-reconfig-script <name> <path> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if (access(argv[2],X_OK) == -1)
            return "Client reconfiguration script seems non existing or "
                   "non executable.";
        ri->client_reconfig_script = sdsnew(argv[2]);
1681
    } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
A
antirez 已提交
1682 1683 1684 1685
        /* auth-pass <name> <password> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->auth_pass = sdsnew(argv[2]);
1686 1687 1688 1689 1690
    } else if (!strcasecmp(argv[0],"auth-user") && argc == 3) {
        /* auth-user <name> <username> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->auth_user = sdsnew(argv[2]);
1691 1692
    } else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
        /* current-epoch <epoch> */
1693
        unsigned long long current_epoch = strtoull(argv[1],NULL,10);
1694 1695
        if (current_epoch > sentinel.current_epoch)
            sentinel.current_epoch = current_epoch;
1696
    } else if (!strcasecmp(argv[0],"myid") && argc == 2) {
A
antirez 已提交
1697
        if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
1698
            return "Malformed Sentinel id in myid option.";
A
antirez 已提交
1699
        memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
1700 1701 1702 1703 1704
    } else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
        /* config-epoch <name> <epoch> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->config_epoch = strtoull(argv[2],NULL,10);
1705 1706 1707
        /* The following update of current_epoch is not really useful as
         * now the current epoch is persisted on the config file, but
         * we leave this check here for redundancy. */
1708 1709
        if (ri->config_epoch > sentinel.current_epoch)
            sentinel.current_epoch = ri->config_epoch;
1710 1711 1712 1713 1714
    } else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
        /* leader-epoch <name> <epoch> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        ri->leader_epoch = strtoull(argv[2],NULL,10);
1715 1716 1717
    } else if ((!strcasecmp(argv[0],"known-slave") ||
                !strcasecmp(argv[0],"known-replica")) && argc == 4)
    {
1718 1719
        sentinelRedisInstance *slave;

1720
        /* known-replica <name> <ip> <port> */
1721 1722 1723 1724 1725
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
                    atoi(argv[3]), ri->quorum, ri)) == NULL)
        {
1726
            return "Wrong hostname or port for replica.";
1727
        }
1728 1729
    } else if (!strcasecmp(argv[0],"known-sentinel") &&
               (argc == 4 || argc == 5)) {
1730 1731
        sentinelRedisInstance *si;

1732 1733 1734 1735 1736 1737 1738 1739 1740
        if (argc == 5) { /* Ignore the old form without runid. */
            /* known-sentinel <name> <ip> <port> [runid] */
            ri = sentinelGetMasterByName(argv[1]);
            if (!ri) return "No such master with specified name.";
            if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
                        atoi(argv[3]), ri->quorum, ri)) == NULL)
            {
                return "Wrong hostname or port for sentinel.";
            }
A
antirez 已提交
1741
            si->runid = sdsnew(argv[4]);
1742
            sentinelTryConnectionSharing(si);
A
antirez 已提交
1743
        }
1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754
    } else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
        /* rename-command <name> <command> <renamed-command> */
        ri = sentinelGetMasterByName(argv[1]);
        if (!ri) return "No such master with specified name.";
        sds oldcmd = sdsnew(argv[2]);
        sds newcmd = sdsnew(argv[3]);
        if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
            sdsfree(oldcmd);
            sdsfree(newcmd);
            return "Same command renamed multiple times with rename-command.";
        }
1755 1756
    } else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
        /* announce-ip <ip-address> */
1757 1758
        if (strlen(argv[1]))
            sentinel.announce_ip = sdsnew(argv[1]);
1759 1760 1761
    } else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
        /* announce-port <port> */
        sentinel.announce_port = atoi(argv[1]);
1762 1763 1764 1765 1766 1767
    } else if (!strcasecmp(argv[0],"deny-scripts-reconfig") && argc == 2) {
        /* deny-scripts-reconfig <yes|no> */
        if ((sentinel.deny_scripts_reconfig = yesnotoi(argv[1])) == -1) {
            return "Please specify yes or no for the "
                   "deny-scripts-reconfig options.";
        }
1768 1769 1770 1771 1772 1773
    } else {
        return "Unrecognized sentinel configuration statement.";
    }
    return NULL;
}

1774 1775 1776 1777 1778 1779 1780 1781
/* Implements CONFIG REWRITE for "sentinel" option.
 * This is used not just to rewrite the configuration given by the user
 * (the configured masters) but also in order to retain the state of
 * Sentinel across restarts: config epoch of masters, associated slaves
 * and sentinel instances, and so forth. */
void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
    dictIterator *di, *di2;
    dictEntry *de;
1782
    sds line;
1783

1784 1785 1786 1787
    /* sentinel unique ID. */
    line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid);
    rewriteConfigRewriteLine(state,"sentinel",line,1);

1788 1789 1790 1791 1792 1793
    /* sentinel deny-scripts-reconfig. */
    line = sdscatprintf(sdsempty(), "sentinel deny-scripts-reconfig %s",
        sentinel.deny_scripts_reconfig ? "yes" : "no");
    rewriteConfigRewriteLine(state,"sentinel",line,
        sentinel.deny_scripts_reconfig != SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG);

1794 1795 1796 1797
    /* For every master emit a "sentinel monitor" config entry. */
    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *master, *ri;
1798
        sentinelAddr *master_addr;
1799 1800 1801

        /* sentinel monitor */
        master = dictGetVal(de);
1802
        master_addr = sentinelGetCurrentMasterAddress(master);
1803
        line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d",
1804
            master->name, master_addr->ip, master_addr->port,
1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847
            master->quorum);
        rewriteConfigRewriteLine(state,"sentinel",line,1);

        /* sentinel down-after-milliseconds */
        if (master->down_after_period != SENTINEL_DEFAULT_DOWN_AFTER) {
            line = sdscatprintf(sdsempty(),
                "sentinel down-after-milliseconds %s %ld",
                master->name, (long) master->down_after_period);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel failover-timeout */
        if (master->failover_timeout != SENTINEL_DEFAULT_FAILOVER_TIMEOUT) {
            line = sdscatprintf(sdsempty(),
                "sentinel failover-timeout %s %ld",
                master->name, (long) master->failover_timeout);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel parallel-syncs */
        if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) {
            line = sdscatprintf(sdsempty(),
                "sentinel parallel-syncs %s %d",
                master->name, master->parallel_syncs);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel notification-script */
        if (master->notification_script) {
            line = sdscatprintf(sdsempty(),
                "sentinel notification-script %s %s",
                master->name, master->notification_script);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

        /* sentinel client-reconfig-script */
        if (master->client_reconfig_script) {
            line = sdscatprintf(sdsempty(),
                "sentinel client-reconfig-script %s %s",
                master->name, master->client_reconfig_script);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

1848
        /* sentinel auth-pass & auth-user */
1849 1850 1851 1852 1853 1854 1855
        if (master->auth_pass) {
            line = sdscatprintf(sdsempty(),
                "sentinel auth-pass %s %s",
                master->name, master->auth_pass);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

1856 1857 1858 1859 1860 1861 1862
        if (master->auth_user) {
            line = sdscatprintf(sdsempty(),
                "sentinel auth-user %s %s",
                master->name, master->auth_user);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }

1863 1864 1865 1866 1867 1868
        /* sentinel config-epoch */
        line = sdscatprintf(sdsempty(),
            "sentinel config-epoch %s %llu",
            master->name, (unsigned long long) master->config_epoch);
        rewriteConfigRewriteLine(state,"sentinel",line,1);

1869 1870 1871 1872 1873 1874
        /* sentinel leader-epoch */
        line = sdscatprintf(sdsempty(),
            "sentinel leader-epoch %s %llu",
            master->name, (unsigned long long) master->leader_epoch);
        rewriteConfigRewriteLine(state,"sentinel",line,1);

1875 1876
        /* sentinel known-slave */
        di2 = dictGetIterator(master->slaves);
1877
        while((de = dictNext(di2)) != NULL) {
1878 1879
            sentinelAddr *slave_addr;

1880
            ri = dictGetVal(de);
1881 1882 1883 1884 1885 1886 1887 1888 1889
            slave_addr = ri->addr;

            /* If master_addr (obtained using sentinelGetCurrentMasterAddress()
             * so it may be the address of the promoted slave) is equal to this
             * slave's address, a failover is in progress and the slave was
             * already successfully promoted. So as the address of this slave
             * we use the old master address instead. */
            if (sentinelAddrIsEqual(slave_addr,master_addr))
                slave_addr = master->addr;
1890
            line = sdscatprintf(sdsempty(),
1891
                "sentinel known-replica %s %s %d",
1892
                master->name, slave_addr->ip, slave_addr->port);
1893 1894 1895 1896 1897 1898
            rewriteConfigRewriteLine(state,"sentinel",line,1);
        }
        dictReleaseIterator(di2);

        /* sentinel known-sentinel */
        di2 = dictGetIterator(master->sentinels);
1899
        while((de = dictNext(di2)) != NULL) {
1900
            ri = dictGetVal(de);
1901
            if (ri->runid == NULL) continue;
1902
            line = sdscatprintf(sdsempty(),
1903 1904
                "sentinel known-sentinel %s %s %d %s",
                master->name, ri->addr->ip, ri->addr->port, ri->runid);
1905
            rewriteConfigRewriteLine(state,"sentinel",line,1);
1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917
        }
        dictReleaseIterator(di2);

        /* sentinel rename-command */
        di2 = dictGetIterator(master->renamed_commands);
        while((de = dictNext(di2)) != NULL) {
            sds oldname = dictGetKey(de);
            sds newname = dictGetVal(de);
            line = sdscatprintf(sdsempty(),
                "sentinel rename-command %s %s %s",
                master->name, oldname, newname);
            rewriteConfigRewriteLine(state,"sentinel",line,1);
1918 1919 1920
        }
        dictReleaseIterator(di2);
    }
1921 1922 1923 1924 1925 1926

    /* sentinel current-epoch is a global state valid for all the masters. */
    line = sdscatprintf(sdsempty(),
        "sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch);
    rewriteConfigRewriteLine(state,"sentinel",line,1);

1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940
    /* sentinel announce-ip. */
    if (sentinel.announce_ip) {
        line = sdsnew("sentinel announce-ip ");
        line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip));
        rewriteConfigRewriteLine(state,"sentinel",line,1);
    }

    /* sentinel announce-port. */
    if (sentinel.announce_port) {
        line = sdscatprintf(sdsempty(),"sentinel announce-port %d",
                            sentinel.announce_port);
        rewriteConfigRewriteLine(state,"sentinel",line,1);
    }

1941 1942 1943
    dictReleaseIterator(di);
}

1944 1945 1946 1947 1948 1949 1950 1951
/* This function uses the config rewriting Redis engine in order to persist
 * the state of the Sentinel in the current configuration file.
 *
 * Before returning the function calls fsync() against the generated
 * configuration file to make sure changes are committed to disk.
 *
 * On failure the function logs a warning on the Redis log. */
void sentinelFlushConfig(void) {
1952
    int fd = -1;
1953
    int saved_hz = server.hz;
1954
    int rewrite_status;
1955

1956
    server.hz = CONFIG_DEFAULT_HZ;
1957
    rewrite_status = rewriteConfig(server.configfile);
1958
    server.hz = saved_hz;
1959 1960 1961 1962 1963

    if (rewrite_status == -1) goto werr;
    if ((fd = open(server.configfile,O_RDONLY)) == -1) goto werr;
    if (fsync(fd) == -1) goto werr;
    if (close(fd) == EOF) goto werr;
1964
    return;
1965 1966 1967

werr:
    if (fd != -1) close(fd);
A
antirez 已提交
1968
    serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
1969 1970
}

1971 1972
/* ====================== hiredis connection handling ======================= */

A
antirez 已提交
1973 1974 1975
/* Queue an AUTH command on the async context 'c' when credentials apply to
 * the instance 'ri':
 *
 * - Masters use their own auth_pass / auth_user.
 * - Slaves use the auth_pass / auth_user set for their master.
 * - Sentinels use the local "requirepass" password and no username: in case
 *   this Sentinel requires a password we assume the other Sentinels share
 *   the same one, so all the Sentinels authenticate reciprocally with it.
 *
 * Nothing is sent when there is no password. With a password only, the
 * single argument form of AUTH is used; with both a username and a password
 * the two arguments (ACL-style) form is used.
 *
 * We don't check at all if the command was successfully transmitted to the
 * instance: if it fails Sentinel will detect the instance as down, and will
 * disconnect and reconnect the link. */
void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
    char *user = NULL;
    char *pass = NULL;
    int queued;

    if (ri->flags & SRI_MASTER) {
        pass = ri->auth_pass;
        user = ri->auth_user;
    } else if (ri->flags & SRI_SLAVE) {
        pass = ri->master->auth_pass;
        user = ri->master->auth_user;
    } else if (ri->flags & SRI_SENTINEL) {
        pass = server.requirepass;
    }

    /* No password, no AUTH (a username alone is never sent). */
    if (pass == NULL) return;

    if (user == NULL) {
        queued = redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
            "%s %s",
            sentinelInstanceMapCommand(ri,"AUTH"),
            pass);
    } else {
        queued = redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
            "%s %s %s",
            sentinelInstanceMapCommand(ri,"AUTH"),
            user, pass);
    }
    if (queued == C_OK) ri->link->pending_commands++;
}

2013 2014 2015 2016 2017 2018 2019 2020 2021
/* Use CLIENT SETNAME to name the connection in the Redis instance as
 * sentinel-<first_8_chars_of_myid>-<connection_type>
 * where the connection type is the string passed as 'type' ("cmd" or
 * "pubsub").
 *
 * This makes it possible to list all the Sentinel instances connected to a
 * Redis server with CLIENT LIST, grepping for this specific name format.
 *
 * The pending commands counter of the link is incremented only when the
 * command was queued successfully. */
void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
    char name[64];
    int queued;

    snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
    queued = redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
        "%s SETNAME %s",
        sentinelInstanceMapCommand(ri,"CLIENT"),
        name);
    if (queued == C_OK) ri->link->pending_commands++;
}

2032 2033
/* Start TLS on the hiredis async context 'context'.
 *
 * When built without USE_OPENSSL this does nothing and returns C_OK.
 * Otherwise a new SSL object is created from redis_tls_ctx and passed to
 * redisInitiateSSL(). Returns C_ERR when redis_tls_ctx is not set, when the
 * SSL object can't be created, or when redisInitiateSSL() fails; C_OK
 * otherwise. */
static int instanceLinkNegotiateTLS(redisAsyncContext *context) {
#ifdef USE_OPENSSL
    SSL *ssl;

    if (redis_tls_ctx == NULL) return C_ERR;
    ssl = SSL_new(redis_tls_ctx);
    if (ssl == NULL) return C_ERR;

    /* NOTE(review): 'ssl' is not released here when redisInitiateSSL()
     * fails. Whether hiredis keeps ownership of it in that case depends on
     * the bundled hiredis version -- confirm before adding an SSL_free(). */
    if (redisInitiateSSL(&context->c, ssl) == REDIS_ERR) return C_ERR;
#else
    (void) context;
#endif
    return C_OK;
}

A
antirez 已提交
2045 2046
/* Create the async connections for the instance link if the link
 * is disconnected. Note that link->disconnected is true even if just
 * one of the two links (commands and pub/sub) is missing.
 *
 * Nothing is done when the link is not flagged as disconnected, when the
 * instance port is zero, or when the previous attempt was made less than
 * SENTINEL_PING_PERIOD milliseconds ago.
 *
 * The commands connection is created for every instance type; the Pub/Sub
 * connection only for masters and slaves. A connection that can't be
 * allocated, fails to connect, or fails TLS setup is reported with a debug
 * event and left closed (NULL), so that it is retried on a later call. The
 * disconnected flag is cleared only when every connection required for the
 * instance type is in place. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    /* Rate limit the reconnection attempts. */
    if (now - link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    link->last_reconn_time = now;

    /* Commands connection. */
    if (link->cc == NULL) {
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->cc == NULL) {
            /* hiredis was not able to allocate the context at all: there is
             * nothing to close, just report and retry later. */
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->cc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->cc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #Failed to initialize TLS");
            instanceLinkCloseConnection(link,link->cc);
        } else if (link->cc->err) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            redisAeAttach(server.el,link->cc);
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->cc);
            sentinelSetClientName(ri,link->cc,"cmd");

            /* Send a PING ASAP when reconnecting. */
            sentinelSendPing(ri);
        }
    }

    /* Pub / Sub */
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->pc == NULL) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to establish connection");
        } else if (!link->pc->err && server.tls_replication &&
                (instanceLinkNegotiateTLS(link->pc) == C_ERR)) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #Failed to initialize TLS");
            /* Close it exactly as the commands connection is closed above:
             * otherwise link->pc stays set to an unusable context and the
             * link would be flagged as connected below. */
            instanceLinkCloseConnection(link,link->pc);
        } else if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            int retval;

            link->pc_conn_time = mstime();
            link->pc->data = link;
            redisAeAttach(server.el,link->pc);
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            /* Now we subscribe to the Sentinels "Hello" channel. */
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "%s %s",
                sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
                SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }

    /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}

/* ======================== Redis instances pinging  ======================== */

2127 2128 2129 2130
/* Return true if master looks "sane", that is:
 * 1) It is actually a master in the current configuration.
 * 2) It reports itself as a master.
 * 3) It is not SDOWN or ODOWN.
2131
 * 4) We obtained last INFO no more than two times the INFO period time ago. */
2132 2133 2134 2135 2136 2137 2138 2139
int sentinelMasterLooksSane(sentinelRedisInstance *master) {
    return
        master->flags & SRI_MASTER &&
        master->role_reported == SRI_MASTER &&
        (master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 &&
        (mstime() - master->info_refresh) < SENTINEL_INFO_PERIOD*2;
}

2140 2141 2142 2143 2144 2145
/* Process the INFO output received from a master or slave instance.
 *
 * 'ri' is the instance the reply belongs to, 'info' is the raw INFO text.
 *
 * The function works in two halves:
 *
 * 1) Parsing half: the INFO text is cached in ri->info and then scanned line
 *    by line (lines are separated by "\r\n") in order to refresh the fields
 *    of 'ri': run id, list of slaves (masters only), reported role and, for
 *    instances reporting to be slaves, the replication related fields.
 *    ri->info_refresh is set to the current time once the scan is done.
 *
 * 2) Acting half: a change of the reported role is always recorded and
 *    logged. Everything else (failover state machine transitions, slaves
 *    reconfiguration) is skipped when Sentinel is in TILT mode. */
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    sds *lines;
    int numlines, j;
    int role = 0;

    /* cache full INFO output for instance */
    sdsfree(ri->info);
    ri->info = sdsnew(info);

    /* The following fields must be reset to a given value in the case they
     * are not found at all in the INFO output. */
    ri->master_link_down_time = 0;

    /* Process line by line. */
    lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
    for (j = 0; j < numlines; j++) {
        sentinelRedisInstance *slave;
        sds l = lines[j];

        /* run_id:<40 hex chars>*/
        if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
            if (ri->runid == NULL) {
                ri->runid = sdsnewlen(l+7,40);
            } else {
                /* A different run id for the same address means the
                 * instance was restarted. */
                if (strncmp(ri->runid,l+7,40) != 0) {
                    sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
                    sdsfree(ri->runid);
                    ri->runid = sdsnewlen(l+7,40);
                }
            }
        }

        /* old versions: slave0:<ip>,<port>,<state>
         * new versions: slave0:ip=127.0.0.1,port=9999,... */
        if ((ri->flags & SRI_MASTER) &&
            sdslen(l) >= 7 &&
            !memcmp(l,"slave",5) && isdigit((unsigned char)l[5]))
        {
            char *ip, *port, *end;

            if (strstr(l,"ip=") == NULL) {
                /* Old format. */
                ip = strchr(l,':'); if (!ip) continue;
                ip++; /* Now ip points to start of ip address. */
                port = strchr(ip,','); if (!port) continue;
                *port = '\0'; /* nul term for easy access. */
                port++; /* Now port points to start of port number. */
                end = strchr(port,','); if (!end) continue;
                *end = '\0'; /* nul term for easy access. */
            } else {
                /* New format. */
                ip = strstr(l,"ip="); if (!ip) continue;
                ip += 3; /* Now ip points to start of ip address. */
                port = strstr(l,"port="); if (!port) continue;
                port += 5; /* Now port points to start of port number. */
                /* Nul term both fields for easy access. */
                end = strchr(ip,','); if (end) *end = '\0';
                end = strchr(port,','); if (end) *end = '\0';
            }

            /* Check if we already have this slave into our table,
             * otherwise add it. */
            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
                if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
                            atoi(port), ri->quorum, ri)) != NULL)
                {
                    sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
                    sentinelFlushConfig();
                }
            }
        }

        /* master_link_down_since_seconds:<seconds> */
        if (sdslen(l) >= 32 &&
            !memcmp(l,"master_link_down_since_seconds",30))
        {
            ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
        }

        /* role:<role>
         * The line length is checked before calling memcmp() so that lines
         * shorter than the prefix are never read past their end. */
        if (sdslen(l) >= 11 && !memcmp(l,"role:master",11)) role = SRI_MASTER;
        else if (sdslen(l) >= 10 && !memcmp(l,"role:slave",10)) role = SRI_SLAVE;

        if (role == SRI_SLAVE) {
            /* master_host:<host> */
            if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
                if (ri->slave_master_host == NULL ||
                    strcasecmp(l+12,ri->slave_master_host))
                {
                    sdsfree(ri->slave_master_host);
                    ri->slave_master_host = sdsnew(l+12);
                    ri->slave_conf_change_time = mstime();
                }
            }

            /* master_port:<port> */
            if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
                int slave_master_port = atoi(l+12);

                if (ri->slave_master_port != slave_master_port) {
                    ri->slave_master_port = slave_master_port;
                    ri->slave_conf_change_time = mstime();
                }
            }

            /* master_link_status:<status> */
            if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
                ri->slave_master_link_status =
                    (strcasecmp(l+19,"up") == 0) ?
                    SENTINEL_MASTER_LINK_STATUS_UP :
                    SENTINEL_MASTER_LINK_STATUS_DOWN;
            }

            /* slave_priority:<priority> */
            if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
                ri->slave_priority = atoi(l+15);

            /* slave_repl_offset:<offset> */
            if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
                ri->slave_repl_offset = strtoull(l+18,NULL,10);
        }
    }
    ri->info_refresh = mstime();
    sdsfreesplitres(lines,numlines);

    /* ---------------------------- Acting half -----------------------------
     * Some things will not happen if sentinel.tilt is true, but some will
     * still be processed. */

    /* Remember when the role changed. */
    if (role != ri->role_reported) {
        ri->role_reported_time = mstime();
        ri->role_reported = role;
        if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
        /* Log the event with +role-change if the new role is coherent or
         * with -role-change if there is a mismatch with the current config. */
        sentinelEvent(LL_VERBOSE,
            ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
            "+role-change" : "-role-change",
            ri, "%@ new reported role is %s",
            role == SRI_MASTER ? "master" : "slave");
    }

    /* None of the following conditions are processed when in tilt mode, so
     * return asap. */
    if (sentinel.tilt) return;

    /* Handle master -> slave role switch. */
    if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
        /* Nothing to do, but masters claiming to be slaves are
         * considered to be unreachable by Sentinel, so eventually
         * a failover will be triggered. */
    }

    /* Handle slave -> master role switch. */
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            /* Now that we are sure the slave was reconfigured as a master
             * set the master configuration epoch to the epoch we won the
             * election to perform this failover. This will force the other
             * Sentinels to update their config (assuming there is not
             * a newer one already available). */
            ri->master->config_epoch = ri->master->failover_epoch;
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            ri->master->failover_state_change_time = mstime();
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
            if (sentinel.simfailure_flags &
                SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
                sentinelSimFailureCrash();
            sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            sentinelForceHelloUpdateForMaster(ri->master);
        } else {
            /* A slave turned into a master. We want to force our view and
             * reconfigure as slave. Wait some time after the change before
             * going forward, to receive new configs if any. */
            mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;

            if (!(ri->flags & SRI_PROMOTED) &&
                 sentinelMasterLooksSane(ri->master) &&
                 sentinelRedisInstanceNoDownFor(ri,wait_time) &&
                 mstime() - ri->role_reported_time > wait_time)
            {
                int retval = sentinelSendSlaveOf(ri,
                        ri->master->addr->ip,
                        ri->master->addr->port);
                if (retval == C_OK)
                    sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
            }
        }
    }

    /* Handle slaves replicating to a different master address.
     * A slave that never reported its master host (slave_master_host is
     * still NULL) is handled as a mismatch instead of being passed to
     * strcasecmp(), that must not receive a NULL pointer. */
    if ((ri->flags & SRI_SLAVE) &&
        role == SRI_SLAVE &&
        (ri->slave_master_port != ri->master->addr->port ||
         ri->slave_master_host == NULL ||
         strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
    {
        mstime_t wait_time = ri->master->failover_timeout;

        /* Make sure the master is sane before reconfiguring this instance
         * into a slave. */
        if (sentinelMasterLooksSane(ri->master) &&
            sentinelRedisInstanceNoDownFor(ri,wait_time) &&
            mstime() - ri->slave_conf_change_time > wait_time)
        {
            int retval = sentinelSendSlaveOf(ri,
                    ri->master->addr->ip,
                    ri->master->addr->port);
            if (retval == C_OK)
                sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
        }
    }

    /* Detect if the slave that is in the process of being reconfigured
     * changed state. */
    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
    {
        /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
        if ((ri->flags & SRI_RECONF_SENT) &&
            ri->slave_master_host &&
            strcmp(ri->slave_master_host,
                    ri->master->promoted_slave->addr->ip) == 0 &&
            ri->slave_master_port == ri->master->promoted_slave->addr->port)
        {
            ri->flags &= ~SRI_RECONF_SENT;
            ri->flags |= SRI_RECONF_INPROG;
            sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
        }

        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
        if ((ri->flags & SRI_RECONF_INPROG) &&
            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
        {
            ri->flags &= ~SRI_RECONF_INPROG;
            ri->flags |= SRI_RECONF_DONE;
            sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
        }
    }
}

/* Callback for the replies to the INFO commands we send to instances.
 * 'privdata' is the instance the command was sent to, while the link the
 * reply arrived from is stored in c->data. The pending commands counter of
 * the link is decremented and, if the reply is a string, it is handed to
 * sentinelRefreshInstanceInfo(). Nothing is done if there is no reply or
 * no link. */
void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *info = reply;

    if (info == NULL || link == NULL) return;
    link->pending_commands--;

    if (info->type != REDIS_REPLY_STRING) return;
    sentinelRefreshInstanceInfo(ri,info->str);
}

/* Reply callback that ignores the reply content. It is used for commands
 * whose effects are observed directly instead of looking at the returned
 * value: the only thing to do is to decrement the pending commands counter
 * of the link (stored in c->data), if there is still a link. */
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    instanceLink *link = c->data;
    UNUSED(reply);
    UNUSED(privdata);

    if (link == NULL) return;
    link->pending_commands--;
}

/* Callback for the replies to the PING commands we send to instances.
 * 'privdata' is the pinged instance, the link is stored in c->data.
 *
 * Status and error replies starting with PONG, LOADING or MASTERDOWN are
 * considered acceptable: they refresh the last availability time and clear
 * the active ping time. A BUSY reply from an instance already flagged as
 * SDOWN triggers a single SCRIPT KILL attempt. Whatever the reply is, the
 * last pong time of the link is updated. */
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *pong = reply;

    if (pong == NULL || link == NULL) return;
    link->pending_commands--;

    if (pong->type == REDIS_REPLY_STATUS ||
        pong->type == REDIS_REPLY_ERROR)
    {
        /* The "instance available" field is updated only if this is an
         * acceptable reply. */
        int acceptable = strncmp(pong->str,"PONG",4) == 0 ||
                         strncmp(pong->str,"LOADING",7) == 0 ||
                         strncmp(pong->str,"MASTERDOWN",10) == 0;

        if (acceptable) {
            link->last_avail_time = mstime();
            link->act_ping_time = 0; /* Flag the pong as received. */
        } else if (strncmp(pong->str,"BUSY",4) == 0 &&
                   (ri->flags & SRI_S_DOWN) &&
                   !(ri->flags & SRI_SCRIPT_KILL_SENT))
        {
            /* The instance appears to be down because of a busy script:
             * send a SCRIPT KILL command. The flag is set even if the
             * command could not be queued. */
            int queued = redisAsyncCommand(ri->link->cc,
                            sentinelDiscardReplyCallback, ri,
                            "%s KILL",
                            sentinelInstanceMapCommand(ri,"SCRIPT"));

            if (queued == C_OK) ri->link->pending_commands++;
            ri->flags |= SRI_SCRIPT_KILL_SENT;
        }
    }
    link->last_pong_time = mstime();
}

/* Callback for the reply to the PUBLISH command we send to an instance in
 * order to advertise this Sentinel. 'privdata' is the instance the command
 * was sent to, the link is stored in c->data.
 *
 * The last publish time of the instance is refreshed only when the reply is
 * not an error, that is, when the message was actually published: otherwise
 * the timestamp is left untouched. */
void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *published = reply;

    if (published == NULL || link == NULL) return;
    link->pending_commands--;

    if (published->type == REDIS_REPLY_ERROR) return;
    ri->last_pub_time = mstime();
}

2473 2474 2475 2476
/* Process an hello message received via Pub/Sub in master or slave instance,
 * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
 *
 * If the master name specified in the message is not known, the message is
2477
 * discarded. */
2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501
void sentinelProcessHelloMessage(char *hello, int hello_len) {
    /* Format is composed of 8 tokens:
     * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
     * 5=master_ip,6=master_port,7=master_config_epoch. */
    int numtokens, port, removed, master_port;
    uint64_t current_epoch, master_config_epoch;
    char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
    sentinelRedisInstance *si, *master;

    if (numtokens == 8) {
        /* Obtain a reference to the master this hello message is about */
        master = sentinelGetMasterByName(token[4]);
        if (!master) goto cleanup; /* Unknown master, skip the message. */

        /* First, try to see if we already have this sentinel. */
        port = atoi(token[1]);
        master_port = atoi(token[6]);
        si = getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels,token[0],port,token[2]);
        current_epoch = strtoull(token[3],NULL,10);
        master_config_epoch = strtoull(token[7],NULL,10);

        if (!si) {
            /* If not, remove all the sentinels that have the same runid
2502 2503 2504
             * because there was an address change, and add the same Sentinel
             * with the new address back. */
            removed = removeMatchingSentinelFromMaster(master,token[2]);
2505
            if (removed) {
A
antirez 已提交
2506
                sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
2507
                    "%@ ip %s port %d for %s", token[0],port,token[2]);
2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520
            } else {
                /* Check if there is another Sentinel with the same address this
                 * new one is reporting. What we do if this happens is to set its
                 * port to 0, to signal the address is invalid. We'll update it
                 * later if we get an HELLO message. */
                sentinelRedisInstance *other =
                    getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels, token[0],port,NULL);
                if (other) {
                    sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
                    other->addr->port = 0; /* It means: invalid address. */
                    sentinelUpdateSentinelAddressInAllMasters(other);
                }
2521 2522 2523
            }

            /* Add the new sentinel. */
2524
            si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
2525
                            token[0],port,master->quorum,master);
2526

2527
            if (si) {
A
antirez 已提交
2528
                if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
2529 2530 2531 2532
                /* The runid is NULL after a new instance creation and
                 * for Sentinels we don't have a later chance to fill it,
                 * so do it now. */
                si->runid = sdsnew(token[2]);
2533
                sentinelTryConnectionSharing(si);
2534
                if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
2535 2536 2537 2538 2539 2540 2541
                sentinelFlushConfig();
            }
        }

        /* Update local current_epoch if received current_epoch is greater.*/
        if (current_epoch > sentinel.current_epoch) {
            sentinel.current_epoch = current_epoch;
2542
            sentinelFlushConfig();
A
antirez 已提交
2543
            sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
2544 2545 2546 2547
                (unsigned long long) sentinel.current_epoch);
        }

        /* Update master info if received configuration is newer. */
2548
        if (si && master->config_epoch < master_config_epoch) {
2549 2550 2551 2552 2553 2554
            master->config_epoch = master_config_epoch;
            if (master_port != master->addr->port ||
                strcmp(master->addr->ip, token[5]))
            {
                sentinelAddr *old_addr;

A
antirez 已提交
2555 2556
                sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
                sentinelEvent(LL_WARNING,"+switch-master",
2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579
                    master,"%s %s %d %s %d",
                    master->name,
                    master->addr->ip, master->addr->port,
                    token[5], master_port);

                old_addr = dupSentinelAddr(master->addr);
                sentinelResetMasterAndChangeAddress(master, token[5], master_port);
                sentinelCallClientReconfScript(master,
                    SENTINEL_OBSERVER,"start",
                    old_addr,master->addr);
                releaseSentinelAddr(old_addr);
            }
        }

        /* Update the state of the Sentinel. */
        if (si) si->last_hello_time = mstime();
    }

cleanup:
    sdsfreesplitres(token,numtokens);
}


2580 2581 2582
/* Pub/Sub callback for the Hello channel, used in order to discover the
 * other Sentinels attached to the same master. 'privdata' is the instance
 * the subscription belongs to.
 *
 * Every reply refreshes the Pub/Sub last activity time of the link, but
 * only well formed "message" replies not containing our own ID are passed
 * to sentinelProcessHelloMessage(). */
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    redisReply *r = reply;
    redisReply *payload;
    UNUSED(c);

    if (r == NULL || ri == NULL) return;

    /* Update the last activity in the pubsub channel. Note that since we
     * receive our messages as well this timestamp can be used to detect
     * if the link is probably disconnected even if it seems otherwise. */
    ri->link->pc_last_activity = mstime();

    /* Sanity check in the reply we expect, so that the code that follows
     * can avoid to check for details. */
    if (r->type != REDIS_REPLY_ARRAY) return;
    if (r->elements != 3) return;
    if (r->element[0]->type != REDIS_REPLY_STRING ||
        r->element[1]->type != REDIS_REPLY_STRING ||
        r->element[2]->type != REDIS_REPLY_STRING) return;
    if (strcmp(r->element[0]->str,"message") != 0) return;

    payload = r->element[2];

    /* We are not interested in meeting ourselves */
    if (strstr(payload->str,sentinel.myid) != NULL) return;

    sentinelProcessHelloMessage(payload->str, payload->len);
}

2610
/* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
J
Jack Drogon 已提交
2611
 * instance in order to broadcast the current configuration for this
2612 2613 2614 2615 2616 2617 2618
 * master, and to advertise the existence of this Sentinel at the same time.
 *
 * The message has the following format:
 *
 * sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
 * master_name,master_ip,master_port,master_config_epoch.
 *
2619 2620
 * Returns C_OK if the PUBLISH was queued correctly, otherwise
 * C_ERR is returned. */
2621
int sentinelSendHello(sentinelRedisInstance *ri) {
A
antirez 已提交
2622 2623
    char ip[NET_IP_STR_LEN];
    char payload[NET_IP_STR_LEN+1024];
2624
    int retval;
2625 2626
    char *announce_ip;
    int announce_port;
2627 2628 2629
    sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
    sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

2630
    if (ri->link->disconnected) return C_ERR;
2631

2632 2633
    /* Use the specified announce address if specified, otherwise try to
     * obtain our own IP address. */
2634 2635
    if (sentinel.announce_ip) {
        announce_ip = sentinel.announce_ip;
2636
    } else {
A
antirez 已提交
2637
        if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
2638
            return C_ERR;
2639
        announce_ip = ip;
2640
    }
Y
Yossi Gottlieb 已提交
2641 2642 2643
    if (sentinel.announce_port) announce_port = sentinel.announce_port;
    else if (server.tls_replication && server.tls_port) announce_port = server.tls_port;
    else announce_port = server.port;
2644 2645 2646 2647

    /* Format and send the Hello message. */
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
2648
        "%s,%s,%d,%llu", /* Info about current master. */
2649
        announce_ip, announce_port, sentinel.myid,
2650 2651 2652
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
2653
        (unsigned long long) master->config_epoch);
A
antirez 已提交
2654
    retval = redisAsyncCommand(ri->link->cc,
2655 2656 2657
        sentinelPublishReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"PUBLISH"),
        SENTINEL_HELLO_CHANNEL,payload);
2658
    if (retval != C_OK) return C_ERR;
A
antirez 已提交
2659
    ri->link->pending_commands++;
2660
    return C_OK;
2661 2662
}

2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686
/* Move back in time the last_pub_time of all the instances stored in the
 * 'instances' dictionary, so that an Hello update is delivered to them
 * ASAP. The timestamp is decremented by SENTINEL_PUBLISH_PERIOD+1, and is
 * left untouched when it is smaller than that amount. */
void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) {
    dictIterator *iter = dictGetSafeIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *instance = dictGetVal(entry);

        if (instance->last_pub_time < (SENTINEL_PUBLISH_PERIOD+1)) continue;
        instance->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
    }
    dictReleaseIterator(iter);
}

/* Force the delivery of an "Hello" message (see the sentinelSendHello()
 * top comment for further information) to all the Redis and Sentinel
 * instances related to the specified 'master'.
 *
 * It is technically not needed since we send an update to every instance
 * with a period of SENTINEL_PUBLISH_PERIOD milliseconds, however when a
 * Sentinel upgrades a configuration it is a good idea to deliver an update
 * to the other Sentinels ASAP.
 *
 * Returns C_ERR if 'master' is not flagged as a master, C_OK otherwise. */
int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
    if ((master->flags & SRI_MASTER) == 0) return C_ERR;

    /* The master itself first, then its Sentinels and its slaves. */
    if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1)) {
        master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
    }
    sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels);
    sentinelForceHelloUpdateDictOfRedisInstances(master->slaves);
    return C_OK;
}

2695
/* Queue a PING command for the specified instance, and refresh the
 * act_ping_time of its link if it is zero (that is, if we received a pong
 * for the previous ping).
 *
 * Returns 1 if the command was queued. On error zero is returned, and we
 * can't consider the PING command queued in the connection. */
int sentinelSendPing(sentinelRedisInstance *ri) {
    instanceLink *link = ri->link;

    if (redisAsyncCommand(link->cc,
            sentinelPingReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"PING")) != C_OK)
    {
        return 0;
    }

    ri->link->pending_commands++;
    ri->link->last_ping_time = mstime();
    /* The active ping time is updated only if the pong for the previous
     * ping was received, otherwise we are technically waiting since the
     * first ping that did not receive a reply. */
    if (ri->link->act_ping_time == 0) {
        ri->link->act_ping_time = ri->link->last_ping_time;
    }
    return 1;
}

2718 2719
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
 * the specified master or slave instance. */
2720
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
2721
    mstime_t now = mstime();
2722
    mstime_t info_period, ping_period;
2723 2724 2725 2726
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
A
antirez 已提交
2727
    if (ri->link->disconnected) return;
2728 2729 2730 2731 2732 2733 2734

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
A
antirez 已提交
2735 2736
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
2737 2738 2739 2740

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
2741 2742 2743 2744 2745
     * are turned into masters by another Sentinel, or by the sysadmin.
     *
     * Similarly we monitor the INFO output more often if the slave reports
     * to be disconnected from the master, so that we can have a fresh
     * disconnection time figure. */
2746
    if ((ri->flags & SRI_SLAVE) &&
2747 2748 2749
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
2750 2751 2752 2753 2754
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

2755 2756 2757 2758 2759 2760
    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

2761
    /* Send INFO to masters and slaves, not sentinels. */
2762 2763 2764 2765
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
A
antirez 已提交
2766
        retval = redisAsyncCommand(ri->link->cc,
2767 2768
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
2769
        if (retval == C_OK) ri->link->pending_commands++;
2770 2771 2772 2773
    }

    /* Send PING to all the three kinds of instances. */
    if ((now - ri->link->last_pong_time) > ping_period &&
2774
               (now - ri->link->last_ping_time) > ping_period/2) {
2775
        sentinelSendPing(ri);
2776 2777 2778 2779
    }

    /* PUBLISH hello messages to all the three kinds of instances. */
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
2780
        sentinelSendHello(ri);
2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799
    }
}

/* =========================== SENTINEL command ============================= */

/* Return the human readable name of the failover 'state' (one of the
 * SENTINEL_FAILOVER_STATE_* values), or "unknown" for any other value.
 * The returned pointer refers to a string literal. */
const char *sentinelFailoverStateStr(int state) {
    if (state == SENTINEL_FAILOVER_STATE_NONE) return "none";
    if (state == SENTINEL_FAILOVER_STATE_WAIT_START) return "wait_start";
    if (state == SENTINEL_FAILOVER_STATE_SELECT_SLAVE) return "select_slave";
    if (state == SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE)
        return "send_slaveof_noone";
    if (state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)
        return "wait_promotion";
    if (state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES) return "reconf_slaves";
    if (state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) return "update_config";
    return "unknown";
}

/* Redis instance to Redis protocol representation. */
2800
void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) {
2801 2802 2803 2804
    char *flags = sdsempty();
    void *mbl;
    int fields = 0;

A
antirez 已提交
2805
    mbl = addReplyDeferredLen(c);
2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828

    addReplyBulkCString(c,"name");
    addReplyBulkCString(c,ri->name);
    fields++;

    addReplyBulkCString(c,"ip");
    addReplyBulkCString(c,ri->addr->ip);
    fields++;

    addReplyBulkCString(c,"port");
    addReplyBulkLongLong(c,ri->addr->port);
    fields++;

    addReplyBulkCString(c,"runid");
    addReplyBulkCString(c,ri->runid ? ri->runid : "");
    fields++;

    addReplyBulkCString(c,"flags");
    if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
    if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
    if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
    if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
    if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
A
antirez 已提交
2829
    if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
2830 2831 2832 2833 2834 2835 2836 2837
    if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
    if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
        flags = sdscat(flags,"failover_in_progress,");
    if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
    if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
    if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
    if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");

2838
    if (sdslen(flags) != 0) sdsrange(flags,0,-2); /* remove last "," */
2839 2840 2841 2842
    addReplyBulkCString(c,flags);
    sdsfree(flags);
    fields++;

A
antirez 已提交
2843 2844
    addReplyBulkCString(c,"link-pending-commands");
    addReplyBulkLongLong(c,ri->link->pending_commands);
2845 2846
    fields++;

2847 2848 2849 2850
    addReplyBulkCString(c,"link-refcount");
    addReplyBulkLongLong(c,ri->link->refcount);
    fields++;

2851 2852 2853 2854 2855 2856
    if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
        addReplyBulkCString(c,"failover-state");
        addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
        fields++;
    }

2857 2858
    addReplyBulkCString(c,"last-ping-sent");
    addReplyBulkLongLong(c,
2859
        ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0);
2860 2861
    fields++;

2862
    addReplyBulkCString(c,"last-ok-ping-reply");
A
antirez 已提交
2863
    addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
2864 2865 2866
    fields++;

    addReplyBulkCString(c,"last-ping-reply");
A
antirez 已提交
2867
    addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881
    fields++;

    if (ri->flags & SRI_S_DOWN) {
        addReplyBulkCString(c,"s-down-time");
        addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
        fields++;
    }

    if (ri->flags & SRI_O_DOWN) {
        addReplyBulkCString(c,"o-down-time");
        addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
        fields++;
    }

2882 2883 2884 2885
    addReplyBulkCString(c,"down-after-milliseconds");
    addReplyBulkLongLong(c,ri->down_after_period);
    fields++;

2886 2887 2888 2889 2890
    /* Masters and Slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        addReplyBulkCString(c,"info-refresh");
        addReplyBulkLongLong(c,mstime() - ri->info_refresh);
        fields++;
2891 2892 2893 2894 2895 2896 2897 2898 2899

        addReplyBulkCString(c,"role-reported");
        addReplyBulkCString(c, (ri->role_reported == SRI_MASTER) ? "master" :
                                                                   "slave");
        fields++;

        addReplyBulkCString(c,"role-reported-time");
        addReplyBulkLongLong(c,mstime() - ri->role_reported_time);
        fields++;
2900 2901 2902 2903
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
2904 2905 2906 2907
        addReplyBulkCString(c,"config-epoch");
        addReplyBulkLongLong(c,ri->config_epoch);
        fields++;

2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918
        addReplyBulkCString(c,"num-slaves");
        addReplyBulkLongLong(c,dictSize(ri->slaves));
        fields++;

        addReplyBulkCString(c,"num-other-sentinels");
        addReplyBulkLongLong(c,dictSize(ri->sentinels));
        fields++;

        addReplyBulkCString(c,"quorum");
        addReplyBulkLongLong(c,ri->quorum);
        fields++;
2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938

        addReplyBulkCString(c,"failover-timeout");
        addReplyBulkLongLong(c,ri->failover_timeout);
        fields++;

        addReplyBulkCString(c,"parallel-syncs");
        addReplyBulkLongLong(c,ri->parallel_syncs);
        fields++;

        if (ri->notification_script) {
            addReplyBulkCString(c,"notification-script");
            addReplyBulkCString(c,ri->notification_script);
            fields++;
        }

        if (ri->client_reconfig_script) {
            addReplyBulkCString(c,"client-reconfig-script");
            addReplyBulkCString(c,ri->client_reconfig_script);
            fields++;
        }
2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960
    }

    /* Only slaves */
    if (ri->flags & SRI_SLAVE) {
        addReplyBulkCString(c,"master-link-down-time");
        addReplyBulkLongLong(c,ri->master_link_down_time);
        fields++;

        addReplyBulkCString(c,"master-link-status");
        addReplyBulkCString(c,
            (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
            "ok" : "err");
        fields++;

        addReplyBulkCString(c,"master-host");
        addReplyBulkCString(c,
            ri->slave_master_host ? ri->slave_master_host : "?");
        fields++;

        addReplyBulkCString(c,"master-port");
        addReplyBulkLongLong(c,ri->slave_master_port);
        fields++;
2961 2962 2963 2964

        addReplyBulkCString(c,"slave-priority");
        addReplyBulkLongLong(c,ri->slave_priority);
        fields++;
2965 2966 2967 2968

        addReplyBulkCString(c,"slave-repl-offset");
        addReplyBulkLongLong(c,ri->slave_repl_offset);
        fields++;
2969 2970 2971 2972 2973 2974 2975 2976
    }

    /* Only sentinels */
    if (ri->flags & SRI_SENTINEL) {
        addReplyBulkCString(c,"last-hello-message");
        addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
        fields++;

2977 2978 2979 2980 2981 2982 2983
        addReplyBulkCString(c,"voted-leader");
        addReplyBulkCString(c,ri->leader ? ri->leader : "?");
        fields++;

        addReplyBulkCString(c,"voted-leader-epoch");
        addReplyBulkLongLong(c,ri->leader_epoch);
        fields++;
2984 2985
    }

A
antirez 已提交
2986
    setDeferredMapLen(c,mbl,fields);
2987 2988
}

G
guiquanz 已提交
2989
/* Output a number of instances contained inside a dictionary as
2990
 * Redis protocol. */
2991
void addReplyDictOfRedisInstances(client *c, dict *instances) {
2992 2993 2994 2995
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(instances);
A
antirez 已提交
2996
    addReplyArrayLen(c,dictSize(instances));
2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        addReplySentinelRedisInstance(c,ri);
    }
    dictReleaseIterator(di);
}

/* Lookup the named master into sentinel.masters.
 * If the master is not found reply to the client with an error and returns
 * NULL. */
3008
sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c,
3009 3010 3011 3012
                        robj *name)
{
    sentinelRedisInstance *ri;

3013
    ri = dictFetchValue(sentinel.masters,name->ptr);
3014 3015 3016 3017 3018 3019 3020
    if (!ri) {
        addReplyError(c,"No such master with that name");
        return NULL;
    }
    return ri;
}

A
antirez 已提交
3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045
/* Result of sentinelIsQuorumReachable(): either SENTINEL_ISQR_OK or a
 * bitwise OR of the two failure flags. */
#define SENTINEL_ISQR_OK 0
#define SENTINEL_ISQR_NOQUORUM (1<<0)
#define SENTINEL_ISQR_NOAUTH (1<<1)

/* Check whether enough Sentinels look available for the specified master.
 *
 * The usable Sentinels are this Sentinel plus every known Sentinel of the
 * master that is not flagged SRI_S_DOWN or SRI_O_DOWN. The return value has:
 *
 * SENTINEL_ISQR_NOQUORUM set if the usable ones are fewer than the quorum
 *                        configured for the master.
 * SENTINEL_ISQR_NOAUTH   set if the usable ones are fewer than the majority
 *                        of all the voters (known Sentinels + this one).
 *
 * If 'usableptr' is not NULL the number of usable Sentinels is stored
 * there. */
int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
    int voters = dictSize(master->sentinels)+1; /* Known Sentinels + myself. */
    int usable = 1; /* Start from 1: this Sentinel counts as usable. */
    int result = SENTINEL_ISQR_OK;
    dictIterator *iter = dictGetIterator(master->sentinels);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *peer = dictGetVal(entry);

        if ((peer->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0) usable++;
    }
    dictReleaseIterator(iter);

    int majority = voters/2+1;

    if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;
    if (usable < majority) result |= SENTINEL_ISQR_NOAUTH;
    if (usableptr != NULL) *usableptr = usable;
    return result;
}

3046
/* SENTINEL <subcommand> [<arg> ...]
 *
 * Entry point of the SENTINEL command. The subcommand is argv[1], matched
 * case-insensitively; each branch validates its own number of arguments and
 * writes exactly one reply to the client. A wrong number of arguments ends
 * up at the 'numargserr' label, an unknown subcommand gets an error reply
 * as well. */
void sentinelCommand(client *c) {
    if (!strcasecmp(c->argv[1]->ptr,"masters")) {
        /* SENTINEL MASTERS */
        if (c->argc != 2) goto numargserr;
        addReplyDictOfRedisInstances(c,sentinel.masters);
    } else if (!strcasecmp(c->argv[1]->ptr,"master")) {
        /* SENTINEL MASTER <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        addReplySentinelRedisInstance(c,ri);
    } else if (!strcasecmp(c->argv[1]->ptr,"slaves") ||
               !strcasecmp(c->argv[1]->ptr,"replicas"))
    {
        /* SENTINEL REPLICAS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->slaves);
    } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
        /* SENTINEL SENTINELS <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        addReplyDictOfRedisInstances(c,ri->sentinels);
    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
         *
         * Arguments:
         *
         * ip and port are the ip and port of the master we want to be
         * checked by Sentinel. Note that the command will not check by
         * name but just by master, in theory different Sentinels may monitor
         * different masters with the same name.
         *
         * current-epoch is needed in order to understand if we are allowed
         * to vote for a failover leader or not. Each Sentinel can vote just
         * one time per epoch.
         *
         * runid is "*" if we are not seeking for a vote from the Sentinel
         * in order to elect the failover leader. Otherwise it is set to the
         * runid we want the Sentinel to vote for, if it has not voted yet.
         */
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
            getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
                                                              != C_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;

        /* Vote for the master (or fetch the previous vote) if the request
         * includes a runid, otherwise the sender is not seeking for a vote. */
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }

        /* Reply with a three-elements multi-bulk reply:
         * down state, leader, vote epoch. */
        addReplyArrayLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
    } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
        /* SENTINEL RESET <pattern> */
        if (c->argc != 3) goto numargserr;
        addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
    } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
        /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        ri = sentinelGetMasterByName(c->argv[2]->ptr);
        if (ri == NULL) {
            addReplyNullArray(c);
        } else {
            sentinelAddr *addr = sentinelGetCurrentMasterAddress(ri);

            addReplyArrayLen(c,2);
            addReplyBulkCString(c,addr->ip);
            addReplyBulkLongLong(c,addr->port);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
        /* SENTINEL FAILOVER <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
            addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
            return;
        }
        if (sentinelSelectSlave(ri) == NULL) {
            addReplySds(c,sdsnew("-NOGOODSLAVE No suitable replica to promote\r\n"));
            return;
        }
        serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
            ri->name);
        sentinelStartFailover(ri);
        ri->flags |= SRI_FORCE_FAILOVER;
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
        /* SENTINEL PENDING-SCRIPTS */

        if (c->argc != 2) goto numargserr;
        sentinelPendingScriptsCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"monitor")) {
        /* SENTINEL MONITOR <name> <ip> <port> <quorum> */
        sentinelRedisInstance *ri;
        long quorum, port;
        char ip[NET_IP_STR_LEN];

        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum")
            != C_OK) return;
        if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port")
            != C_OK) return;

        if (quorum <= 0) {
            addReplyError(c, "Quorum must be 1 or greater.");
            return;
        }

        /* Make sure the IP field is actually a valid IP before passing it
         * to createSentinelRedisInstance(), otherwise we may trigger a
         * DNS lookup at runtime. */
        if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) {
            addReplyError(c,"Invalid IP address specified");
            return;
        }

        /* Parameters are valid. Try to create the master instance. */
        ri = createSentinelRedisInstance(c->argv[2]->ptr,SRI_MASTER,
                c->argv[3]->ptr,port,quorum,NULL);
        if (ri == NULL) {
            switch(errno) {
            case EBUSY:
                addReplyError(c,"Duplicated master name");
                break;
            case EINVAL:
                addReplyError(c,"Invalid port number");
                break;
            default:
                addReplyError(c,"Unspecified error adding the instance");
                break;
            }
        } else {
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
            addReply(c,shared.ok);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) {
        /* SENTINEL FLUSHCONFIG */
        if (c->argc != 2) goto numargserr;
        sentinelFlushConfig();
        addReply(c,shared.ok);
        return;
    } else if (!strcasecmp(c->argv[1]->ptr,"remove")) {
        /* SENTINEL REMOVE <name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        /* Emit the event before deleting: it references the instance. */
        sentinelEvent(LL_WARNING,"-monitor",ri,"%@");
        dictDelete(sentinel.masters,c->argv[2]->ptr);
        sentinelFlushConfig();
        addReply(c,shared.ok);
    } else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) {
        /* SENTINEL CKQUORUM <name> */
        sentinelRedisInstance *ri;
        int usable;

        if (c->argc != 3) goto numargserr;
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
            == NULL) return;
        int result = sentinelIsQuorumReachable(ri,&usable);
        if (result == SENTINEL_ISQR_OK) {
            addReplySds(c, sdscatfmt(sdsempty(),
                "+OK %i usable Sentinels. Quorum and failover authorization "
                "can be reached\r\n",usable));
        } else {
            sds e = sdscatfmt(sdsempty(),
                "-NOQUORUM %i usable Sentinels. ",usable);
            if (result & SENTINEL_ISQR_NOQUORUM)
                e = sdscat(e,"Not enough available Sentinels to reach the"
                             " specified quorum for this master");
            if (result & SENTINEL_ISQR_NOAUTH) {
                if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". ");
                e = sdscat(e, "Not enough available Sentinels to reach the"
                              " majority and authorize a failover");
            }
            e = sdscat(e,"\r\n");
            addReplySds(c,e);
        }
    } else if (!strcasecmp(c->argv[1]->ptr,"set")) {
        /* SENTINEL SET <mastername> [<option> <value> ...] */
        if (c->argc < 3) goto numargserr;
        sentinelSetCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) {
        /* SENTINEL INFO-CACHE [<name> ...]
         *
         * Without names the cached INFO of every monitored master group is
         * returned, otherwise only the named groups are included. */
        if (c->argc < 2) goto numargserr;
        mstime_t now = mstime();

        /* Create an ad-hoc dictionary type so that we can iterate
         * a dictionary composed of just the master groups the user
         * requested. The value destructor is cleared because the temporary
         * dictionary only borrows the instances. */
        dictType copy_keeper = instancesDictType;
        copy_keeper.valDestructor = NULL;
        dict *masters_local = sentinel.masters;
        if (c->argc > 2) {
            masters_local = dictCreate(&copy_keeper, NULL);

            for (int i = 2; i < c->argc; i++) {
                sentinelRedisInstance *ri;
                ri = sentinelGetMasterByName(c->argv[i]->ptr);
                if (!ri) continue; /* ignore non-existing names */
                dictAdd(masters_local, ri->name, ri);
            }
        }

        /* Reply format:
         *   1.) master name
         *   2.) 1.) info from master
         *       2.) info from replica
         *       ...
         *   3.) other master name
         *   ...
         */
        addReplyArrayLen(c,dictSize(masters_local) * 2);

        dictIterator  *di;
        dictEntry *de;
        di = dictGetIterator(masters_local);
        while ((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            addReplyBulkCBuffer(c,ri->name,strlen(ri->name));
            addReplyArrayLen(c,dictSize(ri->slaves) + 1); /* +1 for self */
            addReplyArrayLen(c,2);
            /* A zero info_refresh is reported as age 0, not as the number
             * of milliseconds elapsed since the epoch. */
            addReplyLongLong(c,
                ri->info_refresh ? (now - ri->info_refresh) : 0);
            if (ri->info)
                addReplyBulkCBuffer(c,ri->info,sdslen(ri->info));
            else
                addReplyNull(c);

            dictIterator *sdi;
            dictEntry *sde;
            sdi = dictGetIterator(ri->slaves);
            while ((sde = dictNext(sdi)) != NULL) {
                sentinelRedisInstance *sri = dictGetVal(sde);
                addReplyArrayLen(c,2);
                addReplyLongLong(c,
                    sri->info_refresh ? (now - sri->info_refresh) : 0);
                if (sri->info)
                    addReplyBulkCBuffer(c,sri->info,sdslen(sri->info));
                else
                    addReplyNull(c);
            }
            dictReleaseIterator(sdi);
        }
        dictReleaseIterator(di);
        if (masters_local != sentinel.masters) dictRelease(masters_local);
    } else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) {
        /* SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag> */
        int j;

        sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
        for (j = 2; j < c->argc; j++) {
            if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after being successfully elected as failover "
                    "leader");
            } else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) {
                sentinel.simfailure_flags |=
                    SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION;
                serverLog(LL_WARNING,"Failure simulation: this Sentinel "
                    "will crash after promoting the selected replica to master");
            } else if (!strcasecmp(c->argv[j]->ptr,"help")) {
                addReplyArrayLen(c,2);
                addReplyBulkCString(c,"crash-after-election");
                addReplyBulkCString(c,"crash-after-promotion");
                /* The array is the whole reply: stop here, otherwise the
                 * +OK below would be sent as a second reply to the same
                 * command. */
                return;
            } else {
                addReplyError(c,"Unknown failure simulation specified");
                return;
            }
        }
        addReply(c,shared.ok);
    } else {
        addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
                               (char*)c->argv[1]->ptr);
    }
    return;

numargserr:
    addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
                          (char*)c->argv[1]->ptr);
}

3366 3367 3368 3369 3370 3371 3372 3373 3374 3375
/* Append to 'info' the INFO section called 'section_name', as generated by
 * genRedisInfoString(), if that section was requested: that is when the
 * default sections or all the sections are wanted, or when 'section' matches
 * 'section_name' case-insensitively. Every section after the first one is
 * preceded by a "\r\n" separator.
 *
 * The macro expands in the scope of the caller and uses the following
 * variables that must be defined there: 'defsections', 'allsections',
 * 'section', 'sections' (incremented at every use) and 'info'. */
#define info_section_from_redis(section_name) do { \
    if (defsections || allsections || !strcasecmp(section,section_name)) { \
        sds redissection; \
        if (sections++) info = sdscat(info,"\r\n"); \
        redissection = genRedisInfoString(section_name); \
        info = sdscatlen(info,redissection,sdslen(redissection)); \
        sdsfree(redissection); \
    } \
} while(0)

A
antirez 已提交
3376
/* SENTINEL INFO [section] */
3377
void sentinelInfoCommand(client *c) {
A
antirez 已提交
3378 3379 3380 3381 3382
    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }

3383 3384 3385 3386 3387 3388 3389 3390 3391 3392 3393
    int defsections = 0, allsections = 0;
    char *section = c->argc == 2 ? c->argv[1]->ptr : NULL;
    if (section) {
        allsections = !strcasecmp(section,"all");
        defsections = !strcasecmp(section,"default");
    } else {
        defsections = 1;
    }

    int sections = 0;
    sds info = sdsempty();
3394 3395 3396 3397 3398

    info_section_from_redis("server");
    info_section_from_redis("clients");
    info_section_from_redis("cpu");
    info_section_from_redis("stats");
A
antirez 已提交
3399

3400
    if (defsections || allsections || !strcasecmp(section,"sentinel")) {
A
antirez 已提交
3401 3402 3403 3404 3405 3406 3407 3408 3409 3410
        dictIterator *di;
        dictEntry *de;
        int master_id = 0;

        if (sections++) info = sdscat(info,"\r\n");
        info = sdscatprintf(info,
            "# Sentinel\r\n"
            "sentinel_masters:%lu\r\n"
            "sentinel_tilt:%d\r\n"
            "sentinel_running_scripts:%d\r\n"
3411 3412
            "sentinel_scripts_queue_length:%ld\r\n"
            "sentinel_simulate_failure_flags:%lu\r\n",
A
antirez 已提交
3413 3414 3415
            dictSize(sentinel.masters),
            sentinel.tilt,
            sentinel.running_scripts,
3416 3417
            listLength(sentinel.scripts_queue),
            sentinel.simfailure_flags);
A
antirez 已提交
3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436

        di = dictGetIterator(sentinel.masters);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            char *status = "ok";

            if (ri->flags & SRI_O_DOWN) status = "odown";
            else if (ri->flags & SRI_S_DOWN) status = "sdown";
            info = sdscatprintf(info,
                "master%d:name=%s,status=%s,address=%s:%d,"
                "slaves=%lu,sentinels=%lu\r\n",
                master_id++, ri->name, status,
                ri->addr->ip, ri->addr->port,
                dictSize(ri->slaves),
                dictSize(ri->sentinels)+1);
        }
        dictReleaseIterator(di);
    }

M
Matt Stancliff 已提交
3437
    addReplyBulkSds(c, info);
A
antirez 已提交
3438 3439
}

J
Jack Drogon 已提交
3440
/* Implements Sentinel version of the ROLE command. The output is
A
antirez 已提交
3441
 * "sentinel" and the list of currently monitored master names. */
3442
void sentinelRoleCommand(client *c) {
A
antirez 已提交
3443 3444 3445
    dictIterator *di;
    dictEntry *de;

A
antirez 已提交
3446
    addReplyArrayLen(c,2);
A
antirez 已提交
3447
    addReplyBulkCBuffer(c,"sentinel",8);
A
antirez 已提交
3448
    addReplyArrayLen(c,dictSize(sentinel.masters));
A
antirez 已提交
3449 3450 3451 3452 3453 3454 3455 3456 3457 3458

    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        addReplyBulkCString(c,ri->name);
    }
    dictReleaseIterator(di);
}

A
antirez 已提交
3459
/* SENTINEL SET <mastername> [<option> <value> ...] */
3460
void sentinelSetCommand(client *c) {
A
antirez 已提交
3461 3462
    sentinelRedisInstance *ri;
    int j, changes = 0;
3463 3464
    int badarg = 0; /* Bad argument position for error reporting. */
    char *option;
A
antirez 已提交
3465 3466 3467 3468 3469

    if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
        == NULL) return;

    /* Process option - value pairs. */
3470 3471
    for (j = 3; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j;
A
antirez 已提交
3472 3473
        option = c->argv[j]->ptr;
        long long ll;
3474
        int old_j = j; /* Used to know what to log as an event. */
A
antirez 已提交
3475

3476
        if (!strcasecmp(option,"down-after-milliseconds") && moreargs > 0) {
A
antirez 已提交
3477
            /* down-after-millisecodns <milliseconds> */
3478
            robj *o = c->argv[++j];
3479 3480
            if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
                badarg = j;
A
antirez 已提交
3481
                goto badfmt;
3482
            }
A
antirez 已提交
3483
            ri->down_after_period = ll;
3484
            sentinelPropagateDownAfterPeriod(ri);
A
antirez 已提交
3485
            changes++;
3486
        } else if (!strcasecmp(option,"failover-timeout") && moreargs > 0) {
A
antirez 已提交
3487
            /* failover-timeout <milliseconds> */
3488
            robj *o = c->argv[++j];
3489 3490
            if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
                badarg = j;
A
antirez 已提交
3491
                goto badfmt;
3492
            }
A
antirez 已提交
3493 3494
            ri->failover_timeout = ll;
            changes++;
3495
        } else if (!strcasecmp(option,"parallel-syncs") && moreargs > 0) {
A
antirez 已提交
3496
            /* parallel-syncs <milliseconds> */
3497
            robj *o = c->argv[++j];
3498 3499
            if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
                badarg = j;
A
antirez 已提交
3500
                goto badfmt;
3501
            }
A
antirez 已提交
3502 3503
            ri->parallel_syncs = ll;
            changes++;
3504
        } else if (!strcasecmp(option,"notification-script") && moreargs > 0) {
A
antirez 已提交
3505
            /* notification-script <path> */
3506
            char *value = c->argv[++j]->ptr;
3507 3508 3509 3510 3511 3512 3513 3514
            if (sentinel.deny_scripts_reconfig) {
                addReplyError(c,
                    "Reconfiguration of scripts path is denied for "
                    "security reasons. Check the deny-scripts-reconfig "
                    "configuration directive in your Sentinel configuration");
                return;
            }

A
antirez 已提交
3515 3516 3517
            if (strlen(value) && access(value,X_OK) == -1) {
                addReplyError(c,
                    "Notification script seems non existing or non executable");
3518
                if (changes) sentinelFlushConfig();
A
antirez 已提交
3519 3520 3521 3522 3523
                return;
            }
            sdsfree(ri->notification_script);
            ri->notification_script = strlen(value) ? sdsnew(value) : NULL;
            changes++;
3524
        } else if (!strcasecmp(option,"client-reconfig-script") && moreargs > 0) {
A
antirez 已提交
3525
            /* client-reconfig-script <path> */
3526
            char *value = c->argv[++j]->ptr;
3527 3528 3529 3530 3531 3532 3533 3534
            if (sentinel.deny_scripts_reconfig) {
                addReplyError(c,
                    "Reconfiguration of scripts path is denied for "
                    "security reasons. Check the deny-scripts-reconfig "
                    "configuration directive in your Sentinel configuration");
                return;
            }

A
antirez 已提交
3535 3536 3537 3538
            if (strlen(value) && access(value,X_OK) == -1) {
                addReplyError(c,
                    "Client reconfiguration script seems non existing or "
                    "non executable");
3539
                if (changes) sentinelFlushConfig();
A
antirez 已提交
3540 3541 3542 3543 3544
                return;
            }
            sdsfree(ri->client_reconfig_script);
            ri->client_reconfig_script = strlen(value) ? sdsnew(value) : NULL;
            changes++;
3545
        } else if (!strcasecmp(option,"auth-pass") && moreargs > 0) {
A
antirez 已提交
3546
            /* auth-pass <password> */
3547
            char *value = c->argv[++j]->ptr;
A
antirez 已提交
3548 3549 3550
            sdsfree(ri->auth_pass);
            ri->auth_pass = strlen(value) ? sdsnew(value) : NULL;
            changes++;
3551 3552 3553 3554 3555 3556
        } else if (!strcasecmp(option,"auth-user") && moreargs > 0) {
            /* auth-user <username> */
            char *value = c->argv[++j]->ptr;
            sdsfree(ri->auth_user);
            ri->auth_user = strlen(value) ? sdsnew(value) : NULL;
            changes++;
3557
        } else if (!strcasecmp(option,"quorum") && moreargs > 0) {
3558
            /* quorum <count> */
3559
            robj *o = c->argv[++j];
3560 3561
            if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
                badarg = j;
3562
                goto badfmt;
3563
            }
3564 3565
            ri->quorum = ll;
            changes++;
3566
        } else if (!strcasecmp(option,"rename-command") && moreargs > 1) {
3567
            /* rename-command <oldname> <newname> */
3568 3569 3570
            sds oldname = c->argv[++j]->ptr;
            sds newname = c->argv[++j]->ptr;

3571 3572 3573 3574 3575
            if ((sdslen(oldname) == 0) || (sdslen(newname) == 0)) {
                badarg = sdslen(newname) ? j-1 : j;
                goto badfmt;
            }

3576 3577 3578
            /* Remove any older renaming for this command. */
            dictDelete(ri->renamed_commands,oldname);

3579 3580 3581
            /* If the target name is the same as the source name there
             * is no need to add an entry mapping to itself. */
            if (!dictSdsKeyCaseCompare(NULL,oldname,newname)) {
3582 3583 3584 3585 3586
                oldname = sdsdup(oldname);
                newname = sdsdup(newname);
                dictAdd(ri->renamed_commands,oldname,newname);
            }
            changes++;
3587
        } else {
3588 3589
            addReplyErrorFormat(c,"Unknown option or number of arguments for "
                                  "SENTINEL SET '%s'", option);
3590 3591
            if (changes) sentinelFlushConfig();
            return;
A
antirez 已提交
3592
        }
3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609

        /* Log the event. */
        int numargs = j-old_j+1;
        switch(numargs) {
        case 2:
            sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",c->argv[old_j]->ptr,
                                                          c->argv[old_j+1]->ptr);
            break;
        case 3:
            sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",c->argv[old_j]->ptr,
                                                             c->argv[old_j+1]->ptr,
                                                             c->argv[old_j+2]->ptr);
            break;
        default:
            sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",c->argv[old_j]->ptr);
            break;
        }
A
antirez 已提交
3610 3611 3612 3613 3614 3615 3616 3617 3618
    }

    if (changes) sentinelFlushConfig();
    addReply(c,shared.ok);
    return;

badfmt: /* Bad format errors */
    if (changes) sentinelFlushConfig();
    addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL SET '%s'",
Z
zhaozhao.zz 已提交
3619
        (char*)c->argv[badarg]->ptr,option);
A
antirez 已提交
3620 3621
}

3622 3623 3624 3625 3626 3627
/* Our fake PUBLISH command: it is actually useful only to receive hello messages
 * from the other sentinel instances, and publishing to a channel other than
 * SENTINEL_HELLO_CHANNEL is forbidden.
 *
 * Because we have a Sentinel PUBLISH, the code to send hello messages is the same
 * for all the three kind of instances: masters, slaves, sentinels. */
3628
void sentinelPublishCommand(client *c) {
3629 3630 3631 3632 3633 3634 3635 3636
    if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
        addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
        return;
    }
    sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
    addReplyLongLong(c,1);
}

3637 3638 3639 3640
/* ===================== SENTINEL availability checks ======================= */

/* Decide whether 'ri' is down from the point of view of this Sentinel
 * (SDOWN), and close the links that look stale so that they can be
 * reconnected.
 *
 * The function performs three steps:
 * 1) Compute for how long the instance has not been answering.
 * 2) Close the command and/or Pub/Sub connection when they show too little
 *    activity.
 * 3) Set or clear SRI_S_DOWN, emitting +sdown / -sdown on state changes. */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;
    int is_down = 0;

    /* Time without a reply: measured from the pending ping if there is one,
     * otherwise from the last time the link was available, if the link is
     * disconnected. */
    if (ri->link->act_ping_time) {
        elapsed = mstime() - ri->link->act_ping_time;
    } else if (ri->link->disconnected) {
        elapsed = mstime() - ri->link->last_avail_time;
    }

    /* Command link: close it when it has been connected for more than
     * SENTINEL_MIN_LINK_RECONNECT_PERIOD, there is a pending ping, and both
     * the pending ping and the last received pong are older than half the
     * down-after period. */
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD)
    {
        if (ri->link->act_ping_time != 0 &&
            (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
            (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
        {
            instanceLinkCloseConnection(ri->link,ri->link->cc);
        }
    }

    /* Pub/Sub link: close it when it has been connected for more than
     * SENTINEL_MIN_LINK_RECONNECT_PERIOD but we saw no activity on it for
     * more than SENTINEL_PUBLISH_PERIOD * 3. */
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD)
    {
        if ((mstime() - ri->link->pc_last_activity) >
            (SENTINEL_PUBLISH_PERIOD*3))
        {
            instanceLinkCloseConnection(ri->link,ri->link->pc);
        }
    }

    /* The instance is considered SDOWN if either:
     *
     * 1) It is not replying for more than down_after_period.
     * 2) We believe it is a master, but it reports to be a slave for
     *    more than down_after_period plus two INFO periods. */
    if (elapsed > ri->down_after_period) {
        is_down = 1;
    } else if ((ri->flags & SRI_MASTER) &&
               ri->role_reported == SRI_SLAVE &&
               mstime() - ri->role_reported_time >
                (ri->down_after_period+SENTINEL_INFO_PERIOD*2))
    {
        is_down = 1;
    }

    /* Act only on transitions, so each event is emitted once. */
    if (is_down && (ri->flags & SRI_S_DOWN) == 0) {
        sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
        ri->s_down_since_time = mstime();
        ri->flags |= SRI_S_DOWN;
    } else if (!is_down && (ri->flags & SRI_S_DOWN)) {
        sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
        ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
    }
}

3706 3707 3708 3709 3710 3711
/* Is this instance down according to the configured quorum?
 *
 * Note that ODOWN is a weak quorum, it only means that enough Sentinels
 * reported in a given time range that the instance was not reachable.
 * However messages can be delayed so there are no strong guarantees about
 * N instances agreeing at the same time about the down state. */
3712 3713 3714
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
3715
    unsigned int quorum = 0, odown = 0;
3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 3732 3733

    if (master->flags & SRI_S_DOWN) {
        /* Is down for enough sentinels? */
        quorum = 1; /* the current sentinel. */
        /* Count all the other sentinels. */
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        if (quorum >= master->quorum) odown = 1;
    }

    /* Set the flag accordingly to the outcome. */
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
A
antirez 已提交
3734
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
3735 3736 3737 3738 3739 3740
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
A
antirez 已提交
3741
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
3742 3743 3744 3745 3746 3747 3748 3749
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

/* Callback for the reply to SENTINEL is-master-down-by-addr. See
 * sentinelAskMasterStateToOtherSentinels() for the request side.
 *
 * 'privdata' is the instance of the Sentinel we asked, and the link is
 * taken from the 'data' field of the async context. A valid reply is a
 * three elements array: <down state (integer)> <leader runid (string)>
 * <leader epoch (integer)>. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;
    redisReply *down_state, *voted_runid, *voted_epoch;

    if (reply == NULL || link == NULL) return;
    link->pending_commands--;

    /* Errors and unexpected replies are silently ignored: if the command
     * keeps failing, the SRI_MASTER_DOWN flag ends up being cleared anyway
     * because of the timeout. */
    if (r->type != REDIS_REPLY_ARRAY || r->elements != 3) return;
    down_state = r->element[0];
    voted_runid = r->element[1];
    voted_epoch = r->element[2];
    if (down_state->type != REDIS_REPLY_INTEGER ||
        voted_runid->type != REDIS_REPLY_STRING ||
        voted_epoch->type != REDIS_REPLY_INTEGER) return;

    ri->last_master_down_reply_time = mstime();
    if (down_state->integer == 1)
        ri->flags |= SRI_MASTER_DOWN;
    else
        ri->flags &= ~SRI_MASTER_DOWN;

    /* A runid different than "*" means that the Sentinel replied with an
     * actual vote: remember it, logging only when the epoch changed. */
    if (strcmp(voted_runid->str,"*") != 0) {
        sdsfree(ri->leader);
        if ((long long)ri->leader_epoch != voted_epoch->integer) {
            serverLog(LL_WARNING,
                "%s voted for %s %llu", ri->name,
                voted_runid->str,
                (unsigned long long) voted_epoch->integer);
        }
        ri->leader = sdsnew(voted_runid->str);
        ri->leader_epoch = voted_epoch->integer;
    }
}

3787
/* If we think the master is down, we start sending
3788
 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
3789 3790
 * in order to get the replies that allow to reach the quorum
 * needed to mark the master in ODOWN state and trigger a failover. */
3791 3792
#define SENTINEL_ASK_FORCED (1<<0)
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803
    dictIterator *di;
    dictEntry *de;

    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;

        /* If the master state from other sentinel is too old, we clear it. */
3804
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
3805 3806 3807 3808 3809 3810 3811 3812 3813
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        /* Only ask if master is down to other sentinels if:
         *
         * 1) We believe it is down, or there is a failover in progress.
         * 2) Sentinel is connected.
3814
         * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
3815
        if ((master->flags & SRI_S_DOWN) == 0) continue;
A
antirez 已提交
3816
        if (ri->link->disconnected) continue;
3817 3818
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
3819 3820 3821 3822
            continue;

        /* Ask */
        ll2string(port,sizeof(port),master->addr->port);
A
antirez 已提交
3823
        retval = redisAsyncCommand(ri->link->cc,
3824
                    sentinelReceiveIsMasterDownReply, ri,
3825 3826
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
3827 3828
                    master->addr->ip, port,
                    sentinel.current_epoch,
3829
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
3830
                    sentinel.myid : "*");
3831
        if (retval == C_OK) ri->link->pending_commands++;
3832 3833 3834 3835 3836 3837
    }
    dictReleaseIterator(di);
}

/* =============================== FAILOVER ================================= */

3838 3839
/* Crash because of user request via SENTINEL simulate-failure command. */
void sentinelSimFailureCrash(void) {
A
antirez 已提交
3840
    serverLog(LL_WARNING,
3841 3842 3843 3844
        "Sentinel CRASH because of SENTINEL simulate-failure");
    exit(99);
}

3845
/* Vote for the sentinel with 'req_runid' or return the old vote if already
J
Jack Drogon 已提交
3846
 * voted for the specified 'req_epoch' or one greater.
3847 3848
 *
 * If a vote is not available returns NULL, otherwise return the Sentinel
3849
 * runid and populate the leader_epoch with the epoch of the vote. */
3850
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
A
antirez 已提交
3851
    if (req_epoch > sentinel.current_epoch) {
3852
        sentinel.current_epoch = req_epoch;
3853
        sentinelFlushConfig();
A
antirez 已提交
3854
        sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
A
antirez 已提交
3855 3856
            (unsigned long long) sentinel.current_epoch);
    }
3857

3858 3859
    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
3860 3861 3862
        sdsfree(master->leader);
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
3863
        sentinelFlushConfig();
A
antirez 已提交
3864
        sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
A
antirez 已提交
3865
            master->leader, (unsigned long long) master->leader_epoch);
3866 3867
        /* If we did not voted for ourselves, set the master failover start
         * time to now, in order to force a delay before we can start a
A
antirez 已提交
3868
         * failover for the same master. */
3869
        if (strcasecmp(master->leader,sentinel.myid))
3870
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
3871 3872
    }

3873
    *leader_epoch = master->leader_epoch;
3874
    return master->leader ? sdsnew(master->leader) : NULL;
3875 3876 3877 3878 3879 3880 3881
}

/* Pairs a run ID with a number of votes.
 * NOTE(review): nothing in the surrounding leader election code references
 * this struct (votes are tallied in a dict) -- confirm whether it is still
 * used anywhere before relying on it. */
struct sentinelLeader {
    char *runid;            /* Run ID string (presumably of a Sentinel). */
    unsigned long votes;    /* Vote counter. */
};

3882
/* Helper function for sentinelGetLeader, increment the counter
3883
 * relative to the specified runid. */
3884
int sentinelLeaderIncr(dict *counters, char *runid) {
O
oranagra 已提交
3885
    dictEntry *existing, *de;
3886 3887
    uint64_t oldval;

O
oranagra 已提交
3888 3889 3890 3891
    de = dictAddRaw(counters,runid,&existing);
    if (existing) {
        oldval = dictGetUnsignedIntegerVal(existing);
        dictSetUnsignedIntegerVal(existing,oldval+1);
3892
        return oldval+1;
3893
    } else {
A
antirez 已提交
3894
        serverAssert(de != NULL);
3895
        dictSetUnsignedIntegerVal(de,1);
3896
        return 1;
3897 3898 3899
    }
}

3900
/* Scan all the Sentinels attached to this master to check if there
3901
 * is a leader for the specified epoch.
3902
 *
3903 3904 3905
 * To be a leader for a given epoch, we should have the majority of
 * the Sentinels we know (ever seen since the last SENTINEL RESET) that
 * reported the same instance as leader for the same epoch. */
3906
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
3907 3908 3909 3910 3911 3912
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;
3913
    uint64_t leader_epoch;
3914
    uint64_t max_votes = 0;
3915

A
antirez 已提交
3916
    serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
3917 3918
    counters = dictCreate(&leaderVotesDictType,NULL);

A
antirez 已提交
3919
    voters = dictSize(master->sentinels)+1; /* All the other sentinels and me.*/
3920

3921 3922 3923 3924
    /* Count other sentinels votes */
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
3925 3926
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);
3927 3928 3929 3930 3931 3932
    }
    dictReleaseIterator(di);

    /* Check what's the winner. For the winner to win, it needs two conditions:
     * 1) Absolute majority between voters (50% + 1).
     * 2) And anyway at least master->quorum votes. */
3933 3934 3935
    di = dictGetIterator(counters);
    while((de = dictNext(di)) != NULL) {
        uint64_t votes = dictGetUnsignedIntegerVal(de);
3936

3937 3938 3939 3940 3941 3942
        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
    dictReleaseIterator(di);
3943

3944 3945 3946 3947 3948 3949
    /* Count this Sentinel vote:
     * if this Sentinel did not voted yet, either vote for the most
     * common voted sentinel, or for itself if no vote exists at all. */
    if (winner)
        myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
    else
3950
        myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);
3951 3952 3953 3954 3955 3956 3957

    if (myvote && leader_epoch == epoch) {
        uint64_t votes = sentinelLeaderIncr(counters,myvote);

        if (votes > max_votes) {
            max_votes = votes;
            winner = myvote;
3958 3959
        }
    }
3960 3961 3962 3963 3964

    voters_quorum = voters/2+1;
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;

3965 3966 3967 3968 3969 3970
    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}

3971 3972 3973 3974 3975 3976 3977
/* Queue a SLAVEOF command for the instance 'ri', wrapped in a transaction
 * that also rewrites the configuration on disk (effective only if the
 * instance is recent enough to support CONFIG REWRITE and was started with
 * a configuration file) and disconnects its clients.
 *
 * If 'host' is NULL, "SLAVEOF NO ONE" is sent and 'port' is ignored.
 *
 * Returns C_OK if all the commands were accepted for (later) delivery,
 * otherwise C_ERR is returned as soon as one of them is refused. The
 * replies are discarded. */
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
    static const char *kill_types[] = {"normal", "pubsub"};
    char portstr[32];
    int retval;
    int j;

    /* A NULL host means SLAVEOF NO ONE, which turns the instance into a
     * master. */
    if (host != NULL) {
        ll2string(portstr,sizeof(portstr),port);
    } else {
        host = "NO";
        memcpy(portstr,"ONE",4);
    }

    /* To send SLAVEOF in a safe way, we use a transaction performing the
     * following steps:
     * 1) Reconfigure the instance according to the host/port arguments.
     * 2) Rewrite the configuration.
     * 3) Disconnect all the clients (but this one sending the command), in
     *    order to trigger the ask-master-on-reconnection protocol for the
     *    connected clients.
     *
     * The replies are not checked: the effects are observed in the next
     * INFO output instead. */
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"MULTI"));
    if (retval == C_ERR) return C_ERR;
    ri->link->pending_commands++;

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"SLAVEOF"),
        host, portstr);
    if (retval == C_ERR) return C_ERR;
    ri->link->pending_commands++;

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s REWRITE",
        sentinelInstanceMapCommand(ri,"CONFIG"));
    if (retval == C_ERR) return C_ERR;
    ri->link->pending_commands++;

    /* CLIENT KILL TYPE <type> is supported only starting from Redis 2.8.12.
     * Sending it to an instance that does not understand it is harmless:
     * CLIENT is a variadic command, so it is not flagged as a syntax error
     * and the transaction does not fail (just the unsupported command
     * does). */
    for (j = 0; j < (int)(sizeof(kill_types)/sizeof(kill_types[0])); j++) {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
            sentinelInstanceMapCommand(ri,"CLIENT"),
            kill_types[j]);
        if (retval == C_ERR) return C_ERR;
        ri->link->pending_commands++;
    }

    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"EXEC"));
    if (retval == C_ERR) return C_ERR;
    ri->link->pending_commands++;

    return C_OK;
}

4046 4047
/* Prepare the state of 'master' for a new failover: the failover state
 * machine is moved to WAIT_START, the current epoch is incremented and
 * used as failover epoch, and the start time is set to now plus a random
 * desync. Emits the +new-epoch and +try-failover events. */
void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;

    /* Every failover attempt gets a new epoch. */
    sentinel.current_epoch++;
    master->failover_epoch = sentinel.current_epoch;
    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");

    master->failover_start_time = mstime() + (rand() % SENTINEL_MAX_DESYNC);
    master->failover_state_change_time = mstime();
}

4060 4061 4062
/* Start a failover for 'master' if all the following conditions are met:
 *
 * 1) The master is in ODOWN condition.
 * 2) There is no failover already in progress.
 * 3) The last failover attempt was started at least two times the
 *    failover timeout ago.
 *
 * At this stage we don't know yet if we'll win the election, so it is
 * possible that the failover is started without us being able to act.
 *
 * Returns non-zero if a failover was started, zero otherwise. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    int odown = (master->flags & SRI_O_DOWN) != 0;
    int in_progress = (master->flags & SRI_FAILOVER_IN_PROGRESS) != 0;

    /* No failover without O_DOWN, or with another one in progress. */
    if (!odown || in_progress) return 0;

    /* Enough time elapsed since the last attempt: go ahead. */
    if (mstime() - master->failover_start_time >=
        master->failover_timeout*2)
    {
        sentinelStartFailover(master);
        return 1;
    }

    /* Too early. Log when the next attempt will be possible, but only once
     * for each failover start time. */
    if (master->failover_delay_logged != master->failover_start_time) {
        time_t not_before = (master->failover_start_time +
                             master->failover_timeout*2) / 1000;
        char timestr[26];

        ctime_r(&not_before,timestr);
        timestr[24] = '\0'; /* Remove newline. */
        master->failover_delay_logged = master->failover_start_time;
        serverLog(LL_WARNING,
            "Next failover delay: I will not start a failover before %s",
            timestr);
    }
    return 0;
}

/* Select a suitable slave to promote. The current algorithm only uses
 * the following parameters:
 *
 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
 * 2) Last time the slave replied to ping no more than 5 times the PING period.
 * 3) info_refresh not older than 3 times the INFO refresh period.
 * 4) master_link_down_time no more than:
 *     (now - master->s_down_since_time) + (master->down_after_period * 10).
 *    Basically since the master is down from our POV, the slave reports
 *    to be disconnected no more than 10 times the configured down-after-period.
 *    This is pretty much black magic but the idea is, the master was not
 *    available so the slave may be lagging, but not over a certain time.
 *    Anyway we'll select the best slave according to replication offset.
 * 5) Slave priority can't be zero, otherwise the slave is discarded.
 *
 * Among all the slaves matching the above conditions we select the slave
 * with, in order of sorting key:
 *
 * - lower slave_priority.
 * - bigger processed replication offset.
 * - lexicographically smaller runid.
 *
 * Basically if the priority is the same, the slave that processed more
 * commands from the master is selected, and the runid is only used to
 * break ties.
 *
 * The function returns the pointer to the selected slave, otherwise
 * NULL if no suitable slave was found.
 */

4130 4131 4132
/* Helper for sentinelSelectSlave(). This is used by qsort() in order to
 * sort suitable slaves in a "better first" order, to take the first of
 * the list. */
4133 4134 4135
int compareSlavesForPromotion(const void *a, const void *b) {
    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
                          **sb = (sentinelRedisInstance **)b;
4136 4137
    char *sa_runid, *sb_runid;

4138 4139
    if ((*sa)->slave_priority != (*sb)->slave_priority)
        return (*sa)->slave_priority - (*sb)->slave_priority;
4140

4141
    /* If priority is the same, select the slave with greater replication
4142
     * offset (processed more data from the master). */
4143 4144 4145
    if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
        return -1; /* a < b */
    } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
A
andyli 已提交
4146
        return 1; /* a > b */
4147 4148 4149 4150
    }

    /* If the replication offset is the same select the slave with that has
     * the lexicographically smaller runid. Note that we try to handle runid
4151 4152 4153 4154 4155 4156 4157 4158
     * == NULL as there are old Redis versions that don't publish runid in
     * INFO. A NULL runid is considered bigger than any other runid. */
    sa_runid = (*sa)->runid;
    sb_runid = (*sb)->runid;
    if (sa_runid == NULL && sb_runid == NULL) return 0;
    else if (sa_runid == NULL) return 1;  /* a > b */
    else if (sb_runid == NULL) return -1; /* a < b */
    return strcasecmp(sa_runid, sb_runid);
4159 4160 4161 4162 4163 4164 4165 4166 4167
}

/* Return the best slave of 'master' to promote, or NULL if no slave is
 * suitable. Slaves failing any of the checks below are discarded; the
 * remaining ones are sorted with compareSlavesForPromotion() and the first
 * one is returned. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
    sentinelRedisInstance *best = NULL;
    int count = 0;
    dictIterator *iter;
    dictEntry *entry;
    mstime_t max_master_down_time = master->down_after_period * 10;
    mstime_t info_validity_time;

    if (master->flags & SRI_S_DOWN) {
        /* The master is down from our POV: tolerate a master link down
         * time that also covers the time it has been in this state. When
         * the master is in SDOWN state we get INFO from slaves every
         * second, so the INFO output must be very recent. */
        max_master_down_time += mstime() - master->s_down_since_time;
        info_validity_time = SENTINEL_PING_PERIOD*5;
    } else {
        /* Otherwise INFO is refreshed with the usual period, and we need
         * to account for a larger delay. */
        info_validity_time = SENTINEL_INFO_PERIOD*3;
    }

    iter = dictGetIterator(master->slaves);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *slave = dictGetVal(entry);

        /* Discard slaves that are down, disconnected, not available
         * recently, or configured with priority zero. */
        if ((slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) ||
            slave->link->disconnected ||
            mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5 ||
            slave->slave_priority == 0)
        {
            continue;
        }

        /* Discard slaves with a stale INFO output, or disconnected from
         * the master for too long. */
        if (mstime() - slave->info_refresh > info_validity_time ||
            slave->master_link_down_time > max_master_down_time)
        {
            continue;
        }

        candidates[count++] = slave;
    }
    dictReleaseIterator(iter);

    if (count > 0) {
        qsort(candidates,count,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        best = candidates[0];
    }
    zfree(candidates);
    return best;
}

/* ---------------- Failover state machine implementation ------------------- */
/* Failover state WAIT_START: check if we are the leader for the failover
 * epoch. If so (or if the failover was forced via SENTINEL FAILOVER) move
 * to the SELECT_SLAVE state, otherwise abort the failover once the election
 * timeout is elapsed. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader = sentinelGetLeader(ri, ri->failover_epoch);
    int elected = 0;

    if (leader != NULL && strcasecmp(leader,sentinel.myid) == 0) elected = 1;
    sdsfree(leader);

    if (elected || (ri->flags & SRI_FORCE_FAILOVER)) {
        sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");
        if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
            sentinelSimFailureCrash();
        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
        return;
    }

    /* Not the leader, and not a forced failover: we can't continue. Just
     * wait, and abort the failover if we are still not the leader after
     * the election timeout, that is the MIN between
     * SENTINEL_ELECTION_TIMEOUT and the configured failover timeout. */
    {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        if (ri->failover_timeout < election_timeout)
            election_timeout = ri->failover_timeout;
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
    }
}

/* Handler for SENTINEL_FAILOVER_STATE_SELECT_SLAVE.
 *
 * Asks sentinelSelectSlave() for the slave to promote. When none is
 * returned the failover is aborted; otherwise the slave is flagged as
 * SRI_PROMOTED, stored in ri->promoted_slave, and the failover moves to
 * SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE. */
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

    /* We don't handle the timeout in this state as the function either
     * aborts the failover or goes forward to the next state. */
    if (slave == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}

/* Handler for SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE.
 *
 * Sends SLAVEOF NO ONE to ri->promoted_slave and, if the command was
 * successfully sent, moves the failover to
 * SENTINEL_FAILOVER_STATE_WAIT_PROMOTION. If the send fails the state is
 * left unchanged, so the state machine calls this function again. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    /* We can't send the command to the promoted slave if it is now
     * disconnected. Retry again and again with this state until the timeout
     * is reached, then abort the failover. */
    if (ri->promoted_slave->link->disconnected) {
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    /* Send SLAVEOF NO ONE command to turn the slave into a master.
     * We actually register a generic callback for this command as we don't
     * really care about the reply. We check if it worked indirectly observing
     * if INFO returns a different role (master instead of slave). */
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != C_OK) return;
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

/* Handler for SENTINEL_FAILOVER_STATE_WAIT_PROMOTION.
 *
 * We actually wait for promotion indirectly, checking with INFO when the
 * slave turns into a master, so this function never advances the state
 * itself: it only aborts the failover when more than ri->failover_timeout
 * elapsed since the last state change. */
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    /* Just handle the timeout. Switching to the next state is handled
     * by the function parsing the INFO command of the promoted slave. */
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

/* Check whether the slaves reconfiguration phase of the failover is over,
 * and if so move the master to SENTINEL_FAILOVER_STATE_UPDATE_CONFIG.
 *
 * The phase is over when no slave is left that is neither promoted,
 * flagged SRI_RECONF_DONE, nor subjectively down, or when more than
 * master->failover_timeout elapsed since the last state change. In the
 * timeout case a best effort SLAVEOF is also sent to the connected slaves
 * that did not receive one yet.
 *
 * Nothing is done while there is no promoted slave or the promoted slave
 * is flagged SRI_S_DOWN. */
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider failover finished if the promoted slave is
     * not reachable. */
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    /* The failover terminates once all the reachable slaves are properly
     * configured. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    /* Force end of failover on timeout. */
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    /* When the end was forced by the timeout it is a good idea to send a
     * best effort SLAVEOF command to all the slaves still not reconfigured
     * to replicate with the new master. Note that the state was already
     * switched to UPDATE_CONFIG above: we don't wait for these slaves. */
    if (timeout) {
        /* The first iterator was released above, so the same variables
         * can be reused for this second pass. */
        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;

            if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            if (slave->link->disconnected) continue;

            retval = sentinelSendSlaveOf(slave,
                    master->promoted_slave->addr->ip,
                    master->promoted_slave->addr->port);
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}

/* Handler for SENTINEL_FAILOVER_STATE_RECONF_SLAVES.
 *
 * Send SLAVEOF <new master address> to the remaining slaves that still
 * don't appear to have the configuration updated, keeping the number of
 * slaves flagged SRI_RECONF_SENT or SRI_RECONF_INPROG below
 * master->parallel_syncs. Before returning, sentinelFailoverDetectEnd() is
 * called to check if the reconfiguration is over. */
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;

    /* First pass: count the slaves with a reconfiguration in flight. */
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);

    /* Second pass: start new reconfigurations while there is room. */
    di = dictGetIterator(master->slaves);
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;

        /* Skip the promoted slave, and already configured slaves. */
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;

        /* If too much time elapsed without the slave moving forward to
         * the next state, consider it reconfigured even if it is not.
         * Sentinels will detect the slave as misconfigured and fix its
         * configuration later.
         *
         * NOTE(review): a slave that times out here is not skipped, so if
         * it is still connected it falls through and gets SLAVEOF sent
         * again in this same pass, ending with both SRI_RECONF_DONE and
         * SRI_RECONF_SENT set. Confirm this is intended. */
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
            slave->flags |= SRI_RECONF_DONE;
        }

        /* Nothing to do for instances that are disconnected or already
         * in RECONF_SENT state. */
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
        if (slave->link->disconnected) continue;

        /* Send SLAVEOF <new master>. */
        retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
        if (retval == C_OK) {
            slave->flags |= SRI_RECONF_SENT;
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);

    /* Check if all the slaves are reconfigured and handle timeout. */
    sentinelFailoverDetectEnd(master);
}

/* This function is called when the slave is in
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
4421
 * to remove it from the master table and add the promoted slave instead. */
4422
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
4423 4424 4425
    sentinelRedisInstance *ref = master->promoted_slave ?
                                 master->promoted_slave : master;

A
antirez 已提交
4426
    sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
4427 4428 4429 4430
        master->name, master->addr->ip, master->addr->port,
        ref->addr->ip, ref->addr->port);

    sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
4431 4432 4433
}

/* Run one step of the failover state machine for the master 'ri'.
 *
 * 'ri' must be flagged SRI_MASTER (asserted). Nothing is done unless the
 * SRI_FAILOVER_IN_PROGRESS flag is set; otherwise the handler matching
 * ri->failover_state is called. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
        default:
            /* States without a handler here (for instance
             * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG, which is acted upon by
             * sentinelHandleDictOfRedisInstances()) need no work in this
             * function. */
            break;
    }
}

4457 4458 4459 4460
/* Abort a failover in progress:
 *
 * This function can only be called before the promoted slave acknowledged
 * the slave -> master switch. Otherwise the failover can't be aborted and
4461
 * will reach its end (possibly by timeout). */
4462
void sentinelAbortFailover(sentinelRedisInstance *ri) {
A
antirez 已提交
4463 4464
    serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
    serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);
4465

4466
    ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER);
4467 4468
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = mstime();
4469 4470 4471 4472 4473 4474
    if (ri->promoted_slave) {
        ri->promoted_slave->flags &= ~SRI_PROMOTED;
        ri->promoted_slave = NULL;
    }
}

/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, Sentinel being completely non-blocking
 * by design. The function is called every second.
 * -------------------------------------------------------------------------- */

/* Perform scheduled operations for the specified Redis instance.
 *
 * The monitoring half (reconnection and periodic commands) is always
 * executed. The acting half (down detection and, for masters, failover
 * handling) is skipped while Sentinel is in TILT mode. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    sentinelReconnectInstance(ri);
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. TILT mode is exited here once
     * SENTINEL_TILT_PERIOD elapsed since it was entered. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

/* Run the scheduled operations on every instance stored in 'instances'.
 * For each master found, the function calls itself on the master's slaves
 * and sentinels dictionaries as well.
 *
 * A master found in SENTINEL_FAILOVER_STATE_UPDATE_CONFIG is remembered
 * and switched to its promoted slave only after the whole dictionary has
 * been visited. If several masters are in that state, only the last one
 * visited is switched by this call. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    sentinelRedisInstance *pending_switch = NULL;
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        sentinelHandleRedisInstance(inst);

        /* Only masters own nested dictionaries and a failover state. */
        if (!(inst->flags & SRI_MASTER)) continue;

        sentinelHandleDictOfRedisInstances(inst->slaves);
        sentinelHandleDictOfRedisInstances(inst->sentinels);
        if (inst->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
            pending_switch = inst;
    }

    if (pending_switch != NULL)
        sentinelFailoverSwitchToPromotedSlave(pending_switch);
    dictReleaseIterator(iter);
}

/* This function checks if we need to enter the TITL mode.
 *
 * The TILT mode is entered if we detect that between two invocations of the
 * timer interrupt, a negative amount of time, or too much time has passed.
 * Note that we expect that more or less just 100 milliseconds will pass
 * if everything is fine. However we'll see a negative number or a
 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
 * following conditions happen:
 *
 * 1) The Sentiel process for some time is blocked, for every kind of
G
guiquanz 已提交
4551
 * random reason: the load is huge, the computer was frozen for some time
4552 4553 4554 4555 4556
 * in I/O or alike, the process was stopped by a signal. Everything.
 * 2) The system clock was altered significantly.
 *
 * Under both this conditions we'll see everything as timed out and failing
 * without good reasons. Instead we enter the TILT mode and wait
G
guiquanz 已提交
4557
 * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
4558 4559 4560 4561 4562 4563 4564 4565 4566
 *
 * During TILT time we still collect information, we just do not act. */
void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    mstime_t delta = now - sentinel.previous_time;

    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
A
antirez 已提交
4567
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
4568 4569 4570 4571 4572 4573 4574
    }
    sentinel.previous_time = mstime();
}

void sentinelTimer(void) {
    sentinelCheckTiltCondition();
    sentinelHandleDictOfRedisInstances(sentinel.masters);
4575 4576 4577
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();
A
antirez 已提交
4578 4579 4580 4581 4582 4583 4584

    /* We continuously change the frequency of the Redis "timer interrupt"
     * in order to desynchronize every Sentinel from every other.
     * This non-determinism avoids that Sentinels started at the same time
     * exactly continue to stay synchronized asking to be voted at the
     * same time again and again (resulting in nobody likely winning the
     * election because of split brain voting). */
4585
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
4586 4587
}