vector_engine.py 9.2 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8
from engine.model.group_table import GroupTable
from engine.model.file_table import FileTable
from engine.controller.raw_file_handler import RawFileHandler
from engine.controller.group_handler import GroupHandler
from engine.controller.index_file_handler import IndexFileHandler
from engine.settings import ROW_LIMIT
from flask import jsonify
from engine import db
X
xj.lin 已提交
9 10
from engine.ingestion import build_index
from engine.controller.scheduler import Scheduler
X
xj.lin 已提交
11
from engine.ingestion import serialize
J
jinhai 已提交
12 13 14
import sys, os

class VectorEngine(object):
    """Static facade for group management, vector insertion and search.

    Group and file metadata are stored through the ``GroupTable`` and
    ``FileTable`` models.  Raw (not yet indexed) vectors are kept in two
    class-level, in-memory caches keyed by group id; nothing in this class
    writes the raw vectors to disk.

    Every public method returns a status code (``SUCCESS_CODE``,
    ``FAULT_CODE`` or ``GROUP_NOT_EXIST``), usually followed by a payload.
    """

    # group id -> list of raw vectors; created lazily on first insertion.
    group_vector_dict = None
    # group id -> list of vector ids, parallel to ``group_vector_dict``.
    group_vector_id_dict = None

    SUCCESS_CODE = 0
    FAULT_CODE = 1
    GROUP_NOT_EXIST = 2

    @staticmethod
    def AddGroup(group_id, dimension):
        """Create a group unless one with the same name already exists.

        Returns:
            ``(code, group_id, file_number)``: ``FAULT_CODE`` with the
            existing group's file number when the group is already there,
            otherwise ``SUCCESS_CODE`` with ``0``.
        """
        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()
        if group:
            print('Already create the group: ', group_id)
            return VectorEngine.FAULT_CODE, group_id, group.file_number

        print('To create the group: ', group_id)
        new_group = GroupTable(group_id, dimension)
        GroupHandler.CreateGroupDirectory(group_id)

        # add into database
        db.session.add(new_group)
        db.session.commit()
        return VectorEngine.SUCCESS_CODE, group_id, 0

    @staticmethod
    def GetGroup(group_id):
        """Look up a group by name.

        Returns:
            ``(SUCCESS_CODE, group_id, file_number)`` when the group exists,
            ``(FAULT_CODE, group_id, 0)`` otherwise.
        """
        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()
        if group:
            return VectorEngine.SUCCESS_CODE, group_id, group.file_number
        return VectorEngine.FAULT_CODE, group_id, 0

    @staticmethod
    def DeleteGroup(group_id):
        """Delete a group, its file records, its directory and cached vectors.

        Deleting a group that does not exist is reported as success.

        Returns:
            ``(SUCCESS_CODE, group_id, file_number)`` where ``file_number``
            is the value the group held before deletion (``0`` when the
            group did not exist).
        """
        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()
        if not group:
            return VectorEngine.SUCCESS_CODE, group_id, 0

        # Read the value before the row is deleted: after delete + commit the
        # ORM instance is expired and must not be accessed any more.
        file_number = group.file_number

        records = FileTable.query.filter(FileTable.group_name == group_id).all()
        for record in records:
            print("record.group_name: ", record.group_name)
            db.session.delete(record)
        db.session.delete(group)
        # One commit so the group and its file records disappear together.
        db.session.commit()

        GroupHandler.DeleteGroupDirectory(group_id)
        # Drop cached raw vectors so a re-created group starts empty.
        VectorEngine.ClearRawFile(group_id)

        return VectorEngine.SUCCESS_CODE, group_id, file_number

    @staticmethod
    def GetGroupList():
        """Return ``(SUCCESS_CODE, groups)``.

        ``groups`` is a list of ``{'group_name': ..., 'file_number': ...}``
        dictionaries, one per row in ``GroupTable``.
        """
        group_list = [
            {'group_name': row.group_name, 'file_number': row.file_number}
            for row in GroupTable.query.all()
        ]

        print(group_list)
        return VectorEngine.SUCCESS_CODE, group_list

    @staticmethod
    def AddVector(group_id, vector):
        """Append ``vector`` to the group's raw cache and update file metadata.

        When the raw file's row count reaches ``ROW_LIMIT`` an index is built
        from the cached vectors, written through ``serialize.write_index``,
        the file record is switched to type ``'index'`` and the group's raw
        cache is cleared.

        Returns:
            ``(GROUP_NOT_EXIST, 'invalid')`` when the group is unknown,
            otherwise ``(SUCCESS_CODE, '<group_id>.<vector_id>')``.
        """
        print(group_id, vector)
        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()
        if not group:
            return VectorEngine.GROUP_NOT_EXIST, 'invalid'

        raw_file_query = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw')
        raw_file = raw_file_query.first()
        if raw_file:
            print('insert into exist file')
            # create vector id
            vector_id = raw_file.seq_no + 1
            row_number = raw_file.row_number + 1
            # insert into raw file
            VectorEngine.InsertVectorIntoRawFile(group_id, raw_file.filename, vector, vector_id)

            # check if the file can be indexed
            if row_number >= ROW_LIMIT:
                raw_vector_array, raw_vector_id_array = VectorEngine.GetVectorListFromRawFile(group_id)

                # create index
                index_builder = build_index.FactoryIndex()
                index = index_builder().build(group.dimension, raw_vector_array, raw_vector_id_array)

                # TODO(jinhai): store index into Cache
                index_filename = raw_file.filename + '_index'
                serialize.write_index(file_name=index_filename, index=index)

                raw_file_query.update({'row_number': row_number,
                                       'type': 'index',
                                       'filename': index_filename,
                                       'seq_no': vector_id})
                db.session.commit()
                # The cached vectors now live in the index; keeping them would
                # mix them into the next raw file of this group.
                VectorEngine.ClearRawFile(group_id)
            else:
                # we still can insert into exist raw file, update database
                raw_file_query.update({'row_number': row_number,
                                       'seq_no': vector_id})
                db.session.commit()
                print('Update db for raw file insertion')
        else:
            print('add a new raw file')
            # first raw file
            raw_filename = group_id + '.raw'
            # NOTE(review): ids restart at 0 for every new raw file, so they
            # can repeat ids already stored in an index of this group, and
            # the derived index filename repeats as well - confirm intent.
            vector_id = 0
            # create and insert vector into raw file
            VectorEngine.InsertVectorIntoRawFile(group_id, raw_filename, vector, vector_id)
            # insert a record into database
            # (the trailing 1 is presumably the initial row count - verify
            # against the FileTable constructor)
            db.session.add(FileTable(group_id, raw_filename, 'raw', 1))
            db.session.commit()

        vector_id_str = group_id + '.' + str(vector_id)
        return VectorEngine.SUCCESS_CODE, vector_id_str

    @staticmethod
    def SearchVector(group_id, vector, limit):
        """Run a search for ``vector`` in ``group_id`` through the scheduler.

        Returns:
            ``(GROUP_NOT_EXIST, {})`` when the group is unknown, otherwise
            ``(SUCCESS_CODE, ids)`` with ``ids`` a list of
            ``'<group_id>.<vector_id>'`` strings.
        """
        # Check the group exist
        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()
        if not group:
            return VectorEngine.GROUP_NOT_EXIST, {}

        # find all files
        files = FileTable.query.filter(FileTable.group_name == group_id).all()
        index_map = {}
        index_map['index'] = [f.filename for f in files if f.type == 'index']
        # TODO: pass by key, get from storage
        index_map['raw'], index_map['raw_id'] = VectorEngine.GetVectorListFromRawFile(group_id, "fakename")
        index_map['dimension'] = group.dimension

        scheduler_instance = Scheduler()
        # NOTE(review): the scheduler's result is not mapped into the
        # response yet; the returned id list is the fixed placeholder below.
        scheduler_instance.search(index_map, [vector], limit)

        vector_id = [0]
        vector_ids_str = [group_id + '.' + str(int_id) for int_id in vector_id]

        return VectorEngine.SUCCESS_CODE, vector_ids_str

    @staticmethod
    def CreateIndex(group_id):
        """Report the raw file an index would be created for.

        The method only prints the target path; it builds nothing.

        Returns:
            ``GROUP_NOT_EXIST`` when the group is unknown, ``FAULT_CODE``
            when the group has no raw file, otherwise ``SUCCESS_CODE``.
        """
        # Check the group exist
        code, _, _ = VectorEngine.GetGroup(group_id)
        if code == VectorEngine.FAULT_CODE:
            return VectorEngine.GROUP_NOT_EXIST

        # create index
        raw_file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
        if raw_file is None:
            # Nothing to index: no vector has been added since the group was
            # created or since its last raw file was turned into an index.
            return VectorEngine.FAULT_CODE

        path = os.path.join(GroupHandler.GetGroupDirectory(group_id), raw_file.filename)
        print('Going to create index for: ', path)
        return VectorEngine.SUCCESS_CODE

    @staticmethod
    def InsertVectorIntoRawFile(group_id, filename, vector, vector_id):
        """Append ``vector`` and ``vector_id`` to the group's in-memory cache.

        ``filename`` is not used for storage; it is returned unchanged.
        """
        if VectorEngine.group_vector_dict is None:
            VectorEngine.group_vector_dict = dict()

        if VectorEngine.group_vector_id_dict is None:
            VectorEngine.group_vector_id_dict = dict()

        vectors = VectorEngine.group_vector_dict.setdefault(group_id, [])
        vector_ids = VectorEngine.group_vector_id_dict.setdefault(group_id, [])

        vectors.append(vector)
        vector_ids.append(vector_id)

        print('InsertVectorIntoRawFile: ', vectors, vector_ids)
        print("cache size: ", len(vectors))

        return filename

    @staticmethod
    def GetVectorListFromRawFile(group_id, filename="todo"):
        """Return the group's cached vectors and ids as serialized arrays.

        ``filename`` is currently ignored.  A group with nothing cached
        (including the case where the caches were never created) is treated
        as having empty vector and id lists.

        Returns:
            ``(serialize.to_array(vectors), serialize.to_int_array(ids))``.
        """
        vectors = (VectorEngine.group_vector_dict or {}).get(group_id, [])
        vector_ids = (VectorEngine.group_vector_id_dict or {}).get(group_id, [])

        # Convert once and reuse the result for both the log and the return.
        vector_array = serialize.to_array(vectors)
        vector_id_array = serialize.to_int_array(vector_ids)
        print("GetVectorListFromRawFile, vectors: ", vector_array)
        print("GetVectorListFromRawFile, vector_ids: ", vector_id_array)
        return vector_array, vector_id_array

    @staticmethod
    def ClearRawFile(group_id):
        """Drop the group's cached raw vectors and ids.

        Safe to call when the caches were never created or the group has
        nothing cached.  Always returns ``SUCCESS_CODE``.
        """
        print("VectorEngine.group_vector_dict: ", VectorEngine.group_vector_dict)
        if VectorEngine.group_vector_dict is not None:
            VectorEngine.group_vector_dict.pop(group_id, None)
        if VectorEngine.group_vector_id_dict is not None:
            VectorEngine.group_vector_id_dict.pop(group_id, None)
        return VectorEngine.SUCCESS_CODE