import os
import sys
if __name__ == '__main__':
    # Allow running this file directly: make the grandparent directory
    # importable before anything else is imported.
    sys.path.append(os.path.dirname(os.path.dirname(
        os.path.abspath(__file__))))

import re
import logging
import time
import copy
import threading
import queue
import enum
from kubernetes import client, config, watch

logger = logging.getLogger(__name__)

# File from which the namespace is read when none is configured explicitly.
INCLUSTER_NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'


class EventType(enum.Enum):
    """Kind of message placed on the shared discovery queue."""

    # Periodic pod snapshot produced by K8SHeartbeatHandler.
    PodHeartBeat = 1
    # Single event produced by K8SEventListener.
    Watch = 2


class K8SMixin:
    """Shared setup of the namespace and the CoreV1Api client."""

    def __init__(self, namespace, in_cluster=False, **kwargs):
        """Resolve the namespace and make sure an API client is available.

        Args:
            namespace: Namespace to operate in. When falsy, it is read from
                ``INCLUSTER_NAMESPACE_PATH``.
            in_cluster: Selects ``load_incluster_config`` over
                ``load_kube_config`` when a client has to be created.
            **kwargs: Kept on ``self.kwargs``. ``v1`` may carry a ready-made
                API client, in which case no configuration is loaded.
        """
        self.namespace = namespace
        self.in_cluster = in_cluster
        self.kwargs = kwargs
        self.v1 = kwargs.get('v1', None)
        if not self.namespace:
            # Close the file deterministically and drop stray whitespace so
            # the value can be used verbatim in API calls.
            with open(INCLUSTER_NAMESPACE_PATH) as namespace_file:
                self.namespace = namespace_file.read().strip()

        if not self.v1:
            if self.in_cluster:
                config.load_incluster_config()
            else:
                config.load_kube_config()
            self.v1 = client.CoreV1Api()


class K8SHeartbeatHandler(threading.Thread, K8SMixin):
    """Thread that periodically lists pods and queues a heartbeat message."""

    # NOTE(review): this class attribute takes the place of
    # threading.Thread's ``name`` property on this subclass - confirm that
    # is intended.
    name = 'kubernetes'

    def __init__(self,
                 message_queue,
                 namespace,
                 label_selector,
                 in_cluster=False,
                 **kwargs):
        """Create the poller.

        Args:
            message_queue: Queue that receives the heartbeat messages.
            namespace: Namespace whose pods are listed.
            label_selector: Label selector passed to the list call.
            in_cluster: Forwarded to ``K8SMixin``.
            **kwargs: Forwarded to ``K8SMixin``; ``poll_interval`` (seconds,
                default 5) sets the pause between two polls.
        """
        K8SMixin.__init__(self,
                          namespace=namespace,
                          in_cluster=in_cluster,
                          **kwargs)
        threading.Thread.__init__(self)
        self.queue = message_queue
        self.terminate = False
        self.label_selector = label_selector
        self.poll_interval = kwargs.get('poll_interval', 5)
        # Set by stop() so that the pause between polls ends immediately.
        self._stop_event = threading.Event()

    def run(self):
        """Poll until stopped, queueing one message per successful poll.

        Each message is ``{'eType': EventType.PodHeartBeat, 'events': [...]}``
        with one dict per pod holding ``pod``, ``ip``, ``ready``, ``reason``
        and ``message``. A failed poll is logged and retried after the
        usual interval.
        """
        while not self.terminate:
            try:
                pods = self.v1.list_namespaced_pod(
                    namespace=self.namespace,
                    label_selector=self.label_selector)
                # The list response already carries each pod's metadata and
                # status, so no extra read per pod is needed; this also
                # avoids losing the whole heartbeat when a pod disappears
                # between the list call and a follow-up read.
                pod_events = [
                    dict(pod=pod.metadata.name,
                         ip=pod.status.pod_ip,
                         ready=pod.status.phase == 'Running',
                         reason=pod.status.reason,
                         message=pod.status.message)
                    for pod in pods.items
                ]
                self.queue.put({
                    'eType': EventType.PodHeartBeat,
                    'events': pod_events,
                })
            except Exception as exc:
                # Best effort: keep the thread alive, but keep the traceback.
                logger.error(exc, exc_info=True)

            # Equivalent to sleeping poll_interval seconds, except that
            # stop() cuts the wait short.
            self._stop_event.wait(self.poll_interval)

    def stop(self):
        """Ask the polling loop to exit and wake it if it is waiting."""
        self.terminate = True
        self._stop_event.set()
