#!/usr/bin/env ruby

# TODO (temporary here, we'll move this into the GitHub issues once
#       the initial redis-trib implementation is completed).
#
# - Make sure that if rehashing fails midway, redis-trib tries to recover.
# - When redis-trib performs a cluster check and detects a slot move in
#   progress, it should prompt the user to continue the move from where it
#   stopped.
# - Gracefully handle Ctrl+C in move_slot: ask the user whether to really
#   stop while rehashing, and perform the best cleanup possible if the user
#   forces the quit.
# - When doing "fix", set a global Fix to true, and prompt the user to
#   fix the problem, if automatically fixable, every time there is something
#   to fix. For instance:
#   1) If there is a node that pretends to receive a slot, or to migrate a
#      slot, but has no entries in that slot, fix it.
#   2) If there is a node having keys in slots that are not owned by it,
#      fix this condition by moving the entries into the same node.
#   3) Perform more, possibly slow, tests about the state of the cluster.
#   4) When an aborted slot migration is detected, fix it.

require 'rubygems'
require 'redis'

27
ClusterHashSlots = 16384
28

29 30 31 32
def xputs(s)
    printf s
    STDOUT.flush
end
# A single Redis Cluster node as seen by redis-trib: its address, the
# connection to it, the hash slots assigned to it, the slots it reports as
# migrating/importing and (optionally) the other nodes it knows about.
class ClusterNode
    attr_reader :friends, :info, :r

    # addr must be a "host:port" string; anything else aborts the process.
    def initialize(addr)
        s = addr.split(":")
        if s.length != 2
            puts "Invalid node name #{addr}"
            exit 1
        end
        @r = nil
        @info = {}
        @info[:host] = s[0]
        @info[:port] = s[1]
        @info[:slots] = {}
        @info[:migrating] = {}
        @info[:importing] = {}
        @dirty = false # True if we need to flush slots info into node.
        @friends = []
    end

    # Hash of slot number => :new (assigned locally, not yet sent to the
    # node) or true (already configured on the node).
    def slots
        @info[:slots]
    end

    def to_s
        "#{@info[:host]}:#{@info[:port]}"
    end

    # Open the connection (only once) and PING the node. On failure either
    # exit the process (o[:abort]) or leave r nil so the caller can tell.
    def connect(o={})
        return if @r
        xputs "Connecting to node #{self}: "
        begin
            @r = Redis.new(:host => @info[:host], :port => @info[:port])
            @r.ping
        rescue
            puts "ERROR"
            puts "Sorry, can't connect to node #{self}"
            exit 1 if o[:abort]
            @r = nil
            return # never fall through to the "OK" below after a failure
        end
        puts "OK"
    end

    # Exit unless INFO reports cluster_enabled with a non-zero value.
    def assert_cluster
        info = @r.info
        if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
            puts "Error: Node #{self} is not configured as a cluster node."
            exit 1
        end
    end

    # Exit unless CLUSTER INFO reports cluster_known_nodes:1 and INFO has
    # no db0 entry.
    def assert_empty
        if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
            (@r.info['db0'])
            puts "Error: Node #{self} is not empty. Either the node already knows other nodes (check with CLUSTER NODES) or contains some key in database 0."
            exit 1
        end
    end

    # Refresh this node's state from CLUSTER NODES and CLUSTER INFO.
    # The line flagged "myself" fills @info: identity fields, served slots
    # and migrating/importing slots. Then every CLUSTER INFO field is
    # merged in. With o[:getfriends], every other line is appended to
    # @friends as a hash of its parsed fields.
    def load_info(o={})
        self.connect
        @r.cluster("nodes").split("\n").each{|n|
            # name addr flags role ping_sent ping_recv link_status slots
            split = n.split
            next if split.empty?
            name,addr,flags,role,ping_sent,ping_recv,link_status = split[0..6]
            slots = split[7..-1]
            info = {
                :name => name,
                :addr => addr,
                :flags => flags.split(","),
                :role => role,
                :ping_sent => ping_sent.to_i,
                :ping_recv => ping_recv.to_i,
                :link_status => link_status
            }
            if info[:flags].index("myself")
                @info = @info.merge(info)
                # Start from a clean slate so stale entries from a previous
                # load do not survive.
                @info[:slots] = {}
                @info[:migrating] = {}
                @info[:importing] = {}
                slots.each{|s| parse_slot_spec(s)} if slots
                @dirty = false
                load_cluster_info
            elsif o[:getfriends]
                @friends << info
            end
        }
    end

    # Mark slots (any enumerable of slot numbers) as assigned to this node.
    # They are sent to the node by the next flush_node_config.
    def add_slots(slots)
        slots.each{|s|
            @info[:slots][s] = :new
        }
        @dirty = true
    end

    # Send CLUSTER ADDSLOTS for every slot added since the last flush, and
    # mark those slots as configured.
    def flush_node_config
        return if !@dirty
        new_slots = []
        @info[:slots].each{|s,val|
            if val == :new
                new_slots << s
                @info[:slots][s] = true
            end
        }
        # CLUSTER ADDSLOTS needs at least one slot argument.
        @r.cluster("addslots",*new_slots) if !new_slots.empty?
        @dirty = false
    end

    # One-line summary of the node:
    #   "[STATE] name host:port slots:RANGES (N slots)"
    # RANGES lists the assigned slots as comma-separated runs, like in
    # "1-5,8-9,20-25,30".
    def info_string
        # Group the sorted slot numbers into runs of consecutive integers,
        # e.g. [1,2,3,4,5,8,9,30] -> [[1,5],[8,9],[30,30]].
        runs = []
        @info[:slots].keys.sort.each{|slot|
            if !runs.empty? && runs[-1][1]+1 == slot
                runs[-1][1] = slot
            else
                runs << [slot,slot]
            end
        }
        # Single-slot runs print as "N", longer ones as "start-end".
        slots = runs.map{|first,last|
            first == last ? first.to_s : "#{first}-#{last}"
        }.join(",")

        "[#{@info[:cluster_state].upcase}] #{self.info[:name]} #{self.to_s} slots:#{slots} (#{self.slots.length} slots)"
    end

    # Return a single string representing nodes and associated slots.
    # Every CLUSTER NODES line that lists at least one served slot becomes
    # "name:slot,slot,..." (bracketed migrating/importing entries ignored);
    # the entries are sorted and joined with "|".
    # TODO: remove slaves from config when slaves will be handled
    # by Redis Cluster.
    def get_config_signature
        config = []
        @r.cluster("nodes").each_line{|l|
            s = l.split
            slots = (s[7..-1] || []).select {|x| x[0..0] != "["}
            next if slots.length == 0
            config << s[0]+":"+(slots.sort.join(","))
        }
        config.sort.join("|")
    end

    def is_dirty?
        @dirty
    end

    private

    # Record one slot token from the "myself" line of CLUSTER NODES:
    #   "N"        a single served slot
    #   "A-B"      a range of served slots
    #   "[N->-id]" slot N migrating to node id
    #   "[N-<-id]" slot N being imported from node id
    # Served slots are stored as true rather than :new: the node already
    # owns them, so flush_node_config must not send them again.
    def parse_slot_spec(s)
        if s[0..0] == '['
            spec = s[1..-2] # strip the surrounding brackets
            if spec.index("->-")
                slot,dst = spec.split("->-")
                @info[:migrating][slot.to_i] = dst
            elsif spec.index("-<-")
                slot,src = spec.split("-<-")
                @info[:importing][slot.to_i] = src
            end
        else
            start,stop = s.split("-")
            stop ||= start
            (start.to_i..stop.to_i).each{|slot| @info[:slots][slot] = true}
        end
    end

    # Merge every "key:value" line of CLUSTER INFO into @info. Values are
    # stored as integers, except cluster_state which stays a string.
    def load_cluster_info
        @r.cluster("info").split("\n").each{|e|
            k,v = e.split(":",2)
            next if !v # not a "key:value" line
            k = k.to_sym
            v = v.chomp # drop the "\r" left over from the CRLF line ending
            @info[k] = (k == :cluster_state) ? v : v.to_i
        }
    end
