redis-trib.rb 9.3 KB
Newer Older
1 2 3 4 5
#!/usr/bin/env ruby

require 'rubygems'
require 'redis'

6 7
# Total number of hash slots that RedisTrib#alloc_slots splits among the nodes.
ClusterHashSlots = 4096

8 9 10 11
# Print +s+ without a trailing newline and flush immediately, so that
# progress messages show up before a potentially slow operation completes.
#
# The text is written verbatim: it is NOT interpreted as a format string,
# so a '%' character inside +s+ is safe.
def xputs(s)
    print s
    $stdout.flush
end
12

13 14 15
# A single Redis Cluster node identified by a "host:port" address.
#
# The object keeps the client connection (@r), the node information hash
# (@info, including the :slots table), the list of other nodes reported by
# this node (@friends) and a dirty flag telling whether locally assigned
# slots still have to be sent to the node.
class ClusterNode
    # addr - node address in "host:port" form.
    #
    # Prints an error and exits with status 1 if +addr+ is not made of
    # exactly two colon separated parts.
    def initialize(addr)
        s = addr.split(":")
        if s.length != 2
            puts "Invalid node name #{addr}"
            exit 1
        end
        @r = nil
        @info = {}
        @info[:host] = s[0]
        @info[:port] = s[1]
        @info[:slots] = {}
        @dirty = false # True if we need to flush slots info into node.
        @friends = []
    end

    # Info hashes of the other nodes listed by this node. Only populated by
    # load_info when called with :getfriends => true.
    def friends
        @friends
    end

    # Hash of slot number => :new (assigned locally, not yet flushed) or
    # true (already known to the node).
    def slots
        @info[:slots]
    end

    # The node address as "host:port".
    def to_s
        "#{@info[:host]}:#{@info[:port]}"
    end

    # Open the connection to the node and PING it. Does nothing if a
    # connection is already established.
    #
    # o - options hash; with :abort => true a connection failure terminates
    #     the program with status 1. Without it the failure is reported and
    #     the node is left disconnected (r returns nil).
    def connect(o={})
        return if @r
        xputs "Connecting to node #{self}: "
        begin
            @r = Redis.new(:host => @info[:host], :port => @info[:port])
            @r.ping
        rescue
            puts "ERROR"
            puts "Sorry, can't connect to node #{self}"
            exit 1 if o[:abort]
            @r = nil
            # Do not fall through: "OK" must only follow a successful PING.
            return
        end
        puts "OK"
    end

    # Exit with status 1 unless INFO reports a non-zero cluster_enabled
    # field. Requires an established connection.
    def assert_cluster
        info = @r.info
        if !info["cluster_enabled"] || info["cluster_enabled"].to_i == 0
            puts "Error: Node #{self} is not configured as a cluster node."
            exit 1
        end
    end

    # Exit with status 1 if the node already knows other nodes or if INFO
    # reports a 'db0' entry. Requires an established connection.
    def assert_empty
        if !(@r.cluster("info").split("\r\n").index("cluster_known_nodes:1")) ||
            (@r.info['db0'])
            puts "Error: Node #{self} is not empty. Either the node already knows other nodes (check with nodes-info) or contains some key in database 0."
            exit 1
        end
    end

    # Load the node description from CLUSTER NODES and CLUSTER INFO.
    #
    # The line flagged "myself" is merged into @info (slots included); with
    # :getfriends => true in +o+ every other line is appended to @friends.
    # If the node can't be reached nothing is loaded.
    def load_info(o={})
        self.connect
        return if !@r # Unreachable node: there is nothing to query.
        @r.cluster("nodes").split("\n").each{|n|
            # name addr flags role ping_sent ping_recv link_status slots
            name,addr,flags,role,ping_sent,ping_recv,link_status,slots = n.split(" ")
            next if !flags # Skip blank or truncated lines.
            info = {
                :name => name,
                :addr => addr,
                :flags => flags.split(","),
                :role => role,
                :ping_sent => ping_sent.to_i,
                :ping_recv => ping_recv.to_i,
                :link_status => link_status
            }
            if info[:flags].index("myself")
                @info = @info.merge(info)
                @info[:slots] = {}
                mark_owned_slots(slots) if slots
                @dirty = false
                load_cluster_info
            elsif o[:getfriends]
                @friends << info
            end
        }
    end

    # Assign every slot in +slots+ (any enumerable of integers, usually a
    # Range) to this node. The assignment is only local until
    # flush_node_config is called.
    def add_slots(slots)
        slots.each{|s|
            @info[:slots][s] = :new
        }
        @dirty = true
    end

    # Send the slots assigned with add_slots to the node using
    # CLUSTER ADDSLOTS, then clear the dirty flag. No-op if not dirty.
    def flush_node_config
        return if !@dirty
        new = []
        @info[:slots].each{|s,val|
            if val == :new
                new << s
                @info[:slots][s] = true
            end
        }
        # ADDSLOTS needs at least one slot argument.
        @r.cluster("addslots",*new) if !new.empty?
        @dirty = false
    end

    # One line summary of the node: cluster state, name, address and the
    # hash slots assigned to it displayed as ranges, like in:
    # "1-5,8-9,20-25,30".
    def info_string
        # Walk the slots in increasing order, for instance
        # [1,2,3,4,5,8,9,20,21,22,23,24,25,30], extending the last range
        # when the slot is adjacent to it and starting a new one otherwise.
        ranges = []
        @info[:slots].keys.sort.each{|slot|
            if !ranges.empty? && slot == ranges[-1].last+1
                ranges[-1] = (ranges[-1].first)..slot
            else
                ranges << (slot..slot)
            end
        }

        # Ranges with just one element are shown as a number, real ranges
        # in start-end format. Everything is joined with commas.
        slots = ranges.map{|x|
            x.first == x.last ? x.first.to_s : "#{x.first}-#{x.last}"
        }.join(",")

        # The cluster state is only known after load_info: nodes that were
        # never loaded (as in the create subcommand) are shown as UNKNOWN.
        state = (@info[:cluster_state] || "unknown").to_s.upcase
        "[#{state}] #{self.info[:name]} #{self.to_s.ljust(25)} slots:#{slots}"
    end

    # The raw node information hash.
    def info
        @info
    end

    # True if there are slot assignments not yet flushed to the node.
    def is_dirty?
        @dirty
    end

    # The client connection, or nil when not connected.
    def r
        @r
    end

    private

    # Record the slots listed in +spec+ ("0-10,15,20-30") as already owned
    # by the node, so that flush_node_config will not send them again.
    def mark_owned_slots(spec)
        spec.split(",").each{|s|
            start,stop = s.split("-")
            stop = start if !stop
            ((start.to_i)..(stop.to_i)).each{|slot|
                @info[:slots][slot] = true
            }
        }
    end

    # Merge the "key:value" lines of CLUSTER INFO into @info. All the
    # values are stored as integers except :cluster_state.
    def load_cluster_info
        @r.cluster("info").split("\n").each{|e|
            # strip removes the trailing "\r" left by splitting on "\n".
            k,v = e.strip.split(":",2)
            next if !k || !v
            k = k.to_sym
            if k != :cluster_state
                @info[k] = v.to_i
            else
                @info[k] = v
            end
        }
    end