class K8SEventListener(threading.Thread, K8SMixin):
    """Thread that watches pod-related events and forwards them to a queue."""

    def __init__(self, message_queue, namespace, in_cluster=False, **kwargs):
        """Create the listener.

        Args:
            message_queue: Queue that receives one dict per watched event.
            namespace: Namespace whose events are watched.
            in_cluster: Forwarded to ``K8SMixin``.
            **kwargs: Forwarded to ``K8SMixin``.
        """
        K8SMixin.__init__(self,
                          namespace=namespace,
                          in_cluster=in_cluster,
                          **kwargs)
        threading.Thread.__init__(self)
        self.queue = message_queue
        self.terminate = False
        # True until the first event has been forwarded.
        self.at_start_up = True
        self._stop_event = threading.Event()

    def stop(self):
        """Flag the listener as terminated; checked on the next event."""
        self.terminate = True
        self._stop_event.set()

    def run(self):
        """Stream events whose involved object is a Pod into the queue.

        Each queued dict holds ``eType`` (``EventType.Watch``), ``pod``,
        ``reason``, ``message`` and ``start_up``.
        """
        w = watch.Watch()
        for event in w.stream(self.v1.list_namespaced_event,
                              namespace=self.namespace,
                              field_selector='involvedObject.kind=Pod'):
            if self.terminate:
                break

            # The resource version is intentionally not parsed: it was
            # never used, and converting it to int can fail.
            watched = event['object']
            info = dict(
                eType=EventType.Watch,
                pod=watched.involved_object.name,
                reason=watched.reason,
                message=watched.message,
                start_up=self.at_start_up,
            )
            self.at_start_up = False
            self.queue.put(info)


