static_provider.py 2.5 KB
Newer Older
P
peng.xu 已提交
1 2 3 4 5 6 7
import os
import sys
# When this file is executed directly as a script, append the parent of this
# file's directory to sys.path before the remaining imports run
# (presumably so the `mishards` package imports below resolve -- confirm).
if __name__ == '__main__':
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import logging
import socket
P
peng.xu 已提交
8
from environs import Env
9 10
from mishards.exceptions import ConnectionConnectError
from mishards.topology import StatusType
P
peng.xu 已提交
11 12

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
P
peng.xu 已提交
13
# Environment-variable reader; StaticDiscovery.__init__ uses it to read the
# DISCOVERY_STATIC_HOSTS and DISCOVERY_STATIC_PORT settings.
env = Env()
P
peng.xu 已提交
14

15 16 17 18 19 20 21 22 23 24 25
# Separator between the host part and the optional port part of an address.
DELIMITER = ':'


def parse_host(addr):
    """Split ``addr`` on ``DELIMITER`` and return the list of parts."""
    return addr.split(DELIMITER)

def resolve_address(addr, default_port):
    """Resolve ``addr`` into an ``'<ip>:<port>'`` string.

    ``addr`` is split with ``parse_host`` and must yield one part (``host``)
    or two parts (``host:port``). The host part is looked up with
    ``socket.gethostbyname``; the port comes from ``addr`` when it is present
    and from ``default_port`` otherwise.

    Args:
        addr: Address string, either ``'host'`` or ``'host:port'``.
        default_port: Port used when ``addr`` carries no port part.

    Returns:
        The string ``'<resolved ip>:<port>'``.

    Raises:
        AssertionError: If ``addr`` does not split into one or two parts.
        Errors raised by ``socket.gethostbyname`` are not caught here.
    """
    parts = parse_host(addr)
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # stripped under ``python -O``, which would let a malformed address such
    # as 'a:b:c' silently fall through to the default port. The exception
    # type and message are unchanged for existing callers.
    if not 1 <= len(parts) <= 2:
        raise AssertionError('Invalid Addr: {}'.format(addr))
    port = parts[1] if len(parts) == 2 else default_port
    return '{}:{}'.format(socket.gethostbyname(parts[0]), port)
P
peng.xu 已提交
26 27 28 29

class StaticDiscovery(object):
    """Discovery plugin that registers a fixed list of addresses in a topology.

    The address list is read from the ``DISCOVERY_STATIC_HOSTS`` environment
    variable. Each entry is passed through ``resolve_address`` with the value
    of ``DISCOVERY_STATIC_PORT`` (default 19530) as the fallback port.
    """

    name = 'static'

    def __init__(self, config, readonly_topo, **kwargs):
        """Store the topology and resolve the host list from the environment.

        ``config`` and ``kwargs`` are accepted but not used by this method.
        """
        self.readonly_topo = readonly_topo
        raw_hosts = env.list('DISCOVERY_STATIC_HOSTS', [])
        self.port = env.int('DISCOVERY_STATIC_PORT', 19530)
        self.hosts = [resolve_address(entry, self.port) for entry in raw_hosts]

    def start(self):
        """Register every configured host and report overall success.

        Returns False (after logging an error) when no hosts are configured,
        and False as soon as one ``add_pod`` call fails; True otherwise.
        """
        if not self.hosts:
            logger.error('No address is specified')
            return False
        # all() short-circuits, so registration stops at the first failure.
        return all(self.add_pod(address, address) for address in self.hosts)

    def stop(self):
        """Call ``delete_pod`` for every configured host."""
        for address in self.hosts:
            self.delete_pod(address)

    def add_pod(self, name, addr):
        """Create a group called ``name`` holding the single address ``addr``.

        The group is created on ``readonly_topo``; when that returns
        ``StatusType.OK`` a ``tcp://`` entry for ``addr`` is created in it.

        Returns:
            True when the final status is ``StatusType.OK`` or
            ``StatusType.DUPLICATED``; False for any other status or when
            ``ConnectionConnectError`` is raised (which is logged).
        """
        try:
            uri = 'tcp://{}'.format(addr)
            status, group = self.readonly_topo.create(name=name)
            if status == StatusType.OK:
                status, _ = group.create(name=name, uri=uri)
        except ConnectionConnectError:
            logger.error('Connection error to: {}'.format(addr))
            return False

        if status not in (StatusType.OK, StatusType.DUPLICATED):
            return False

        # Only the OK status is announced; DUPLICATED succeeds silently.
        if status == StatusType.OK:
            logger.info('StaticDiscovery Add Static Group \"{}\" Of 1 Address: {}'.format(name, addr))
        return True

    def delete_pod(self, name):
        """Delete the group ``name`` from the topology; always returns True."""
        self.readonly_topo.delete_group(name)
        return True

    @classmethod
    def Create(cls, readonly_topo, plugin_config, **kwargs):
        """Build an instance, passing ``plugin_config`` as ``config``."""
        return cls(config=plugin_config, readonly_topo=readonly_topo, **kwargs)


def setup(app):
    """Plugin entry point: log the installation and register the discovery class.

    Logs this file's path together with ``app.plugin_package_name``, then
    passes ``StaticDiscovery`` to ``app.on_plugin_setup``.
    """
    message = 'Plugin \'{}\' Installed In Package: {}'.format(__file__, app.plugin_package_name)
    logger.info(message)
    app.on_plugin_setup(StaticDiscovery)