file_based_hash_ring_router.py 3.0 KB
Newer Older
P
peng.xu 已提交
1 2
import logging
from sqlalchemy import exc as sqlalchemy_exc
3
from sqlalchemy import and_, or_
P
peng.xu 已提交
4 5
from mishards.models import Tables
from mishards.router import RouterMixin
P
peng.xu 已提交
6 7 8 9 10 11
from mishards import exceptions, db
from mishards.hash_ring import HashRing

logger = logging.getLogger(__name__)


P
peng.xu 已提交
12 13
class Factory(RouterMixin):
    name = 'FileBasedHashRingRouter'
P
peng.xu 已提交
14

15 16 17
    def __init__(self, writable_topo, readonly_topo, **kwargs):
        super(Factory, self).__init__(writable_topo=writable_topo,
                readonly_topo=readonly_topo)
P
peng.xu 已提交
18

19
    def routing(self, table_name, partition_tags=None, metadata=None, **kwargs):
P
peng.xu 已提交
20
        range_array = kwargs.pop('range_array', None)
21
        return self._route(table_name, range_array, partition_tags, metadata, **kwargs)
P
peng.xu 已提交
22

23
    def _route(self, table_name, range_array, partition_tags=None, metadata=None, **kwargs):
P
peng.xu 已提交
24 25
        # PXU TODO: Implement Thread-local Context
        # PXU TODO: Session life mgt
26 27 28 29 30 31 32 33 34

        if not partition_tags:
            cond = and_(
                        or_(Tables.table_id == table_name, Tables.owner_table == table_name),
                        Tables.state != Tables.TO_DELETE)
        else:
            cond = and_(Tables.state != Tables.TO_DELETE,
                        Tables.owner_table == table_name,
                        Tables.partition_tag.in_(partition_tags))
P
peng.xu 已提交
35
        try:
36
            tables = db.Session.query(Tables).filter(cond).all()
P
peng.xu 已提交
37 38 39
        except sqlalchemy_exc.SQLAlchemyError as e:
            raise exceptions.DBError(message=str(e), metadata=metadata)

40 41 42 43 44 45 46 47
        if not tables:
            raise exceptions.TableNotFoundError('{}:{}'.format(table_name, partition_tags), metadata=metadata)

        total_files = []
        for table in tables:
            files = table.files_to_search(range_array)
            total_files.append(files)

P
peng.xu 已提交
48 49
        db.remove_session()

50
        servers = self.readonly_topo.group_names
P
peng.xu 已提交
51 52 53 54 55 56
        logger.info('Available servers: {}'.format(servers))

        ring = HashRing(servers)

        routing = {}

57 58 59 60 61 62 63
        for files in total_files:
            for f in files:
                target_host = ring.get_node(str(f.id))
                sub = routing.get(target_host, None)
                if not sub:
                    routing[target_host] = {'table_id': f.table_id, 'file_ids': []}
                routing[target_host]['file_ids'].append(str(f.id))
P
peng.xu 已提交
64 65

        return routing
P
peng.xu 已提交
66 67

    @classmethod
P
peng.xu 已提交
68
    def Create(cls, **kwargs):
69 70 71 72 73 74 75
        writable_topo = kwargs.pop('writable_topo', None)
        if not writable_topo:
            raise RuntimeError('Cannot find \'writable_topo\' to initialize \'{}\''.format(self.name))
        readonly_topo = kwargs.pop('readonly_topo', None)
        if not readonly_topo:
            raise RuntimeError('Cannot find \'readonly_topo\' to initialize \'{}\''.format(self.name))
        router = cls(writable_topo=writable_topo, readonly_topo=readonly_topo, **kwargs)
P
peng.xu 已提交
76 77 78 79 80 81
        return router


def setup(app):
    logger.info('Plugin \'{}\' Installed In Package: {}'.format(__file__, app.plugin_package_name))
    app.on_plugin_setup(Factory)