class EventHandler(threading.Thread):
    """Thread that consumes queued messages and updates the manager's pods."""

    def __init__(self, mgr, message_queue, namespace, pod_patt, **kwargs):
        """Create the handler.

        Args:
            mgr: Object providing ``v1``, ``conn_mgr``, ``add_pod`` and
                ``delete_pod``.
            message_queue: Queue the messages are read from.
            namespace: Namespace used when reading a started pod.
            pod_patt: Regular expression a pod name must match (from its
                start) for watch events to be handled.
            **kwargs: Kept on ``self.kwargs``.
        """
        threading.Thread.__init__(self)
        self.mgr = mgr
        self.queue = message_queue
        self.kwargs = kwargs
        self.terminate = False
        self.pod_patt = re.compile(pod_patt)
        self.namespace = namespace

    def stop(self):
        """Ask the consumer loop to exit after its current iteration."""
        self.terminate = True

    def on_drop(self, event, **kwargs):
        """Hook for ignored events; does nothing."""
        pass

    def on_pod_started(self, event, **kwargs):
        """Register a started pod once its IP can be read.

        The pod is read up to three times, pausing 0.5s after every attempt
        that fails or yields no IP. Nothing is registered when no attempt
        produced a pod with an IP.
        """
        pod = None
        for _ in range(3):
            try:
                pod = self.mgr.v1.read_namespaced_pod(name=event['pod'],
                                                      namespace=self.namespace)
            except client.rest.ApiException:
                time.sleep(0.5)
                continue
            if pod.status.pod_ip:
                break
            time.sleep(0.5)

        if pod is None:
            # Every read failed. Stay quiet for the first event received
            # after start-up.
            if not event['start_up']:
                logger.error('Pod {} is started but cannot read pod'.format(
                    event['pod']))
            return

        if not pod.status.pod_ip:
            logger.warning('NoPodIPFoundError')
            return

        logger.info('Register POD {} with IP {}'.format(
            pod.metadata.name, pod.status.pod_ip))
        self.mgr.add_pod(name=pod.metadata.name, ip=pod.status.pod_ip)

    def on_pod_killing(self, event, **kwargs):
        """Unregister the pod named in the event."""
        logger.info('Unregister POD {}'.format(event['pod']))
        self.mgr.delete_pod(name=event['pod'])

    def on_pod_heartbeat(self, event, **kwargs):
        """Reconcile registered pods with a heartbeat snapshot.

        Ready pods are (re-)registered, non-ready pods are unregistered, and
        every previously known name that is not ready in the snapshot is
        unregistered as well.
        """
        names = self.mgr.conn_mgr.conn_names

        running_names = set()
        for each_event in event['events']:
            if each_event['ready']:
                self.mgr.add_pod(name=each_event['pod'], ip=each_event['ip'])
                running_names.add(each_event['pod'])
            else:
                self.mgr.delete_pod(name=each_event['pod'])

        for name in names - running_names:
            self.mgr.delete_pod(name)

        logger.info(self.mgr.conn_mgr.conn_names)

    def handle_event(self, event):
        """Dispatch one queued message to the matching ``on_*`` method."""
        # Guard first: an empty message cannot be subscripted below.
        if not event:
            return self.on_drop(event)

        if event['eType'] == EventType.PodHeartBeat:
            return self.on_pod_heartbeat(event)

        if event['reason'] not in ('Started', 'Killing'):
            return self.on_drop(event)

        if not self.pod_patt.match(event['pod']):
            return self.on_drop(event)

        logger.info('Handling event: {}'.format(event))

        if event['reason'] == 'Started':
            return self.on_pod_started(event)

        return self.on_pod_killing(event)

    def run(self):
        """Consume the queue until stopped, checking the flag every second."""
        while not self.terminate:
            try:
                event = self.queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                self.handle_event(event)
            except Exception:
                # One bad message must not end the consumer thread.
                logger.exception('Failed to handle event: %s', event)


class KubernetesProviderSettings:
    """Plain container for the provider's settings."""

    def __init__(self, namespace, pod_patt, label_selector, in_cluster,
                 poll_interval, port=None, **kwargs):
        """Store the settings; ``port`` defaults to 19530 when falsy."""
        self.namespace = namespace
        self.pod_patt = pod_patt
        self.label_selector = label_selector
        self.in_cluster = in_cluster
        self.poll_interval = poll_interval
        self.port = int(port) if port else 19530


