package redis

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"time"

	"github.com/eolinker/goku/common/redis/internal"
	"github.com/eolinker/goku/common/redis/internal/consistenthash"
	"github.com/eolinker/goku/common/redis/internal/hashtag"
	"github.com/eolinker/goku/common/redis/internal/pool"
)

// SentinelRingOptions are used to configure a sentinel ring client and
// should be passed to NewSentinelRing.
type SentinelRingOptions struct {
	// List of host:port addresses of the sentinel servers.
	Addrs []string
	// List of master names monitored by the sentinels. Each master
	// becomes one ring shard.
	Masters []string

	// Frequency of PING commands sent to check shards availability.
	// Shard is considered down after 3 subsequent failed checks.
	HeartbeatFrequency time.Duration

	// Hash function used in consistent hash.
	// Default is crc32.ChecksumIEEE.
	Hash Hash

	// Number of replicas in consistent hash.
	// Default is 100 replicas.
	//
	// Higher number of replicas will provide less deviation, that is keys will be
	// distributed to nodes more evenly.
	//
	// Following is deviation for common nreplicas:
	//  --------------------------------------------------------
	//  | nreplicas | standard error | 99% confidence interval |
	//  |    10     |     0.3152     |      (0.37, 1.98)       |
	//  |    100    |     0.0997     |      (0.76, 1.28)       |
	//  |   1000    |     0.0316     |      (0.92, 1.09)       |
	//  --------------------------------------------------------
	//
	// See https://arxiv.org/abs/1406.2294 for reference.
	HashReplicas int

	// Following options are copied from Options struct.

	OnConnect func(*Conn) error

	DB       int
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration
}

func (opt *SentinelRingOptions) init() {
	if opt.HeartbeatFrequency == 0 {
		opt.HeartbeatFrequency = 500 * time.Millisecond
	}

	if opt.HashReplicas == 0 {
		opt.HashReplicas = 100
	}

	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}
}

func (opt *SentinelRingOptions) clientOptions(masterName string) *FailoverOptions {
	return &FailoverOptions{
		MasterName:    masterName,
		SentinelAddrs: opt.Addrs,

		DB:       opt.DB,
		Password: opt.Password,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: opt.IdleCheckFrequency,
	}
}

//------------------------------------------------------------------------------

type sentinelRingShards struct {
	opt *SentinelRingOptions

	mu     sync.RWMutex
	hash   *consistenthash.Map
	shards map[string]*ringShard // read only
	list   []*ringShard          // read only
	len    int
	closed bool
}

func newSentinelRingShards(opt *SentinelRingOptions) *sentinelRingShards {
	return &sentinelRingShards{
		opt: opt,

		hash:   newSentinelConsistentHash(opt),
		shards: make(map[string]*ringShard),
	}
}

func (c *sentinelRingShards) Add(name string, cl *Client) {
	shard := &ringShard{Client: cl}
	c.hash.Add(name)
	c.shards[name] = shard
	c.list = append(c.list, shard)
}

func (c *sentinelRingShards) List() []*ringShard {
	c.mu.RLock()
	list := c.list
	c.mu.RUnlock()
	return list
}

func (c *sentinelRingShards) Hash(key string) string {
	c.mu.RLock()
	hash := c.hash.Get(key)
	c.mu.RUnlock()
	return hash
}
func (c *sentinelRingShards) GetByKey(key string) (*ringShard, error) {
	key = hashtag.Key(key)

	c.mu.RLock()

	if c.closed {
		c.mu.RUnlock()
		return nil, pool.ErrClosed
	}

	hash := c.hash.Get(key)
	if hash == "" {
		c.mu.RUnlock()
		return nil, errRingShardsDown
	}

	shard := c.shards[hash]
	c.mu.RUnlock()

	return shard, nil
}

func (c *sentinelRingShards) GetByHash(name string) (*ringShard, error) {
	if name == "" {
		return c.Random()
	}

	c.mu.RLock()
	shard := c.shards[name]
	c.mu.RUnlock()
	return shard, nil
}

func (c *sentinelRingShards) Random() (*ringShard, error) {
	return c.GetByKey(strconv.Itoa(rand.Int()))
}

// Heartbeat monitors the state of each shard in the ring.
func (c *sentinelRingShards) Heartbeat(frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()
	for range ticker.C {
		var rebalance bool

		c.mu.RLock()

		if c.closed {
			c.mu.RUnlock()
			break
		}

		shards := c.list
		c.mu.RUnlock()

		for _, shard := range shards {
			err := shard.Client.Ping().Err()
			if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
				internal.Logf("ring shard state changed: %s", shard)
				rebalance = true
			}
		}

		if rebalance {
			c.rebalance()
		}
	}
}

// rebalance removes dead shards from the ring.
func (c *sentinelRingShards) rebalance() {
	hash := newSentinelConsistentHash(c.opt)
	var shardsNum int
	for name, shard := range c.shards {
		if shard.IsUp() {
			hash.Add(name)
			shardsNum++
		}
	}

	c.mu.Lock()
	c.hash = hash
	c.len = shardsNum
	c.mu.Unlock()
}

func (c *sentinelRingShards) Len() int {
	c.mu.RLock()
	l := c.len
	c.mu.RUnlock()
	return l
}

func (c *sentinelRingShards) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, shard := range c.shards {
		if err := shard.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	c.hash = nil
	c.shards = nil
	c.list = nil

	return firstErr
}

//------------------------------------------------------------------------------

// SentinelRing is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// SentinelRing monitors the state of each shard and removes dead shards from
// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
// coordination when shard state is changed.
//
// SentinelRing should be used when you need multiple Redis servers for caching
// and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster.
type SentinelRing struct {
	cmdable

	ctx context.Context

	opt           *SentinelRingOptions
	shards        *sentinelRingShards
	cmdsInfoCache *cmdsInfoCache

	process         func(Cmder) error
	processPipeline func([]Cmder) error

	sentinelClients []*SentinelClient
}

func NewSentinelRing(opt *SentinelRingOptions) *SentinelRing {
	opt.init()

	ring := &SentinelRing{
		opt:    opt,
		shards: newSentinelRingShards(opt),
	}
	ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)

	ring.process = ring.defaultProcess
	ring.processPipeline = ring.defaultProcessPipeline
	ring.cmdable.setProcessor(ring.Process)

	for _, masterName := range opt.Masters {
		clopt := opt.clientOptions(masterName)
		ring.shards.Add(masterName, NewFailoverClient(clopt))
	}

	ring.sentinelClients = make([]*SentinelClient, 0, len(opt.Addrs))
	for _, addr := range opt.Addrs {
		sentinel := NewSentinelClient(&Options{
			Addr: addr,

			MaxRetries: opt.MaxRetries,

			DialTimeout:  opt.DialTimeout,
			ReadTimeout:  opt.ReadTimeout,
			WriteTimeout: opt.WriteTimeout,

			PoolSize:           1,
			PoolTimeout:        opt.PoolTimeout,
			IdleTimeout:        opt.IdleTimeout,
			IdleCheckFrequency: opt.IdleCheckFrequency,
		})
		ring.sentinelClients = append(ring.sentinelClients, sentinel)
	}

	go ring.shards.Heartbeat(opt.HeartbeatFrequency)

	return ring
}

func (c *SentinelRing) Context() context.Context {
	if c.ctx != nil {
		return c.ctx
	}
	return context.Background()
}

func (c *SentinelRing) WithContext(ctx context.Context) *SentinelRing {
	if ctx == nil {
		panic("nil context")
	}
	c2 := c.copy()
	c2.ctx = ctx
	return c2
}

func (c *SentinelRing) copy() *SentinelRing {
	cp := *c
	return &cp
}

// Options returns read-only Options that were used to create the client.
func (c *SentinelRing) Options() *SentinelRingOptions {
	return c.opt
}

func (c *SentinelRing) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

// PoolStats returns accumulated connection pool stats.
func (c *SentinelRing) PoolStats() *PoolStats {
	shards := c.shards.List()
	var acc PoolStats
	for _, shard := range shards {
		s := shard.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts
		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
	}
	return &acc
}

// Len returns the current number of shards in the ring.
func (c *SentinelRing) Len() int {
	return c.shards.Len()
}

// Subscribe subscribes the client to the specified channels.
func (c *SentinelRing) Subscribe(channels ...string) *PubSub {
	if len(channels) == 0 {
		panic("at least one channel is required")
	}

	shard, err := c.shards.GetByKey(channels[0])
	if err != nil {
		// TODO: return PubSub with sticky error
		panic(err)
	}
	return shard.Client.Subscribe(channels...)
}

// PSubscribe subscribes the client to the given patterns.
func (c *SentinelRing) PSubscribe(channels ...string) *PubSub {
	if len(channels) == 0 {
		panic("at least one channel is required")
	}

	shard, err := c.shards.GetByKey(channels[0])
	if err != nil {
		// TODO: return PubSub with sticky error
		panic(err)
	}
	return shard.Client.PSubscribe(channels...)
}

// ForEachAddr resolves the current master address of each configured master
// via the first reachable sentinel and calls fn with the "host:port" address.
// It returns the first error returned by fn, if any.
func (c *SentinelRing) ForEachAddr(fn func(addr string) error) error {
	for _, masterName := range c.opt.Masters {
		for _, sentinel := range c.sentinelClients {
			masterAddrs, err := sentinel.GetMasterAddrByName(masterName).Result()
			if err != nil {
				internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
					masterName, err)
				continue
			}
			if err := fn(fmt.Sprintf("%s:%s", masterAddrs[0], masterAddrs[1])); err != nil {
				return err
			}
			break
		}
	}
	return nil
}

// ForEachShard concurrently calls the fn on each live shard in the sentinel ring.
// It returns the first error if any.
func (c *SentinelRing) ForEachShard(fn func(client *Client) error) error {
	shards := c.shards.List()
	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, shard := range shards {
		if shard.IsDown() {
			continue
		}

		wg.Add(1)
		go func(shard *ringShard) {
			defer wg.Done()
			err := fn(shard.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(shard)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

func (c *SentinelRing) cmdsInfo() (map[string]*CommandInfo, error) {
	shards := c.shards.List()
	var firstErr error
	for _, shard := range shards {
		cmdsInfo, err := shard.Client.Command().Result()
		if err == nil {
			return cmdsInfo, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	if firstErr == nil {
		firstErr = errRingShardsDown
	}
	return nil, firstErr
}

func (c *SentinelRing) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get()
	if err != nil {
		return nil
	}
	info := cmdsInfo[name]
	if info == nil {
		internal.Logf("info for cmd=%s not found", name)
	}
	return info
}

func (c *SentinelRing) cmdShard(cmd Cmder) (*ringShard, error) {
	cmdInfo := c.cmdInfo(cmd.Name())
	pos := cmdFirstKeyPos(cmd, cmdInfo)
	if pos == 0 {
		return c.shards.Random()
	}
	firstKey := cmd.stringArg(pos)
	return c.shards.GetByKey(firstKey)
}
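// exampleForEachShardPing is a minimal usage sketch, not part of the original
// API in this file: it shows how ForEachShard can run a command against every
// live shard. The PING command here is only illustrative.
func exampleForEachShardPing(ring *SentinelRing) error {
	return ring.ForEachShard(func(shard *Client) error {
		// ForEachShard calls fn concurrently and returns the first error, if any.
		return shard.Ping().Err()
	})
}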
// Do creates a Cmd from the args and processes the cmd.
func (c *SentinelRing) Do(args ...interface{}) *Cmd {
	cmd := NewCmd(args...)
	c.Process(cmd)
	return cmd
}

func (c *SentinelRing) WrapProcess(
	fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
) {
	c.process = fn(c.process)
}

func (c *SentinelRing) Process(cmd Cmder) error {
	return c.process(cmd)
}

func (c *SentinelRing) defaultProcess(cmd Cmder) error {
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		shard, err := c.cmdShard(cmd)
		if err != nil {
			cmd.setErr(err)
			return err
		}

		err = shard.Client.Process(cmd)
		if err == nil {
			return nil
		}
		if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
			return err
		}
	}
	return cmd.Err()
}

func (c *SentinelRing) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processPipeline,
	}
	pipe.cmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *SentinelRing) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}

func (c *SentinelRing) WrapProcessPipeline(
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
	c.processPipeline = fn(c.processPipeline)
}

func (c *SentinelRing) defaultProcessPipeline(cmds []Cmder) error {
	cmdsMap := make(map[string][]Cmder)
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
		if hash != "" {
			hash = c.shards.Hash(hashtag.Key(hash))
		}
		cmdsMap[hash] = append(cmdsMap[hash], cmd)
	}

	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		var mu sync.Mutex
		var failedCmdsMap map[string][]Cmder
		var wg sync.WaitGroup

		for hash, cmds := range cmdsMap {
			wg.Add(1)
			go func(hash string, cmds []Cmder) {
				defer wg.Done()

				shard, err := c.shards.GetByHash(hash)
				if err != nil {
					setCmdsErr(cmds, err)
					return
				}

				cn, err := shard.Client.getConn()
				if err != nil {
					setCmdsErr(cmds, err)
					return
				}

				canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
				shard.Client.releaseConnStrict(cn, err)

				if canRetry && internal.IsRetryableError(err, true) {
					mu.Lock()
					if failedCmdsMap == nil {
						failedCmdsMap = make(map[string][]Cmder)
					}
					failedCmdsMap[hash] = cmds
					mu.Unlock()
				}
			}(hash, cmds)
		}

		wg.Wait()
		if len(failedCmdsMap) == 0 {
			break
		}
		cmdsMap = failedCmdsMap
	}

	return cmdsFirstErr(cmds)
}

func (c *SentinelRing) TxPipeline() Pipeliner {
	panic("not implemented")
}

func (c *SentinelRing) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	panic("not implemented")
}

// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *SentinelRing) Close() error {
	return c.shards.Close()
}

func newSentinelConsistentHash(opt *SentinelRingOptions) *consistenthash.Map {
	return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
}
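// exampleSentinelRingUsage is a minimal usage sketch, not part of the original
// file: the sentinel addresses and master names are hypothetical, and Set/Get
// are assumed to come from the embedded cmdable, as elsewhere in this package.
func exampleSentinelRingUsage() error {
	ring := NewSentinelRing(&SentinelRingOptions{
		// Hypothetical sentinel addresses.
		Addrs: []string{"127.0.0.1:26379", "127.0.0.1:26380"},
		// Hypothetical master names; each master becomes one ring shard.
		Masters: []string{"shard0", "shard1"},
	})
	defer ring.Close()

	// Keys are routed to shards via consistent hashing on the key.
	if err := ring.Set("key", "value", 0).Err(); err != nil {
		return err
	}
	_, err := ring.Get("key").Result()
	return err
}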