#!/usr/bin/env ruby

# TODO (temporary here, we'll move this into the Github issues once
#       redis-trib initial implementation is completed).
#
# - Make sure that if the rehashing fails in the middle redis-trib will try
#   to recover.
# - When redis-trib performs a cluster check, if it detects a slot move in
#   progress it should prompt the user to continue the move from where it
#   stopped.
# - Gracefully handle Ctrl+C in move_slot, prompting the user to confirm
#   whether they really want to stop while rehashing, and performing the
#   best cleanup possible if the user forces the quit.
# - When doing "fix" set a global Fix to true, and prompt the user to
#   fix the problem if automatically fixable every time there is something
#   to fix. For instance:
#   1) If there is a node that pretends to receive a slot, or to migrate a
#      slot, but has no entries in that slot, fix it.
#   2) If there is a node having keys in slots that are not owned by it
#      fix this condition by moving the entries to the node owning the slot.
#   3) Perform more possibly slow tests about the state of the cluster.
#   4) When aborted slot migration is detected, fix it.

require 'rubygems'
require 'redis'

ClusterHashSlots = 16384
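# Note: Redis Cluster maps every key to one of these slots by taking
# CRC16(key) modulo 16384, so this constant must match the slot count
# hardcoded in the server.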

def xputs(s)
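    # Pick an ANSI color from the message prefix: bold for ">>>" banners,
    # bold red for errors, green for OK, yellow for failures and notices.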
    case s[0..2]
    when ">>>"
        color="29;1"
    when "[ER"
        color="31;1"
    when "[OK"
        color="32"
    when "[FA","***"
        color="33"
    else
        color=nil
    end

    color = nil if ENV['TERM'] != "xterm"
    print "\033[#{color}m" if color
    print s
    print "\033[0m" if color
    print "\n"
end

class ClusterNode
    def initialize(addr)
        s = addr.split(":")
        if s.length != 2
            puts "Invalid node name #{addr}"
            exit 1
        end
        @r = nil
        @info = {}
        @info[:host] = s[0]
        @info[:port] = s[1]
        @info[:slots] = {}
        @info[:migrating] = {}
        @info[:importing] = {}
        @info[:replicate] = false
        @dirty = false # True if we need to flush slots info into node.
        @friends = []
    end

    def friends
        @friends
    end

    def slots 
        @info[:slots]
    end

    def has_flag?(flag)
        @info[:flags].index(flag)
    end

    def to_s
        "#{@info[:host]}:#{@info[:port]}"
    end

    def connect(o={})
        return if @r
        print "Connecting to node #{self}: "
        STDOUT.flush
        begin
            @r = Redis.new(:host => @info[:host], :port => @info[:port], :timeout => 60)
            @r.ping
        rescue
            xputs "[ERR] Sorry, can't connect to node #{self}"
            exit 1 if o[:abort]
            @r = nil
        end
        xputs "OK"
    end

    def assert_cluster
        info = @r.info
        if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
            xputs "[ERR] Node #{self} is not configured as a cluster node."
            exit 1
        end
    end

    def assert_empty
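        # A node is considered empty only if it knows no nodes other than
        # itself (cluster_known_nodes:1) and holds no keys in database 0.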
        if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
            (@r.info['db0'])
            xputs "[ERR] Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
            exit 1
        end
    end

    def load_info(o={})
        self.connect
        nodes = @r.cluster("nodes").split("\n")
        nodes.each{|n|
            # name addr flags master_id ping_sent ping_recv config_epoch link_status slots
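            # A line of CLUSTER NODES output looks like:
            # <id> <host:port> <flags> <master-id> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ...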
            split = n.split
            name,addr,flags,master_id,ping_sent,ping_recv,config_epoch,link_status = split[0..7]
            slots = split[8..-1]
            info = {
                :name => name,
                :addr => addr,
                :flags => flags.split(","),
                :replicate => master_id,
                :ping_sent => ping_sent.to_i,
                :ping_recv => ping_recv.to_i,
                :link_status => link_status
            }
            info[:replicate] = false if master_id == "-"

            if info[:flags].index("myself")
                @info = @info.merge(info)
                @info[:slots] = {}
                slots.each{|s|
                    if s[0..0] == '['
                        if s.index("->-") # Migrating
                            slot,dst = s[1..-1].split("->-")
                            @info[:migrating][slot] = dst
                        elsif s.index("-<-") # Importing
                            slot,src = s[1..-1].split("-<-")
                            @info[:importing][slot] = src
                        end
                    elsif s.index("-")
                        start,stop = s.split("-")
                        self.add_slots((start.to_i)..(stop.to_i))
                    else
                        self.add_slots((s.to_i)..(s.to_i))
                    end
                } if slots
                @dirty = false
                @r.cluster("info").split("\n").each{|e|    
                    k,v=e.split(":")
                    k = k.to_sym
                    v.chop!
                    if k != :cluster_state
                        @info[k] = v.to_i
                    else
                        @info[k] = v
                    end
                }
            elsif o[:getfriends]
                @friends << info
            end
        }
    end

    def add_slots(slots)
        slots.each{|s|
            @info[:slots][s] = :new
        }
        @dirty = true
    end

    def set_as_replica(node_id)
        @info[:replicate] = node_id
        @dirty = true
    end

    def flush_node_config
        return if !@dirty
        if @info[:replicate]
            begin
                @r.cluster("replicate",@info[:replicate])
            rescue
                # If the cluster has not joined yet it is possible that
                # the slave does not know the master node yet. So on errors
                # we return ASAP leaving the dirty flag set, to flush the
                # config later.
                return
            end
        else
            new = []
            @info[:slots].each{|s,val|
                if val == :new
                    new << s
                    @info[:slots][s] = true
                end
            }
            @r.cluster("addslots",*new)
        end
        @dirty = false
    end

    def info_string
        # We want to display the hash slots assigned to this node
        # as ranges, like in: "1-5,8-9,20-25,30"
        #
        # Note: this could be easily written without side effects,
        # we use 'slots' just to split the computation into steps.
        
        # First step: we want an increasing array of integers
        # for instance: [1,2,3,4,5,8,9,20,21,22,23,24,25,30]
        slots = @info[:slots].keys.sort

        # As we want to aggregate adjacent slots we convert all the
        # slot integers into ranges (with just one element)
        # So we have something like [1..1,2..2, ... and so forth.
        slots.map!{|x| x..x}

        # Finally we group ranges with adjacent elements.
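        # For example [1..1,2..2,3..3,5..5] becomes [1..3,5..5].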
        slots = slots.reduce([]) {|a,b|
            if !a.empty? && b.first == (a[-1].last)+1
                a[0..-2] + [(a[-1].first)..(b.last)]
            else
                a + [b]
            end
        }

        # Now our task is easy, we just convert ranges with just one
        # element into a number, and a real range into a start-end format.
        # Finally we join the array using the comma as separator.
        slots = slots.map{|x|
            x.count == 1 ? x.first.to_s : "#{x.first}-#{x.last}"
        }.join(",")

        role = self.has_flag?("master") ? "M" : "S"

        if self.info[:replicate] and @dirty
            is = "S: #{self.info[:name]} #{self.to_s}"
        else
            is = "#{role}: #{self.info[:name]} #{self.to_s}\n"+
            "   slots:#{slots} (#{self.slots.length} slots) "+
            "#{(self.info[:flags]-["myself"]).join(",")}"
        end
        if self.info[:replicate]
            is += "\n   replicates #{info[:replicate]}"
        elsif self.has_flag?("master") && self.info[:replicas]
            is += "\n   #{info[:replicas].length} additional replica(s)"
        end
        is
    end

    # Return a single string representing nodes and associated slots.
    # TODO: remove slaves from config when slaves will be handled
    # by Redis Cluster.
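    # The signature is a string of the form
    #   <node_id>:<sorted slots>|<node_id>:<sorted slots>|...
    # so two nodes sharing the same view of the slot map produce exactly
    # the same string.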
    def get_config_signature
        config = []
        @r.cluster("nodes").each_line{|l|
            s = l.split
            slots = s[8..-1].select {|x| x[0..0] != "["}
            next if slots.length == 0
            config << s[0]+":"+(slots.sort.join(","))
        }
        config.sort.join("|")
    end

    def info
        @info
    end
    
    def is_dirty?
        @dirty
    end

    def r
        @r
    end
end

class RedisTrib
    def initialize
        @nodes = []
        @fix = false
        @errors = []
289 290
    end

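    # 'req_args' follows the arity convention also used by the COMMANDS
    # table: a positive value requires exactly that many arguments, while
    # a negative value requires at least req_args.abs arguments.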
    def check_arity(req_args, num_args)
        if ((req_args > 0 and num_args != req_args) ||
           (req_args < 0 and num_args < req_args.abs))
           xputs "[ERR] Wrong number of arguments for specified sub command"
           exit 1
        end
    end

    def add_node(node)
        @nodes << node
    end

    def cluster_error(msg)
        @errors << msg
        xputs msg
    end

    def get_node_by_name(name)
        @nodes.each{|n|
            return n if n.info[:name] == name.downcase
        }
        return nil
    end

    # This function returns the master that has the least number of replicas
    # in the cluster. If there are multiple masters with the same smallest
    # number of replicas, the first one in sorted order is returned.
    def get_master_with_least_replicas
        masters = @nodes.select{|n| n.has_flag? "master"}
        sorted = masters.sort{|a,b|
            a.info[:replicas].length <=> b.info[:replicas].length
        }
        sorted[0]
    end

    def check_cluster
        xputs ">>> Performing Cluster Check (using node #{@nodes[0]})"
        show_nodes
        check_config_consistency
        check_open_slots
        check_slots_coverage
    end

    # Merge slots of every known node. If the resulting slots are equal
    # to ClusterHashSlots, then all slots are served.
    def covered_slots
        slots = {}
        @nodes.each{|n|
            slots = slots.merge(n.slots)
        }
        slots
    end

    def check_slots_coverage
        xputs ">>> Check slots coverage..."
        slots = covered_slots
        if slots.length == ClusterHashSlots
            xputs "[OK] All #{ClusterHashSlots} slots covered."
        else
            cluster_error \
                "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
            fix_slots_coverage if @fix
        end
    end

    def check_open_slots
        xputs ">>> Check for open slots..."
        open_slots = []
        @nodes.each{|n|
            if n.info[:migrating].size > 0
                cluster_error \
                    "[WARNING] Node #{n} has slots in migrating state."
                open_slots += n.info[:migrating].keys
            elsif n.info[:importing].size > 0
                cluster_error \
                    "[WARNING] Node #{n} has slots in importing state."
                open_slots += n.info[:importing].keys
            end
        }
        open_slots.uniq!
        if open_slots.length > 0
            xputs "[WARNING] The following slots are open: #{open_slots.join(",")}"
        end
        if @fix
            open_slots.each{|slot| fix_open_slot slot}
        end
    end

    def nodes_with_keys_in_slot(slot)
        nodes = []
        @nodes.each{|n|
            nodes << n if n.r.cluster("getkeysinslot",slot,1).length > 0
        }
        nodes
    end

    def fix_slots_coverage
        not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys
        xputs ">>> Fixing slots coverage..."
        xputs "List of not covered slots: " + not_covered.join(",")

        # For every slot, take action depending on the actual condition:
        # 1) No node has keys for this slot.
        # 2) A single node has keys for this slot.
        # 3) Multiple nodes have keys for this slot.
        slots = {}
        not_covered.each{|slot|
            nodes = nodes_with_keys_in_slot(slot)
            slots[slot] = nodes
            xputs "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join}"
        }

        none = slots.select {|k,v| v.length == 0}
        single = slots.select {|k,v| v.length == 1}
        multi = slots.select {|k,v| v.length > 1}

        # Handle case "1": keys in no node.
        if none.length > 0
            xputs "The folowing uncovered slots have no keys across the cluster:"
            xputs none.keys.join(",")
            yes_or_die "Fix these slots by covering with a random node?"
            none.each{|slot,nodes|
                node = @nodes.sample
                xputs ">>> Covering slot #{slot} with #{node}"
                node.r.cluster("addslots",slot)
            }
        end

        # Handle case "2": keys only in one node.
        if single.length > 0
            xputs "The folowing uncovered slots have keys in just one node:"
            puts single.keys.join(",")
            yes_or_die "Fix these slots by covering with those nodes?"
            single.each{|slot,nodes|
                xputs ">>> Covering slot #{slot} with #{nodes[0]}"
                nodes[0].r.cluster("addslots",slot)
            }
        end

        # Handle case "3": keys in multiple nodes.
        if multi.length > 0
            xputs "The folowing uncovered slots have keys in multiple nodes:"
            xputs multi.keys.join(",")
            yes_or_die "Fix these slots by moving keys into a single node?"
            multi.each{|slot,nodes|
                xputs ">>> Covering slot #{slot} moving keys to #{nodes[0]}"
                # TODO
                # 1) Set all nodes as "MIGRATING" for this slot, so that we
                # can access keys in the hash slot using ASKING.
                # 2) Move everything to node[0]
                # 3) Clear MIGRATING from nodes, and ADDSLOTS the slot to
                # node[0].
                raise "TODO: Work in progress"
            }
        end
    end

    # Slot 'slot' was found to be in importing or migrating state in one or
    # more nodes. This function fixes this condition by migrating keys where
    # it seems more sensible.
    def fix_open_slot(slot)
        migrating = []
        importing = []
        @nodes.each{|n|
            next if n.has_flag? "slave"
            if n.info[:migrating][slot]
                migrating << n
            elsif n.info[:importing][slot]
                importing << n
            elsif n.r.cluster("countkeysinslot",slot) > 0
                xputs "*** Found keys about slot #{slot} in node #{n}!"
            end
        }
        puts ">>> Fixing open slot #{slot}"
        puts "Set as migrating in: #{migrating.join(",")}"
        puts "Set as importing in: #{importing.join(",")}"

        # Case 1: The slot is in migrating state in one node, and in
        #         importing state in another node. That's trivial to address.
        if migrating.length == 1 && importing.length == 1
            move_slot(migrating[0],importing[0],slot,:verbose=>true)
        else
            xputs "[ERR] Sorry, Redis-trib can't fix this slot yet (work in progress)"
        end
    end

    # Check if all the nodes agree about the cluster configuration
    def check_config_consistency
        if !is_config_consistent?
            cluster_error "[ERR] Nodes don't agree about configuration!"
        else
            xputs "[OK] All nodes agree about slots configuration."
        end
    end

    def is_config_consistent?
        signatures=[]
        @nodes.each{|n|
            signatures << n.get_config_signature
        }
        return signatures.uniq.length == 1
    end

    def wait_cluster_join
        print "Waiting for the cluster to join"
        while !is_config_consistent?
            print "."
            STDOUT.flush
            sleep 1
        end
        print "\n"
    end

    def alloc_slots
        nodes_count = @nodes.length
        masters_count = @nodes.length / (@replicas+1)
        slots_per_node = ClusterHashSlots / masters_count
        masters = []
        slaves = []

        # The first step is to split instances by IP. This is useful as
        # we'll try to allocate master nodes in different physical machines
        # (as much as possible) and to allocate slaves of a given master in
        # different physical machines as well.
        #
        # This code simply assumes that if the IP is different, then it is more
        # likely that the instance is running in a different physical host
        # or at least a different virtual machine.
        ips = {}
        @nodes.each{|n|
            ips[n.info[:host]] = [] if !ips[n.info[:host]]
            ips[n.info[:host]] << n
        }

        # Select master instances
        puts "Using #{masters_count} masters:"
        while masters.length < masters_count
            ips.each{|ip,nodes_list|
                next if nodes_list.length == 0
                masters << nodes_list.shift
                puts masters[-1]
                nodes_count -= 1
                break if masters.length == masters_count
            }
        end

        # Alloc slots on masters
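        # For example, with 16384 slots and 3 masters, slots_per_node is
        # 5461: the first two masters get 0-5460 and 5461-10921, while the
        # last master also takes the division remainder, covering up to
        # slot 16383.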
        i = 0
        masters.each_with_index{|n,masternum|
            first = i*slots_per_node
            last = first+slots_per_node-1
            last = ClusterHashSlots-1 if masternum == masters.length-1
            n.add_slots first..last
            i += 1
        }

        # Select N replicas for every master.
        # We try to split the replicas among all the IPs with spare nodes
        # trying to avoid the host where the master is running, if possible.
        masters.each{|m|
            i = 0
            while i < @replicas
                ips.each{|ip,nodes_list|
                    next if nodes_list.length == 0
                    # Skip instances with the same IP as the master if we
                    # have some more IPs available.
                    next if ip == m.info[:host] && nodes_count > nodes_list.length
                    slave = nodes_list.shift
                    slave.set_as_replica(m.info[:name])
                    nodes_count -= 1
                    i += 1
                    puts "#{m} replica ##{i} is #{slave}"
                    break if masters.length == masters_count
                }
            end
        }
    end

    def flush_nodes_config
        @nodes.each{|n|
            n.flush_node_config
        }
    end

    def show_nodes
        @nodes.each{|n|
            xputs n.info_string
        }
    end

    def join_cluster
        # We use a brute force approach to make sure the nodes will meet
        # each other, that is, sending CLUSTER MEET messages to all the nodes
        # about the very same node.
        # Thanks to gossip this information should propagate across the
        # whole cluster in a matter of seconds.
        first = false
        @nodes.each{|n|
            if !first then first = n.info; next; end # Skip the first node
            n.r.cluster("meet",first[:host],first[:port])
        }
    end

    def yes_or_die(msg)
        print "#{msg} (type 'yes' to accept): "
        STDOUT.flush
        if !(STDIN.gets.chomp.downcase == "yes")
            xputs "*** Aborting..."
            exit 1
        end
    end

    def load_cluster_info_from_node(nodeaddr)
        node = ClusterNode.new(nodeaddr)
        node.connect(:abort => true)
        node.assert_cluster
        node.load_info(:getfriends => true)
        add_node(node)
        node.friends.each{|f|
            next if f[:flags].index("noaddr") ||
                    f[:flags].index("disconnected") ||
                    f[:flags].index("fail")
            fnode = ClusterNode.new(f[:addr])
            fnode.connect()
            fnode.load_info()
            add_node(fnode)
        }
        populate_nodes_replicas_info
    end

    # This function is called by load_cluster_info_from_node in order to
    # add additional information to every node as a list of replicas.
    def populate_nodes_replicas_info
        # Start adding the new field to every node.
        @nodes.each{|n|
            n.info[:replicas] = []
        }

        # Populate the replicas field using the replicate field of slave
        # nodes.
        @nodes.each{|n|
            if n.info[:replicate]
                master = get_node_by_name(n.info[:replicate])
                if !master
                    xputs "*** WARNING: #{n} claims to be slave of unknown node ID #{n.info[:replicate]}."
                else
                    master.info[:replicas] << n
                end
            end
        }
    end

    # Given a list of source nodes return a "resharding plan"
    # with what slots to move in order to move "numslots" slots to another
    # instance.
    def compute_reshard_table(sources,numslots)
        moved = []
        # Sort from bigger to smaller instance, for two reasons:
        # 1) If we take fewer slots than there are instances it is better
        #    to start getting from the biggest instances.
        # 2) We take one slot more from the first instance in the case of
        #    imperfect divisibility. So when we have 3 nodes and need to
        #    get 10 slots, we take 4 from the first, and 3 from the rest.
        #    This way the biggest is always the first.
        sources = sources.sort{|a,b| b.slots.length <=> a.slots.length}
        source_tot_slots = sources.inject(0) {|sum,source|
            sum+source.slots.length
        }
        sources.each_with_index{|s,i|
            # Every node will provide a number of slots proportional to the
            # slots it has assigned.
            n = (numslots.to_f/source_tot_slots*s.slots.length)
            if i == 0
                n = n.ceil
            else
                n = n.floor
            end
            s.slots.keys.sort[(0...n)].each{|slot|
                if moved.length < numslots
                    moved << {:source => s, :slot => slot}
                end
            }
        }
        return moved
    end

    def show_reshard_table(table)
        table.each{|e|
            puts "    Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
        }
    end

    def move_slot(source,target,slot,o={})
        # We start marking the slot as importing in the destination node,
        # and the slot as migrating in the source node. Note that the order of
        # the operations is important, as otherwise a client may be redirected
        # to the target node that does not yet know it is importing this slot.
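        # The full sequence of commands issued below is:
        #   CLUSTER SETSLOT <slot> IMPORTING <source-id>  (on the target)
        #   CLUSTER SETSLOT <slot> MIGRATING <target-id>  (on the source)
        #   CLUSTER GETKEYSINSLOT + MIGRATE, until the slot is empty
        #   CLUSTER SETSLOT <slot> NODE <target-id>       (on every node)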
        print "Moving slot #{slot} from #{source} to #{target}: "; STDOUT.flush
        target.r.cluster("setslot",slot,"importing",source.info[:name])
        source.r.cluster("setslot",slot,"migrating",target.info[:name])
        # Migrate all the keys from source to target using the MIGRATE command
        while true
            keys = source.r.cluster("getkeysinslot",slot,10)
            break if keys.length == 0
            keys.each{|key|
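                # MIGRATE host port key destination-db timeout: keys are
                # always moved to database 0, with a 1000 ms timeout.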
                source.r.migrate(target.info[:host],target.info[:port],key,0,1000)
                print "." if o[:verbose]
                STDOUT.flush
            }
        end
        puts
        # Set the new node as the owner of the slot in all the known nodes.
        @nodes.each{|n|
            n.r.cluster("setslot",slot,"node",target.info[:name])
        }
    end

    # redis-trib subcommand implementations

    def check_cluster_cmd(argv,opt)
        load_cluster_info_from_node(argv[0])
        check_cluster
    end

    def fix_cluster_cmd(argv,opt)
        @fix = true
        load_cluster_info_from_node(argv[0])
        check_cluster
    end

    def reshard_cluster_cmd(argv,opt)
        load_cluster_info_from_node(argv[0])
        check_cluster
        if @errors.length != 0
            puts "*** Please fix your cluster problems before resharding"
            exit 1
        end
        numslots = 0
        while numslots <= 0 or numslots > ClusterHashSlots
            print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? "
            numslots = STDIN.gets.to_i
        end
        target = nil
        while not target
            print "What is the receiving node ID? "
            target = get_node_by_name(STDIN.gets.chop)
            if !target || target.has_flag?("slave")
                xputs "*** The specified node is not known or not a master, please retry."
                target = nil
            end
        end
        sources = []
        xputs "Please enter all the source node IDs."
        xputs "  Type 'all' to use all the nodes as source nodes for the hash slots."
        xputs "  Type 'done' once you entered all the source nodes IDs."
        while true
            print "Source node ##{sources.length+1}:"
            line = STDIN.gets.chop
            src = get_node_by_name(line)
            if line == "done"
                if sources.length == 0
                    puts "No source nodes given, operation aborted"
                    exit 1
                else
                    break
                end
            elsif line == "all"
                @nodes.each{|n|
                    next if n.info[:name] == target.info[:name]
                    next if n.has_flag?("slave")
                    sources << n
                }
                break
            elsif !src || src.has_flag?("slave")
                xputs "*** The specified node is not known or is not a master, please retry."
            elsif src.info[:name] == target.info[:name]
                xputs "*** It is not possible to use the target node as source node."
            else
                sources << src
            end
        end
        puts "\nReady to move #{numslots} slots."
        puts "  Source nodes:"
        sources.each{|s| puts "    "+s.info_string}
        puts "  Destination node:"
        puts "    #{target.info_string}"
        reshard_table = compute_reshard_table(sources,numslots)
        puts "  Resharding plan:"
        show_reshard_table(reshard_table)
        print "Do you want to proceed with the proposed reshard plan (yes/no)? "
        yesno = STDIN.gets.chop
        exit(1) if (yesno != "yes")
        reshard_table.each{|e|
            move_slot(e[:source],target,e[:slot],:verbose=>true)
        }
    end

    # This is a helper function for create_cluster_cmd that verifies that
    # the number of nodes and the specified replicas form a valid
    # configuration with at least three master nodes and enough replicas
    # per node.
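    # For example, 6 nodes with --replicas 1 give 6/(1+1) == 3 masters,
    # the minimum accepted; with --replicas 2 at least 3*(2+1) == 9 nodes
    # would be needed.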
    def check_create_parameters
        masters = @nodes.length/(@replicas+1)
        if masters < 3
            puts "*** ERROR: Invalid configuration for cluster creation."
            puts "*** Redis Cluster requires at least 3 master nodes."
            puts "*** This is not possible with #{@nodes.length} nodes and #{@replicas} replicas per node."
            puts "*** At least #{3*(@replicas+1)} nodes are required."
            exit 1
        end
    end

    def create_cluster_cmd(argv,opt)
        opt = {'replicas' => 0}.merge(opt)
        @replicas = opt['replicas'].to_i

        xputs ">>> Creating cluster"
        argv.each{|n|
            node = ClusterNode.new(n)
            node.connect(:abort => true)
            node.assert_cluster
            node.load_info
            node.assert_empty
            add_node(node)
        }
        check_create_parameters
        xputs ">>> Performing hash slots allocation on #{@nodes.length} nodes..."
        alloc_slots
        show_nodes
        yes_or_die "Can I set the above configuration?"
        flush_nodes_config
        xputs ">>> Nodes configuration updated"
        xputs ">>> Sending CLUSTER MEET messages to join the cluster"
        join_cluster
        # Give one second for the join to start, in order to avoid that
        # wait_cluster_join finds all the nodes already in agreement about
        # the config simply because they are still empty, with no assigned
        # slots.
        sleep 1
        wait_cluster_join
        flush_nodes_config # Useful for the replicas
        check_cluster
    end

    def addnode_cluster_cmd(argv,opt)
        xputs ">>> Adding node #{argv[0]} to cluster #{argv[1]}"

        # Check the existing cluster
        load_cluster_info_from_node(argv[1])
        check_cluster

        # If --master-id was specified, try to resolve it now so that we
        # abort before starting with the node configuration.
        if opt['slave']
            if opt['master-id']
                master = get_node_by_name(opt['master-id'])
                if !master
                    xputs "[ERR] No such master ID #{opt['master-id']}"
                end
            else
                master = get_master_with_least_replicas
                xputs "Automatically selected master #{master}"
            end
        end

        # Add the new node
        new = ClusterNode.new(argv[0])
        new.connect(:abort => true)
        new.assert_cluster
        new.load_info
        new.assert_empty
        first = @nodes.first.info
        add_node(new)

        # Send CLUSTER MEET command to the new node
        xputs ">>> Send CLUSTER MEET to node #{new} to make it join the cluster."
        new.r.cluster("meet",first[:host],first[:port])

        # Additional configuration is needed if the node is added as
        # a slave.
        if opt['slave']
            wait_cluster_join
            xputs ">>> Configure node as replica of #{master}."
            new.r.cluster("replicate",master.info[:name])
        end
        xputs "[OK] New node added correctly."
    end

    def delnode_cluster_cmd(argv,opt)
        id = argv[1].downcase
        xputs ">>> Removing node #{id} from cluster #{argv[0]}"

        # Load cluster information
        load_cluster_info_from_node(argv[0])

        # Check if the node exists and serves no hash slots
        node = get_node_by_name(id)

        if !node
            xputs "[ERR] No such node ID #{id}"
            exit 1
        end

        if node.slots.length != 0
            xputs "[ERR] Node #{node} is not empty! Reshard data away and try again."
            exit 1
        end

        # Send CLUSTER FORGET to all the nodes but the node to remove
        xputs ">>> Sending CLUSTER FORGET messages to the cluster..."
        @nodes.each{|n|
            next if n == node
            if n.info[:replicate] && n.info[:replicate].downcase == id
                # Reconfigure the slave to replicate with some other node
                master = get_master_with_least_replicas
                xputs ">>> #{n} as replica of #{master}"
                n.r.cluster("replicate",master.info[:name])
            end
            n.r.cluster("forget",argv[1])
        }

        # Finally shutdown the node
        xputs ">>> SHUTDOWN the node."
        node.r.shutdown
    end

    def set_timeout_cluster_cmd(argv,opt)
        timeout = argv[1].to_i
        if timeout < 100
            puts "Setting a node timeout of less than 100 milliseconds is a bad idea."
            exit 1
        end

        # Load cluster information
        load_cluster_info_from_node(argv[0])
        ok_count = 0
        err_count = 0

        # Set the new timeout on every node, and persist it with CONFIG REWRITE.
        xputs ">>> Reconfiguring node timeout in every cluster node..."
        @nodes.each{|n|
            begin
                n.r.config("set","cluster-node-timeout",timeout)
                n.r.config("rewrite")
                ok_count += 1
                xputs "*** New timeout set for #{n}"
            rescue => e
                puts "ERR setting node-timeot for #{n}: #{e}"
                err_count += 1
            end
        }
        xputs ">>> New node timeout set. #{ok_count} OK, #{err_count} ERR."
    end

    def help_cluster_cmd(argv,opt)
        show_help
        exit 0
    end

    # Parse the options for the specific command "cmd".
    # Returns a hash populated with option => value pairs, and the index of
    # the first non-option argument in ARGV.
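    # For example, for "redis-trib.rb create --replicas 1 127.0.0.1:7000 ..."
    # this returns {"replicas" => "1"} and 3, the index in ARGV of the
    # first host:port argument.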
    def parse_options(cmd)
        idx = 1 ; # Current index into ARGV
        options={}
        while idx < ARGV.length && ARGV[idx][0..1] == '--'
            if ARGV[idx][0..1] == "--"
                option = ARGV[idx][2..-1]
                idx += 1
                if ALLOWED_OPTIONS[cmd] == nil || ALLOWED_OPTIONS[cmd][option] == nil
                    puts "Unknown option '#{option}' for command '#{cmd}'"
                    exit 1
                end
                if ALLOWED_OPTIONS[cmd][option]
                    value = ARGV[idx]
                    idx += 1
                else
                    value = true
                end
                options[option] = value
            else
                # Remaining arguments are not options.
                break
            end
        end
        return options,idx
    end
end

COMMANDS={
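    # Maps each subcommand to [method, arity, usage]. The arity counts the
    # subcommand itself plus its arguments; a negative value means "at
    # least abs(arity)" (see check_arity).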
    "create"  => ["create_cluster_cmd", -2, "host1:port1 ... hostN:portN"],
    "check"   => ["check_cluster_cmd", 2, "host:port"],
    "fix"     => ["fix_cluster_cmd", 2, "host:port"],
    "reshard" => ["reshard_cluster_cmd", 2, "host:port"],
    "add-node" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
    "del-node" => ["delnode_cluster_cmd", 3, "host:port node_id"],
    "set-timeout" => ["set_timeout_cluster_cmd", 3, "host:port milliseconds"],
    "help"    => ["help_cluster_cmd", 1, "(show this help)"]
}

ALLOWED_OPTIONS={
    "create" => {"replicas" => true},
    "add-node" => {"slave" => false, "master-id" => true}
}

def show_help
    puts "Usage: redis-trib <command> <options> <arguments ...>\n\n"
    COMMANDS.each{|k,v|
        puts "  #{k.ljust(15)} #{v[2]}"
        if ALLOWED_OPTIONS[k]
            ALLOWED_OPTIONS[k].each{|optname,has_arg|
                puts "                  --#{optname}" + (has_arg ? " <arg>" : "")
            }
        end
    }
    puts "\nFor check, fix, reshard, del-node, set-timeout you can specify the host and port of any working node in the cluster.\n"
end

# Sanity check
if ARGV.length == 0
    show_help
    exit 1
end

rt = RedisTrib.new
cmd_spec = COMMANDS[ARGV[0].downcase]
if !cmd_spec
    puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
    exit 1
end

# Parse options
cmd_options,first_non_option = rt.parse_options(ARGV[0].downcase)
rt.check_arity(cmd_spec[1],ARGV.length-(first_non_option-1))

# Dispatch
rt.send(cmd_spec[0],ARGV[first_non_option..-1],cmd_options)