diff --git a/pyengine/engine/controller/scheduler.py b/pyengine/engine/controller/scheduler.py new file mode 100644 index 0000000000000000000000000000000000000000..9c7583c069515ebcf0327ebacf7deb1c6df718f2 --- /dev/null +++ b/pyengine/engine/controller/scheduler.py @@ -0,0 +1,50 @@ +from engine.retrieval import search_index +from engine.ingestion import build_index + +class Singleton(type): + _instances = {} + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class Scheduler(metaclass=Singleton): + def Search(self, index_file_key, vectors, k): + assert index_file_key + assert vectors + assert k + + return self.__scheduler(index_file_key, vectors, k) + + + def __scheduler(self, index_data_key, vectors, k): + result_list = [] + + raw_data_list = index_data_key['raw'] + index_data_list = index_data_key['index'] + + for key in raw_data_list: + raw_data, d = self.GetRawData(key) + index_builder = build_index.FactoryIndex() + index = index_builder().build(d, raw_data) + searcher = search_index.FaissSearch(index) # silly + result_list.append(searcher.search_by_vectors(vectors, k)) + + for key in index_data_list: + index = self.GetIndexData(key) + searcher = search_index.FaissSearch(index) + result_list.append(searcher.search_by_vectors(vectors, k)) + + if len(result_list) == 1: + return result_list[0].vectors + + result = search_index.top_k(sum(result_list), k) + return result + + + def GetIndexData(self, key): + pass + + def GetRawData(self, key): + pass diff --git a/pyengine/engine/controller/vector_engine.py b/pyengine/engine/controller/vector_engine.py index 7c4e227f450bd4b1fc107ade0478194fc31bdcfb..b169ae551e293eb42d064e54dff758dcdccf96a2 100644 --- a/pyengine/engine/controller/vector_engine.py +++ b/pyengine/engine/controller/vector_engine.py @@ -6,6 +6,8 @@ from engine.controller.index_file_handler import IndexFileHandler from engine.settings import ROW_LIMIT from flask import jsonify from engine import db +from engine.ingestion import build_index +from engine.controller.scheduler import Scheduler import sys, os class VectorEngine(object): @@ -83,6 +85,7 @@ class VectorEngine(object): return VectorEngine.GROUP_NOT_EXIST file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first() + group = GroupTable.query.filter(GroupTable.group_name == group_id).first() if file: print('insert into exist file') # insert into raw file @@ -90,14 +93,17 @@ class VectorEngine(object): # check if the file can be indexed if file.row_number + 1 >= ROW_LIMIT: - # read data from raw file - data = GetVectorsFromRawFile() + raw_data = GetVectorListFromRawFile(group_id) + d = group.dimension # create index + index_builder = build_index.FactoryIndex() + index = index_builder().build(d, raw_data) + + # TODO(jinhai): store index into Cache index_filename = file.filename + '_index' - CreateIndex(group_id, index_filename, data) - # update record into database + # TODO(jinhai): Update raw_file_name => index_file_name FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1, 'type': 'index'}) pass @@ -130,22 +136,15 @@ class VectorEngine(object): # find all files files = FileTable.query.filter(FileTable.group_name == group_id).all() + raw_keys = [ i.filename for i in files if i.type == 'raw' ] + index_keys = [ i.filename for i in files if i.type == 'index' ] + index_map = {} + index_map['raw'] = raw_keys + index_map['index'] = index_keys # {raw:[key1, key2], index:[key3, key4]} - for file in files: - if(file.type == 'raw'): - # create index - # add vector list - # train - # get topk - print('search in raw file: ', file.filename) - else: - # get topk - print('search in index file: ', file.filename) - data = IndexFileHandler.Read(file.filename, file.type) + scheduler_instance = Scheduler() + result = scheduler_instance.Search(index_map, vector, limit) - # according to difference files get topk of each - # reduce the topk from them - # construct response and send back vector_id = 0 return VectorEngine.SUCCESS_CODE, vector_id @@ -179,17 +178,11 @@ class VectorEngine(object): VectorEngine.group_dict[group_id].append(vector) print('InsertVectorIntoRawFile: ', VectorEngine.group_dict[group_id]) - - # if filename exist - # append - # if filename not exist - # create file - # append return filename @staticmethod - def GetVectorListFromRawFile(group_id, filename): + def GetVectorListFromRawFile(group_id, filename="todo"): return VectorEngine.group_dict[group_id] @staticmethod diff --git a/pyengine/engine/ingestion/build_index.py b/pyengine/engine/ingestion/build_index.py new file mode 100644 index 0000000000000000000000000000000000000000..ebb07499270cabeeeaf43503c7b2627e3a110e2f --- /dev/null +++ b/pyengine/engine/ingestion/build_index.py @@ -0,0 +1,55 @@ +import faiss +from enum import Enum, unique + + +@unique +class INDEX_DEVICES(Enum): + CPU = 0 + GPU = 1 + MULTI_GPU = 2 + + +def FactoryIndex(index_name="DefaultIndex"): + cls = globals()[index_name] + return cls # invoke __init__() by user + + +class Index(): + def build(d, vectors, DEVICE=INDEX_DEVICES.CPU): + pass + + @staticmethod + def increase(trained_index, vectors): + trained_index.add((vectors)) + + @staticmethod + def serialize(index): + writer = faiss.VectorIOWriter() + faiss.write_index(index, writer) + array_data = faiss.vector_to_array(writer.data) + return array_data + + +class DefaultIndex(Index): + def __init__(self, *args, **kwargs): + # maybe need to specif parameters + pass + + def build(d, vectors, DEVICE=INDEX_DEVICES.CPU): + index = faiss.IndexFlatL2(d) # trained + index.add(vectors) + return index + + +class LowMemoryIndex(Index): + def __init__(self, *args, **kwargs): + self.__nlist = 100 + self.__bytes_per_vector = 8 + self.__bits_per_sub_vector = 8 + + def build(d, vectors, DEVICE=INDEX_DEVICES.CPU): + # quantizer = faiss.IndexFlatL2(d) + # index = faiss.IndexIVFPQ(quantizer, d, self.nlist, + # self.__bytes_per_vector, self.__bits_per_sub_vector) + # return index + pass \ No newline at end of file diff --git a/pyengine/engine/retrieval/scheduler.py b/pyengine/engine/retrieval/scheduler.py deleted file mode 100644 index 576f22fa7a9d6b99f5c09f14ae9c3d0be1f5338e..0000000000000000000000000000000000000000 --- a/pyengine/engine/retrieval/scheduler.py +++ /dev/null @@ -1,33 +0,0 @@ -import faiss -import numpy as np - -class FaissSearch(): - def __init__(self, index, id_to_vector_map=None): - self.__index = index - if id_to_vector_map is None: - self.__id_to_vector_map = [] - - # def search_by_ids(self, id_list, k): - # pass - - def search_by_vectors(self, vector_list, k): - id_list = [None] * len(vector_list) - - result = self.__search(id_list, vector_list, k) - return result - - def __search(self, id_list, vector_list, k): - D, I = self.__index.search(vector_list, k) - return I - - -# class FaissIndex(): -# def build_index(self, vector_list, dimension): -# # return index -# pass -# -# def build_index_cpu(self): -# pass -# -# def build_index_gpu(self): -# pass \ No newline at end of file diff --git a/pyengine/engine/retrieval/search_index.py b/pyengine/engine/retrieval/search_index.py new file mode 100644 index 0000000000000000000000000000000000000000..313781618154f75156327c95d0830c0f745edbe6 --- /dev/null +++ b/pyengine/engine/retrieval/search_index.py @@ -0,0 +1,36 @@ +import faiss + + +class SearchResult(): + def __init__(self, D, I): + self.distance = D + self.vectors = I + + def __add__(self, other): + self.distance += other.distance + self.vectors += other.vectors + + +class FaissSearch(): + def __init__(self, index_data, id_to_vector_map=None): + self.__index = index_data + + if id_to_vector_map is None: + self.__id_to_vector_map = [] + + # def search_by_ids(self, id_list, k): + # pass + + def search_by_vectors(self, query_vectors, k): + id_list = [None] * len(query_vectors) + + result = self.__search(id_list, query_vectors, k) + return result + + def __search(self, id_list, vector_list, k): + D, I = self.__index.search(vector_list, k) + return SearchResult(D, I) + + +def top_k(input, k): + pass \ No newline at end of file diff --git a/pyengine/engine/retrieval/tests/__init__.py b/pyengine/engine/retrieval/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/pyengine/engine/retrieval/tests/basic_test.py b/pyengine/engine/retrieval/tests/basic_test.py new file mode 100644 index 0000000000000000000000000000000000000000..f789395527a730b882597667bebf1163c7e28092 --- /dev/null +++ b/pyengine/engine/retrieval/tests/basic_test.py @@ -0,0 +1,104 @@ +# import numpy as np + +# d = 64 # dimension +# nb = 100000 # database size +# nq = 10000 # nb of queries +# np.random.seed(1234) # make reproducible +# xb = np.random.random((nb, d)).astype('float32') +# xb[:, 0] += np.arange(nb) / 1000. +# xq = np.random.random((nq, d)).astype('float32') +# xq[:, 0] += np.arange(nq) / 1000. +# +# import faiss # make faiss available +# +# res = faiss.StandardGpuResources() # use a single GPU +# +# ## Using a flat index +# +# index_flat = faiss.IndexFlatL2(d) # build a flat (CPU) index +# +# # make it a flat GPU index +# gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat) +# +# gpu_index_flat.add(xb) # add vectors to the index +# print(gpu_index_flat.ntotal) +# +# k = 4 # we want to see 4 nearest neighbors +# D, I = gpu_index_flat.search(xq, k) # actual search +# print(I[:5]) # neighbors of the 5 first queries +# print(I[-5:]) # neighbors of the 5 last queries +# +# +# ## Using an IVF index +# +# nlist = 100 +# quantizer = faiss.IndexFlatL2(d) # the other index +# index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2) +# # here we specify METRIC_L2, by default it performs inner-product search +# +# # make it an IVF GPU index +# gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf) +# +# assert not gpu_index_ivf.is_trained +# gpu_index_ivf.train(xb) # add vectors to the index +# assert gpu_index_ivf.is_trained +# +# gpu_index_ivf.add(xb) # add vectors to the index +# print(gpu_index_ivf.ntotal) +# +# k = 4 # we want to see 4 nearest neighbors +# D, I = gpu_index_ivf.search(xq, k) # actual search +# print(I[:5]) # neighbors of the 5 first queries +# print(I[-5:]) + + +import numpy as np +import pytest + +@pytest.mark.skip(reason="Not for pytest") +def basic_test(): + d = 64 # dimension + nb = 100000 # database size + nq = 10000 # nb of queries + np.random.seed(1234) # make reproducible + xb = np.random.random((nb, d)).astype('float32') + xb[:, 0] += np.arange(nb) / 1000. + xc = np.random.random((nb, d)).astype('float32') + xc[:, 0] += np.arange(nb) / 1000. + xq = np.random.random((nq, d)).astype('float32') + xq[:, 0] += np.arange(nq) / 1000. + + import faiss # make faiss available + index = faiss.IndexFlatL2(d) # build the index + print(index.is_trained) + index.add(xb) # add vectors to the index + print(index.ntotal) + #faiss.write_index(index, "/tmp/faiss/tempfile_1") + + writer = faiss.VectorIOWriter() + faiss.write_index(index, writer) + ar_data = faiss.vector_to_array(writer.data) + import pickle + pickle.dump(ar_data, open("/tmp/faiss/ser_1", "wb")) + + #index_3 = pickle.load("/tmp/faiss/ser_1") + + + # index_2 = faiss.IndexFlatL2(d) # build the index + # print(index_2.is_trained) + # index_2.add(xc) # add vectors to the index + # print(index_2.ntotal) + # faiss.write_index(index, "/tmp/faiss/tempfile_2") + # + # index_3 = faiss.read_index + + # k = 4 # we want to see 4 nearest neighbors + # D, I = index.search(xb[:5], k) # sanity check + # print(I) + # print(D) + # D, I = index.search(xq, k) # actual search + # print(I[:5]) # neighbors of the 5 first queries + # print(I[-5:]) # neighbors of the 5 last queries + +if __name__ == '__main__': + basic_test() \ No newline at end of file diff --git a/pyengine/engine/retrieval/tests/scheduler_test.py b/pyengine/engine/retrieval/tests/scheduler_test.py new file mode 100644 index 0000000000000000000000000000000000000000..2644cbec0260ae159589422ee4e86d09ad737f49 --- /dev/null +++ b/pyengine/engine/retrieval/tests/scheduler_test.py @@ -0,0 +1,3 @@ +from engine.controller import scheduler + +# scheduler.Scheduler.Search() \ No newline at end of file diff --git a/pyengine/tests/basic_test.py b/pyengine/tests/basic_test.py deleted file mode 100644 index 262910432e237e8131710fca795bb41cdd481312..0000000000000000000000000000000000000000 --- a/pyengine/tests/basic_test.py +++ /dev/null @@ -1,52 +0,0 @@ -import numpy as np - -d = 64 # dimension -nb = 100000 # database size -nq = 10000 # nb of queries -np.random.seed(1234) # make reproducible -xb = np.random.random((nb, d)).astype('float32') -xb[:, 0] += np.arange(nb) / 1000. -xq = np.random.random((nq, d)).astype('float32') -xq[:, 0] += np.arange(nq) / 1000. - -import faiss # make faiss available - -res = faiss.StandardGpuResources() # use a single GPU - -## Using a flat index - -index_flat = faiss.IndexFlatL2(d) # build a flat (CPU) index - -# make it a flat GPU index -gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat) - -gpu_index_flat.add(xb) # add vectors to the index -print(gpu_index_flat.ntotal) - -k = 4 # we want to see 4 nearest neighbors -D, I = gpu_index_flat.search(xq, k) # actual search -print(I[:5]) # neighbors of the 5 first queries -print(I[-5:]) # neighbors of the 5 last queries - - -## Using an IVF index - -nlist = 100 -quantizer = faiss.IndexFlatL2(d) # the other index -index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2) -# here we specify METRIC_L2, by default it performs inner-product search - -# make it an IVF GPU index -gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf) - -assert not gpu_index_ivf.is_trained -gpu_index_ivf.train(xb) # add vectors to the index -assert gpu_index_ivf.is_trained - -gpu_index_ivf.add(xb) # add vectors to the index -print(gpu_index_ivf.ntotal) - -k = 4 # we want to see 4 nearest neighbors -D, I = gpu_index_ivf.search(xq, k) # actual search -print(I[:5]) # neighbors of the 5 first queries -print(I[-5:]) \ No newline at end of file