end

# Implements the redis-trib subcommands on top of a list of ClusterNode
# objects.
class RedisTrib
    def initialize
        @nodes = []
    end

    # Validate the number of command line arguments.
    #
    # req_args - when positive, num_args must be exactly req_args; when
    #            negative, num_args must be at least req_args.abs.
    # num_args - number of arguments actually given.
    #
    # Prints an error and exits with status 1 on mismatch.
    def check_arity(req_args, num_args)
        if (req_args > 0 && num_args != req_args) ||
           (req_args < 0 && num_args < req_args.abs)
            puts "Wrong number of arguments for specified sub command"
            exit 1
        end
    end

    # Append +node+ to the list of nodes handled by this instance.
    def add_node(node)
        @nodes << node
    end

    # Return the node whose name equals the lowercased +name+, or nil.
    def get_node_by_name(name)
        @nodes.find{|n| n.info[:name] == name.downcase}
    end

    # Show the nodes and verify that all the hash slots are covered.
    # Returns the list of error messages (empty when everything is fine).
    def check_cluster
        puts "Performing Cluster Check (using node #{@nodes[0]})"
        errors = []
        show_nodes
        # Check if all the slots are covered
        slots = {}
        @nodes.each{|n|
            slots = slots.merge(n.slots)
        }
        if slots.length == ClusterHashSlots
            puts "[OK] All #{ClusterHashSlots} slots covered."
        else
            errors << "[ERR] Not all #{ClusterHashSlots} slots are covered by nodes."
            puts errors[-1]
        end
        return errors
    end

    # Split the hash slots into contiguous ranges of equal size, one per
    # node. The last node also receives the remainder of the division.
    def alloc_slots
        slots_per_node = ClusterHashSlots/@nodes.length
        @nodes.each_with_index{|n,i|
            first = i*slots_per_node
            last = first+slots_per_node-1
            last = ClusterHashSlots-1 if i == @nodes.length-1
            n.add_slots first..last
        }
    end

    # Send the pending slots configuration to every node.
    def flush_nodes_config
        @nodes.each{|n|
            n.flush_node_config
        }
    end

    # Print the one line description of every node.
    def show_nodes
        @nodes.each{|n|
            puts n.info_string
        }
    end

    def join_cluster
        # We use a brute force approach to make sure the node will meet
        # each other, that is, sending CLUSTER MEET messages to all the nodes
        # about the very same node.
        # Thanks to gossip this information should propagate across all the
        # cluster in a matter of seconds.
        first = false
        @nodes.each{|n|
            if !first then first = n.info; next; end # Skip the first node
            n.r.cluster("meet",first[:host],first[:port])
        }
    end

    # Ask for confirmation showing +msg+. Exits with status 1 unless the
    # user types "yes" (case insensitive). End of input counts as a refusal.
    def yes_or_die(msg)
        print "#{msg} (type 'yes' to accept): "
        $stdout.flush
        answer = $stdin.gets
        if !answer || answer.chomp.downcase != "yes"
            puts "Aborting..."
            exit 1
        end
    end

    # Populate the nodes list starting from the node at +nodeaddr+
    # ("host:port"): the node itself plus every reachable node it lists.
    def load_cluster_info_from_node(nodeaddr)
        node = ClusterNode.new(nodeaddr)
        node.connect(:abort => true)
        node.assert_cluster
        node.load_info(:getfriends => true)
        add_node(node)
        node.friends.each{|f|
            fnode = ClusterNode.new(f[:addr])
            fnode.connect()
            # connect already reported the failure; an unreachable node
            # can't be queried, so it is left out of the nodes list.
            next if !fnode.r
            fnode.load_info()
            add_node(fnode)
        }
    end

    # redis-trib subcommands implementations

    def check_cluster_cmd
        load_cluster_info_from_node(ARGV[1])
        check_cluster
    end

    def reshard_cluster_cmd
        load_cluster_info_from_node(ARGV[1])
        errors = check_cluster
        if errors.length != 0
            puts "Please fix your cluster problems before resharding."
            exit 1
        end
        many = 0
        while many <= 0 || many > ClusterHashSlots
            print "How many slots do you want to move? "
            $stdout.flush
            line = $stdin.gets
            # End of input: stop asking instead of looping forever.
            if !line
                puts "Aborting..."
                exit 1
            end
            many = line.to_i
        end
    end

    def create_cluster_cmd
        puts "Creating cluster"
        ARGV[1..-1].each{|n|
            node = ClusterNode.new(n)
            node.connect(:abort => true)
            node.assert_cluster
            node.assert_empty
            add_node(node)
        }
        puts "Performing hash slots allocation on #{@nodes.length} nodes..."
        alloc_slots
        show_nodes
        yes_or_die "Can I set the above configuration?"
        flush_nodes_config
        puts "** Nodes configuration updated"
        puts "** Sending CLUSTER MEET messages to join the cluster"
        join_cluster
        check_cluster
    end
end

# Supported subcommands.
# Each entry maps the subcommand name to:
#   [RedisTrib method name, arity, usage string]
# The arity is checked against ARGV.length (subcommand name included):
# a positive value is the exact number required, a negative value means
# "at least that many" (see RedisTrib#check_arity).
COMMANDS={
    "create" => ["create_cluster_cmd", -2, "host1:port host2:port ... hostN:port"],
    "check" =>  ["check_cluster_cmd", 2, "host:port"],
    "reshard" =>  ["reshard_cluster_cmd", 2, "host:port"]
}

# Sanity check
if ARGV.length == 0
    puts "Usage: redis-trib <command> <arguments ...>"
    puts
    COMMANDS.each{|k,v|
        puts "  #{k.ljust(20)} #{v[2]}"
    }
    puts
    exit 1
end

rt = RedisTrib.new
cmd_spec = COMMANDS[ARGV[0].downcase]
if !cmd_spec
    puts "Unknown redis-trib subcommand '#{ARGV[0]}'"
    exit 1
end
rt.check_arity(cmd_spec[1],ARGV.length)

# Dispatch
rt.send(cmd_spec[0])