end

# Driver for the redis-trib subcommands. It keeps the ClusterNode objects
# loaded for the current command and the problems found while checking
# them. A non-empty @errors makes reshard refuse to run.
class RedisTrib
    def initialize
        @nodes = []
        @fix = false
        @errors = []
    end

    # Exit unless the argument count fits the subcommand's arity.
    # req_args > 0: exactly req_args arguments; req_args < 0: at least
    # req_args.abs arguments. num_args includes the subcommand name itself.
    def check_arity(req_args, num_args)
        if (req_args > 0 && num_args != req_args) ||
           (req_args < 0 && num_args < req_args.abs)
            puts "Wrong number of arguments for specified sub command"
            exit 1
        end
    end

    def add_node(node)
        @nodes << node
    end

    # Return the loaded node whose ID matches name (case-insensitively),
    # or nil when there is none.
    def get_node_by_name(name)
        wanted = name.downcase
        @nodes.find{|n| n.info[:name] == wanted}
    end

    # Print every node, then verify that the nodes agree on the slot
    # configuration and that all hash slots are served.
    def check_cluster
        puts "Performing Cluster Check (using node #{@nodes[0]})"
        show_nodes
        check_config_consistency
        check_slots_coverage
    end

    # Record a cluster problem and print it.
    def cluster_error(msg)
        @errors << msg
        puts msg
    end

    # Merge slots of every known node. If the resulting slots are equal
    # to ClusterHashSlots, then all slots are served.
    def covered_slots
        @nodes.inject({}){|slots,n| slots.merge(n.slots)}
    end

    def check_slots_coverage
        if covered_slots.length == ClusterHashSlots
            puts "[OK] All #{ClusterHashSlots} slots covered."
        else
            cluster_error "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
            fix_slots_coverage if @fix
        end
    end

    # Nodes reporting at least one key in the given slot.
    def nodes_with_keys_in_slot(slot)
        @nodes.select{|n| n.r.cluster("getkeysinslot",slot,1).length > 0}
    end

    # Interactively assign every uncovered slot to a node.
    def fix_slots_coverage
        not_covered = (0...ClusterHashSlots).to_a - covered_slots.keys
        puts "\nFixing slots coverage..."
        puts "List of not covered slots: " + not_covered.join(",")

        # For every slot, take action depending on the actual condition:
        # 1) No node has keys for this slot.
        # 2) A single node has keys for this slot.
        # 3) Multiple nodes have keys for this slot.
        slots = {}
        not_covered.each{|slot|
            nodes = nodes_with_keys_in_slot(slot)
            slots[slot] = nodes
            puts "Slot #{slot} has keys in #{nodes.length} nodes: #{nodes.join(", ")}"
        }

        none = slots.select {|k,v| v.length == 0}
        single = slots.select {|k,v| v.length == 1}
        multi = slots.select {|k,v| v.length > 1}

        # Handle case "1": keys in no node.
        if none.length > 0
            puts "The following uncovered slots have no keys across the cluster:"
            puts none.keys.join(",")
            yes_or_die "Fix these slots by covering with a random node?"
            none.each{|slot,_|
                node = @nodes.sample
                puts "Covering slot #{slot} with #{node}"
                node.r.cluster("addslots",slot)
            }
        end

        # Handle case "2": keys only in one node.
        if single.length > 0
            puts "The following uncovered slots have keys in just one node:"
            puts single.keys.join(",")
            yes_or_die "Fix these slots by covering with those nodes?"
            single.each{|slot,nodes|
                puts "Covering slot #{slot} with #{nodes[0]}"
                nodes[0].r.cluster("addslots",slot)
            }
        end

        # Handle case "3": keys in multiple nodes.
        if multi.length > 0
            puts "The following uncovered slots have keys in multiple nodes:"
            puts multi.keys.join(",")
            yes_or_die "Fix these slots by moving keys into a single node?"
            multi.each{|slot,nodes|
                puts "Covering slot #{slot} moving keys to #{nodes[0]}"
                # TODO
                # 1) Set all nodes as "MIGRATING" for this slot, so that we
                # can access keys in the hash slot using ASKING.
                # 2) Move everything to node[0]
                # 3) Clear MIGRATING from nodes, and ADDSLOTS the slot to
                # node[0].
                raise "TODO: Work in progress"
            }
        end
    end

    # Check if all the nodes agree about the cluster configuration.
    # Disagreement is recorded as a cluster error.
    def check_config_consistency
        signatures = @nodes.map{|n| n.get_config_signature}
        if signatures.uniq.length != 1
            cluster_error "[ERR] Nodes don't agree about configuration!"
        else
            puts "[OK] All nodes agree about slots configuration."
        end
    end

    # Split the hash slots into contiguous, equally sized ranges, one per
    # node. The last node also takes the remainder of the division.
    def alloc_slots
        slots_per_node = ClusterHashSlots/@nodes.length
        @nodes.each_with_index{|n,i|
            first = i*slots_per_node
            last = first+slots_per_node-1
            last = ClusterHashSlots-1 if i == @nodes.length-1
            n.add_slots first..last
        }
    end

    def flush_nodes_config
        @nodes.each{|n|
            n.flush_node_config
        }
    end

    def show_nodes
        @nodes.each{|n|
            puts n.info_string
        }
    end

    def join_cluster
        # We use a brute force approach to make sure the nodes will meet
        # each other, that is, sending CLUSTER MEET messages to all the nodes
        # about the very same node.
        # Thanks to gossip this information should propagate across all the
        # cluster in a matter of seconds.
        return if @nodes.empty?
        first = @nodes.first.info
        @nodes.drop(1).each{|n|
            n.r.cluster("meet",first[:host],first[:port])
        }
    end

    # Ask a yes/no question; anything but "yes" (including end of input)
    # aborts the process.
    def yes_or_die(msg)
        print "#{msg} (type 'yes' to accept): "
        STDOUT.flush
        answer = STDIN.gets
        if !answer || answer.chomp.downcase != "yes"
            puts "Aborting..."
            exit 1
        end
    end

    # Read one line from STDIN without its line terminator. Exits when STDIN
    # is closed, because interactive commands cannot continue without input.
    def read_line
        STDOUT.flush
        line = STDIN.gets
        if !line
            puts "\nUnexpected end of input, aborting."
            exit 1
        end
        line.chomp
    end

    # Load the node at nodeaddr (exiting if it is unreachable or not a
    # cluster node), then every friend it reports that is not flagged
    # noaddr/disconnected/fail and that we can actually connect to.
    def load_cluster_info_from_node(nodeaddr)
        node = ClusterNode.new(nodeaddr)
        node.connect(:abort => true)
        node.assert_cluster
        node.load_info(:getfriends => true)
        add_node(node)
        node.friends.each{|f|
            next if f[:flags].index("noaddr") ||
                    f[:flags].index("disconnected") ||
                    f[:flags].index("fail")
            fnode = ClusterNode.new(f[:addr])
            fnode.connect()
            # An unreachable friend has no connection; loading it would crash.
            next if !fnode.r
            fnode.load_info()
            add_node(fnode)
        }
    end

    # Given a list of source nodes return a "resharding plan"
    # with what slots to move in order to move "numslots" slots to another
    # instance. Each entry is {:source => node, :slot => slot_number}.
    def compute_reshard_table(sources,numslots)
        moved = []
        # Sort from bigger to smaller instance, for two reasons:
        # 1) If we take less slots than instances it is better to start
        #    getting from the biggest instances.
        # 2) We take one slot more from the first instance in the case of not
        #    perfect divisibility. Like we have 3 nodes and need to get 10
        #    slots, we take 4 from the first, and 3 from the rest. So the
        #    biggest is always the first.
        sources = sources.sort{|a,b| b.slots.length <=> a.slots.length}
        source_tot_slots = sources.inject(0) {|sum,source|
            sum+source.slots.length
        }
        # With no slots to take from, the proportion below would be NaN
        # (x/0*0), and NaN.ceil / NaN.floor raise FloatDomainError.
        return moved if source_tot_slots == 0
        sources.each_with_index{|s,i|
            # Every node will provide a number of slots proportional to the
            # slots it has assigned.
            n = (numslots.to_f/source_tot_slots*s.slots.length)
            n = (i == 0) ? n.ceil : n.floor
            s.slots.keys.sort[(0...n)].each{|slot|
                moved << {:source => s, :slot => slot} if moved.length < numslots
            }
        }
        moved
    end

    def show_reshard_table(table)
        table.each{|e|
            puts "    Moving slot #{e[:slot]} from #{e[:source].info[:name]}"
        }
    end

    # Move every key of slot from source to target, then tell all known
    # nodes that target owns the slot. With o[:verbose] a dot is printed for
    # each migrated key.
    def move_slot(source,target,slot,o={})
        # We start marking the slot as importing in the destination node,
        # and the slot as migrating in the source node. Note that the order of
        # the operations is important, as otherwise a client may be redirected
        # to the target node that does not yet know it is importing this slot.
        print "Moving slot #{slot} from #{source.info_string}: "; STDOUT.flush
        target.r.cluster("setslot",slot,"importing",source.info[:name])
        source.r.cluster("setslot",slot,"migrating",target.info[:name])
        # Migrate all the keys from source to target using the MIGRATE
        # command, fetching them 10 at a time until the slot is empty.
        loop do
            keys = source.r.cluster("getkeysinslot",slot,10)
            break if keys.length == 0
            keys.each{|key|
                source.r.migrate(target.info[:host],target.info[:port],key,0,1000)
                print "." if o[:verbose]
                STDOUT.flush
            }
        end
        puts
        # Set the new node as the owner of the slot in all the known nodes.
        @nodes.each{|n|
            n.r.cluster("setslot",slot,"node",target.info[:name])
        }
    end

    # redis-trib subcommands implementations

    def check_cluster_cmd
        load_cluster_info_from_node(ARGV[1])
        check_cluster
    end

    def fix_cluster_cmd
        @fix = true
        load_cluster_info_from_node(ARGV[1])
        check_cluster
    end

    # Interactively move a number of slots from chosen source nodes to a
    # target node. Refuses to run when the cluster check found problems.
    def reshard_cluster_cmd
        load_cluster_info_from_node(ARGV[1])
        check_cluster
        if @errors.length != 0
            puts "Please fix your cluster problems before resharding."
            exit 1
        end
        numslots = 0
        while numslots <= 0 || numslots > ClusterHashSlots
            print "How many slots do you want to move (from 1 to #{ClusterHashSlots})? "
            numslots = read_line.to_i
        end
        target = nil
        while !target
            print "What is the receiving node ID? "
            target = get_node_by_name(read_line)
            if !target
                puts "The specified node is not known, please retry."
            end
        end
        sources = ask_source_nodes(target)
        puts "\nReady to move #{numslots} slots."
        puts "  Source nodes:"
        sources.each{|s| puts "    "+s.info_string}
        puts "  Destination node:"
        puts "    #{target.info_string}"
        reshard_table = compute_reshard_table(sources,numslots)
        puts "  Resharding plan:"
        show_reshard_table(reshard_table)
        print "Do you want to proceed with the proposed reshard plan (yes/no)? "
        exit(1) if read_line != "yes"
        reshard_table.each{|e|
            move_slot(e[:source],target,e[:slot],:verbose=>true)
        }
    end

    # Prompt for the source node IDs of a reshard until "done" or "all".
    # Returns a non-empty list of distinct nodes, never including target;
    # exits if the user finishes without naming any source.
    def ask_source_nodes(target)
        sources = []
        puts "Please enter all the source node IDs."
        puts "  Type 'all' to use all the nodes as source nodes for the hash slots."
        puts "  Type 'done' once you entered all the source nodes IDs."
        loop do
            print "Source node ##{sources.length+1}:"
            line = read_line
            if line == "done"
                if sources.length == 0
                    puts "No source nodes given, operation aborted"
                    exit 1
                end
                break
            elsif line == "all"
                # Every node but the target. This replaces anything typed so
                # far, so no node ends up listed twice.
                sources = @nodes.select{|n| n.info[:name] != target.info[:name]}
                break
            end
            src = get_node_by_name(line)
            if !src
                puts "The specified node is not known, please retry."
            elsif src.info[:name] == target.info[:name]
                puts "It is not possible to use the target node as source node."
            elsif sources.include?(src)
                puts "The specified node is already a source node."
            else
                sources << src
            end
        end
        sources
    end

    def create_cluster_cmd
        puts "Creating cluster"
        ARGV[1..-1].each{|n|
            node = ClusterNode.new(n)
            node.connect(:abort => true)
            node.assert_cluster
            node.load_info
            node.assert_empty
            add_node(node)
        }
        puts "Performing hash slots allocation on #{@nodes.length} nodes..."
        alloc_slots
        show_nodes
        yes_or_die "Can I set the above configuration?"
        flush_nodes_config
        puts "** Nodes configuration updated"
        puts "** Sending CLUSTER MEET messages to join the cluster"
        join_cluster
        check_cluster
    end

    def addnode_cluster_cmd
        puts "Adding node #{ARGV[1]} to cluster #{ARGV[2]}"

        # Check the existing cluster
        load_cluster_info_from_node(ARGV[2])
        check_cluster

        # Add the new node
        new_node = ClusterNode.new(ARGV[1])
        new_node.connect(:abort => true)
        new_node.assert_cluster
        new_node.load_info
        new_node.assert_empty
        first = @nodes.first.info

        # Send CLUSTER MEET command to the new node
        puts "Send CLUSTER MEET to node #{new_node} to make it join the cluster."
        new_node.r.cluster("meet",first[:host],first[:port])
    end

    def help_cluster_cmd
        show_help
        exit 0
    end