class KubernetesProvider(object):
    """Discovery provider that keeps ``conn_mgr`` in sync with pods."""

    name = 'kubernetes'

    def __init__(self, plugin_config, conn_mgr, **kwargs):
        """Read the settings and build the three worker threads.

        Args:
            plugin_config: Object exposing the ``DISCOVERY_KUBERNETES_*``
                attributes read below.
            conn_mgr: Object providing ``register``, ``unregister`` and
                ``conn_names``.
            **kwargs: Forwarded to the worker threads.
        """
        self.namespace = plugin_config.DISCOVERY_KUBERNETES_NAMESPACE
        self.pod_patt = plugin_config.DISCOVERY_KUBERNETES_POD_PATT
        self.label_selector = plugin_config.DISCOVERY_KUBERNETES_LABEL_SELECTOR
        # Accept both the string form ('true'/'True') and a real bool.
        self.in_cluster = str(
            plugin_config.DISCOVERY_KUBERNETES_IN_CLUSTER).lower() == 'true'
        self.poll_interval = plugin_config.DISCOVERY_KUBERNETES_POLL_INTERVAL
        self.poll_interval = int(self.poll_interval) if self.poll_interval else 5
        self.port = plugin_config.DISCOVERY_KUBERNETES_PORT
        self.port = int(self.port) if self.port else 19530
        self.kwargs = kwargs
        self.queue = queue.Queue()

        self.conn_mgr = conn_mgr

        if not self.namespace:
            # Uses the module-level constant (the previous lower-case name
            # was undefined) and closes the file after reading.
            with open(INCLUSTER_NAMESPACE_PATH) as namespace_file:
                self.namespace = namespace_file.read().strip()

        if self.in_cluster:
            config.load_incluster_config()
        else:
            config.load_kube_config()
        self.v1 = client.CoreV1Api()

        self.listener = K8SEventListener(message_queue=self.queue,
                                         namespace=self.namespace,
                                         in_cluster=self.in_cluster,
                                         v1=self.v1,
                                         **kwargs)

        self.pod_heartbeater = K8SHeartbeatHandler(
            message_queue=self.queue,
            namespace=self.namespace,
            label_selector=self.label_selector,
            in_cluster=self.in_cluster,
            v1=self.v1,
            poll_interval=self.poll_interval,
            **kwargs)

        self.event_handler = EventHandler(mgr=self,
                                          message_queue=self.queue,
                                          namespace=self.namespace,
                                          pod_patt=self.pod_patt,
                                          **kwargs)

    def add_pod(self, name, ip):
        """Register ``name`` with address ``tcp://<ip>:<port>``."""
        self.conn_mgr.register(name, 'tcp://{}:{}'.format(ip, self.port))

    def delete_pod(self, name):
        """Unregister ``name`` from the connection manager."""
        self.conn_mgr.unregister(name)

    def start(self):
        """Start the listener (as a daemon), the handler and the poller."""
        self.listener.daemon = True
        self.listener.start()
        self.event_handler.start()

        self.pod_heartbeater.start()

    def stop(self):
        """Signal all three worker threads to stop."""
        self.listener.stop()
        self.pod_heartbeater.stop()
        self.event_handler.stop()

    @classmethod
    def Create(cls, conn_mgr, plugin_config, **kwargs):
        """Alternate constructor taking ``conn_mgr`` first."""
        discovery = cls(plugin_config=plugin_config, conn_mgr=conn_mgr, **kwargs)
        return discovery


def setup(app):
    """Log the installation and hand ``KubernetesProvider`` to ``app``."""
    logger.info('Plugin \'{}\' Installed In Package: {}'.format(__file__, app.plugin_package_name))
    app.on_plugin_setup(KubernetesProvider)


if __name__ == '__main__':
    # Manual smoke run against the cluster from the local kube config.
    logging.basicConfig(level=logging.INFO)
    sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(
        os.path.abspath(__file__))))))
    sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(
        os.path.abspath(__file__)))))

    class Connect:
        """Stand-in connection manager that only logs what it is asked."""

        def register(self, name, value):
            logger.error('Register: {} - {}'.format(name, value))

        def unregister(self, name):
            logger.error('Unregister: {}'.format(name))

        @property
        def conn_names(self):
            return set()

    connect_mgr = Connect()

    from discovery import DiscoveryConfig
    settings = DiscoveryConfig(DISCOVERY_KUBERNETES_NAMESPACE='xp',
                               DISCOVERY_KUBERNETES_POD_PATT=".*-ro-servers-.*",
                               DISCOVERY_KUBERNETES_LABEL_SELECTOR='tier=ro-servers',
                               DISCOVERY_KUBERNETES_POLL_INTERVAL=5,
                               DISCOVERY_KUBERNETES_IN_CLUSTER=False)

    provider_class = KubernetesProvider
    t = provider_class(conn_mgr=connect_mgr, plugin_config=settings)
    t.start()
    try:
        cnt = 100
        while cnt > 0:
            time.sleep(2)
            cnt -= 1
    finally:
        # Also stop the worker threads when the wait is interrupted.
        t.stop()