end

# Subcommand table: name => [RedisTrib method to dispatch to,
# arity as checked by RedisTrib#check_arity (counting the subcommand
# itself; negative means "at least"), usage text shown by show_help].
COMMANDS={
    "create"  => ["create_cluster_cmd", -2, "host1:port1 ... hostN:portN"],
    "check"   => ["check_cluster_cmd", 2, "host:port"],
    "fix"     => ["fix_cluster_cmd", 2, "host:port"],
    "reshard" => ["reshard_cluster_cmd", 2, "host:port"],
    "addnode" => ["addnode_cluster_cmd", 3, "new_host:new_port existing_host:existing_port"],
    "help"    => ["help_cluster_cmd", 1, "(show this help)"]
}.freeze

# Print the usage line, then one line per subcommand in COMMANDS with its
# argument synopsis.
def show_help
    puts "Usage: redis-trib <command> <arguments ...>"
    puts
    COMMANDS.each{|name,spec|
        puts "  #{name.ljust(10)} #{spec[2]}"
    }
    puts
end

# Sanity check
if ARGV.length == 0
    show_help
    exit 1
end

cmd_spec = COMMANDS[ARGV[0].downcase]
if !cmd_spec
    puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
    exit 1
end
rt = RedisTrib.new
rt.check_arity(cmd_spec[1],ARGV.length)

# Dispatch. Only method names listed in COMMANDS can be reached, and
# public_send restricts that further to public RedisTrib methods.
rt.public_send(cmd_spec[0])