From 584c60d36380d25037fd1998a97ee68dc1e1e0e2 Mon Sep 17 00:00:00 2001
From: "xj.lin"
Date: Sun, 24 Mar 2019 17:07:38 +0800
Subject: [PATCH] add index_build

---
 pyengine/engine/controller/scheduler.py       |  50 ++++++++++
 pyengine/engine/controller/vector_engine.py   |  57 +++++-----
 pyengine/engine/ingestion/build_index.py      |  55 ++++++++++
 pyengine/engine/retrieval/scheduler.py        |  33 ------
 pyengine/engine/retrieval/search_index.py     |  39 +++++++
 pyengine/engine/retrieval/tests/__init__.py   |   0
 pyengine/engine/retrieval/tests/basic_test.py | 101 +++++++++++++++++
 .../engine/retrieval/tests/scheduler_test.py  |   7 +
 pyengine/tests/basic_test.py                  |  52 --------
 9 files changed, 283 insertions(+), 111 deletions(-)
 create mode 100644 pyengine/engine/controller/scheduler.py
 create mode 100644 pyengine/engine/ingestion/build_index.py
 delete mode 100644 pyengine/engine/retrieval/scheduler.py
 create mode 100644 pyengine/engine/retrieval/search_index.py
 create mode 100644 pyengine/engine/retrieval/tests/__init__.py
 create mode 100644 pyengine/engine/retrieval/tests/basic_test.py
 create mode 100644 pyengine/engine/retrieval/tests/scheduler_test.py
 delete mode 100644 pyengine/tests/basic_test.py

diff --git a/pyengine/engine/controller/scheduler.py b/pyengine/engine/controller/scheduler.py
new file mode 100644
index 00000000..9c7583c0
--- /dev/null
+++ b/pyengine/engine/controller/scheduler.py
@@ -0,0 +1,50 @@
+from engine.retrieval import search_index
+from engine.ingestion import build_index
+
+class Singleton(type):
+    _instances = {}
+    def __call__(cls, *args, **kwargs):
+        if cls not in cls._instances:
+            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
+        return cls._instances[cls]
+
+
+class Scheduler(metaclass=Singleton):
+    def Search(self, index_file_key, vectors, k):
+        assert index_file_key is not None
+        assert vectors is not None
+        assert k > 0
+
+        return self.__scheduler(index_file_key, vectors, k)
+
+
+    def __scheduler(self, index_data_key, vectors, k):
+        result_list = []
+
+        raw_data_list = index_data_key['raw']
+        index_data_list = index_data_key['index']
+
+        for key in raw_data_list:
+            raw_data, d = self.GetRawData(key)
+            index_builder = build_index.FactoryIndex()
+            index = index_builder().build(d, raw_data)
+            searcher = search_index.FaissSearch(index)  # TODO: cache the built index instead of rebuilding per search
+            result_list.append(searcher.search_by_vectors(vectors, k))
+
+        for key in index_data_list:
+            index = self.GetIndexData(key)
+            searcher = search_index.FaissSearch(index)
+            result_list.append(searcher.search_by_vectors(vectors, k))
+
+        if len(result_list) == 1:
+            return result_list[0].vectors
+
+        result = search_index.top_k(sum(result_list[1:], result_list[0]), k)
+        return result
+
+
+    def GetIndexData(self, key):
+        pass
+
+    def GetRawData(self, key):
+        pass
diff --git a/pyengine/engine/controller/vector_engine.py b/pyengine/engine/controller/vector_engine.py
index 4252f7a1..f8e156e1 100644
--- a/pyengine/engine/controller/vector_engine.py
+++ b/pyengine/engine/controller/vector_engine.py
@@ -6,6 +6,8 @@
 from engine.controller.index_file_handler import IndexFileHandler
 from engine.settings import ROW_LIMIT
 from flask import jsonify
 from engine import db
+from engine.ingestion import build_index
+from engine.controller.scheduler import Scheduler
 import sys, os
 
@@ -77,6 +79,9 @@ class VectorEngine(object):
     def AddVector(group_id, vector):
         print(group_id, vector)
         file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
+        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()
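+        # Vectors accumulate in a per-group raw file; once ROW_LIMIT is
+        # reached, the raw file is converted into an index file below.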
         if file:
             print('insert into exist file')
             # insert into raw file
@@ -84,14 +87,17 @@ class VectorEngine(object):
 
         # check if the file can be indexed
         if file.row_number + 1 >= ROW_LIMIT:
-            # read data from raw file
-            data = GetVectorsFromRawFile()
+            raw_data = VectorEngine.GetVectorListFromRawFile(group_id)
+            d = group.dimension
 
             # create index
+            index_builder = build_index.FactoryIndex()
+            index = index_builder().build(d, raw_data)
+
+            # TODO(jinhai): store index into Cache
             index_filename = file.filename + '_index'
-            CreateIndex(group_id, index_filename, data)
 
-            # update record into database
+            # TODO(jinhai): update raw_file_name => index_file_name
             FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').update({'row_number':file.row_number + 1, 'type': 'index'})
             pass
 
@@ -116,23 +122,18 @@ class VectorEngine(object):
 
     @staticmethod
-    def SearchVector(group_id, vector, limit):
+    def SearchVector(group_id, vectors, limit):
         # find all files
         files = FileTable.query.filter(FileTable.group_name == group_id).all()
 
-        for file in files:
-            if(file.type == 'raw'):
-                # create index
-                # add vector list
-                # train
-                # get topk
-                print('search in raw file: ', file.filename)
-                pass
-            else:
-                # get topk
-                print('search in index file: ', file.filename)
-                data = IndexFileHandler.Read(file.filename, file.type)
-                pass
+        raw_keys = [i.filename for i in files if i.type == 'raw']
+        index_keys = [i.filename for i in files if i.type == 'index']
+        index_map = dict()
+        index_map['raw'] = raw_keys
+        index_map['index'] = index_keys  # {'raw': [key1, key2], 'index': [key3, key4]}
+
+        scheduler_instance = Scheduler()
+        result = scheduler_instance.Search(index_map, vectors, k=limit)
 
         # according to difference files get topk of each
         # reduce the topk from them
 
@@ -140,13 +141,15 @@ class VectorEngine(object):
         return jsonify({'code': 0})
 
-    @staticmethod
-    def CreateIndex(group_id):
-        # create index
-        file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
-        path = GroupHandler.GetGroupDirectory(group_id) + '/' + file.filename
-        print('Going to create index for: ', path)
-        return jsonify({'code': 0})
+    # TODO(linxj): debug interface; unsupported for now
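+    # (index creation now happens inline in AddVector and in Scheduler)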
+    # @staticmethod
+    # def CreateIndex(group_id):
+    #     # create index
+    #     file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
+    #     path = GroupHandler.GetGroupDirectory(group_id) + '/' + file.filename
+    #     print('Going to create index for: ', path)
+    #     return jsonify({'code': 0})
 
 
     @staticmethod
 
@@ -171,6 +173,6 @@ class VectorEngine(object):
 
     @staticmethod
-    def GetVectorListFromRawFile(group_id, filename):
+    def GetVectorListFromRawFile(group_id, filename="todo"):
         return VectorEngine.group_dict[group_id]
 
 
diff --git a/pyengine/engine/ingestion/build_index.py b/pyengine/engine/ingestion/build_index.py
new file mode 100644
index 00000000..ebb07499
--- /dev/null
+++ b/pyengine/engine/ingestion/build_index.py
@@ -0,0 +1,55 @@
+import faiss
+from enum import Enum, unique
+
+
+@unique
+class INDEX_DEVICES(Enum):
+    CPU = 0
+    GPU = 1
+    MULTI_GPU = 2
+
+
+def FactoryIndex(index_name="DefaultIndex"):
+    cls = globals()[index_name]
+    return cls  # caller invokes __init__()
+
+
+class Index():
+    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
+        pass
+
+    @staticmethod
+    def increase(trained_index, vectors):
+        trained_index.add(vectors)
+
+    @staticmethod
+    def serialize(index):
+        writer = faiss.VectorIOWriter()
+        faiss.write_index(index, writer)
+        array_data = faiss.vector_to_array(writer.data)
+        return array_data
+
+
+class DefaultIndex(Index):
+    def __init__(self, *args, **kwargs):
+        # may need to accept index parameters here
+        pass
+
+    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
+        index = faiss.IndexFlatL2(d)  # a flat index needs no training
+        index.add(vectors)
+        return index
+
+
+class LowMemoryIndex(Index):
+    def __init__(self, *args, **kwargs):
+        self.__nlist = 100
+        self.__bytes_per_vector = 8
+        self.__bits_per_sub_vector = 8
+
+    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
+        # quantizer = faiss.IndexFlatL2(d)
+        # index = faiss.IndexIVFPQ(quantizer, d, self.__nlist,
+        #                          self.__bytes_per_vector, self.__bits_per_sub_vector)
+        # return index
+        pass
\ No newline at end of file
diff --git a/pyengine/engine/retrieval/scheduler.py b/pyengine/engine/retrieval/scheduler.py
deleted file mode 100644
index 576f22fa..00000000
--- a/pyengine/engine/retrieval/scheduler.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import faiss
-import numpy as np
-
-class FaissSearch():
-    def __init__(self, index, id_to_vector_map=None):
-        self.__index = index
-        if id_to_vector_map is None:
-            self.__id_to_vector_map = []
-
-    # def search_by_ids(self, id_list, k):
-    #     pass
-
-    def search_by_vectors(self, vector_list, k):
-        id_list = [None] * len(vector_list)
-
-        result = self.__search(id_list, vector_list, k)
-        return result
-
-    def __search(self, id_list, vector_list, k):
-        D, I = self.__index.search(vector_list, k)
-        return I
-
-
-# class FaissIndex():
-#     def build_index(self, vector_list, dimension):
-#         # return index
-#         pass
-#
-#     def build_index_cpu(self):
-#         pass
-#
-#     def build_index_gpu(self):
-#         pass
\ No newline at end of file
diff --git a/pyengine/engine/retrieval/search_index.py b/pyengine/engine/retrieval/search_index.py
new file mode 100644
index 00000000..31378161
--- /dev/null
+++ b/pyengine/engine/retrieval/search_index.py
@@ -0,0 +1,39 @@
+import faiss
+
+
+class SearchResult():
+    def __init__(self, D, I):
+        self.distance = D
+        self.vectors = I
+
+    def __add__(self, other):
+        self.distance += other.distance
+        self.vectors += other.vectors
+        return self  # returning self lets partial results be merged
+
+
+class FaissSearch():
+    def __init__(self, index_data, id_to_vector_map=None):
+        self.__index = index_data
+
+        if id_to_vector_map is None:
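+            # no external id mapping supplied; default to an empty one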
+            id_to_vector_map = []
+        self.__id_to_vector_map = id_to_vector_map
+
+    # def search_by_ids(self, id_list, k):
+    #     pass
+
+    def search_by_vectors(self, query_vectors, k):
+        id_list = [None] * len(query_vectors)
+
+        result = self.__search(id_list, query_vectors, k)
+        return result
+
+    def __search(self, id_list, vector_list, k):
+        D, I = self.__index.search(vector_list, k)
+        return SearchResult(D, I)
+
+
+def top_k(results, k):
+    pass
\ No newline at end of file
diff --git a/pyengine/engine/retrieval/tests/__init__.py b/pyengine/engine/retrieval/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/pyengine/engine/retrieval/tests/basic_test.py b/pyengine/engine/retrieval/tests/basic_test.py
new file mode 100644
index 00000000..1d31a9fb
--- /dev/null
+++ b/pyengine/engine/retrieval/tests/basic_test.py
@@ -0,0 +1,101 @@
+# import numpy as np
+
+# d = 64 # dimension
+# nb = 100000 # database size
+# nq = 10000 # nb of queries
+# np.random.seed(1234) # make reproducible
+# xb = np.random.random((nb, d)).astype('float32')
+# xb[:, 0] += np.arange(nb) / 1000.
+# xq = np.random.random((nq, d)).astype('float32')
+# xq[:, 0] += np.arange(nq) / 1000.
+#
+# import faiss # make faiss available
+#
+# res = faiss.StandardGpuResources() # use a single GPU
+#
+# ## Using a flat index
+#
+# index_flat = faiss.IndexFlatL2(d) # build a flat (CPU) index
+#
+# # make it a flat GPU index
+# gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat)
+#
+# gpu_index_flat.add(xb) # add vectors to the index
+# print(gpu_index_flat.ntotal)
+#
+# k = 4 # we want to see 4 nearest neighbors
+# D, I = gpu_index_flat.search(xq, k) # actual search
+# print(I[:5]) # neighbors of the 5 first queries
+# print(I[-5:]) # neighbors of the 5 last queries
+#
+#
+# ## Using an IVF index
+#
+# nlist = 100
+# quantizer = faiss.IndexFlatL2(d) # the other index
+# index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2)
+# # here we specify METRIC_L2, by default it performs inner-product search
+#
+# # make it an IVF GPU index
+# gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf)
+#
+# assert not gpu_index_ivf.is_trained
+# gpu_index_ivf.train(xb) # train the index
+# assert gpu_index_ivf.is_trained
+#
+# gpu_index_ivf.add(xb) # add vectors to the index
+# print(gpu_index_ivf.ntotal)
+#
+# k = 4 # we want to see 4 nearest neighbors
+# D, I = gpu_index_ivf.search(xq, k) # actual search
+# print(I[:5]) # neighbors of the 5 first queries
+# print(I[-5:])


+import numpy as np
+
+d = 64 # dimension
+nb = 100000 # database size
+nq = 10000 # nb of queries
+np.random.seed(1234) # make reproducible
+xb = np.random.random((nb, d)).astype('float32')
+xb[:, 0] += np.arange(nb) / 1000.
+xc = np.random.random((nb, d)).astype('float32')
+xc[:, 0] += np.arange(nb) / 1000.
+xq = np.random.random((nq, d)).astype('float32')
+xq[:, 0] += np.arange(nq) / 1000.
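+# The block below builds a flat L2 index over xb, then serializes it
+# in memory with faiss.VectorIOWriter and pickles the raw byte array,
+# instead of writing the index to disk with faiss.write_index(path).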
+
+import faiss # make faiss available
+index = faiss.IndexFlatL2(d) # build the index
+print(index.is_trained)
+index.add(xb) # add vectors to the index
+print(index.ntotal)
+#faiss.write_index(index, "/tmp/faiss/tempfile_1")
+
+writer = faiss.VectorIOWriter()
+faiss.write_index(index, writer)
+ar_data = faiss.vector_to_array(writer.data)
+import pickle
+pickle.dump(ar_data, open("/tmp/faiss/ser_1", "wb"))
+
+#index_3 = pickle.load("/tmp/faiss/ser_1")
+
+
+# index_2 = faiss.IndexFlatL2(d) # build the index
+# print(index_2.is_trained)
+# index_2.add(xc) # add vectors to the index
+# print(index_2.ntotal)
+# faiss.write_index(index, "/tmp/faiss/tempfile_2")
+#
+# index_3 = faiss.read_index
+
+# k = 4 # we want to see 4 nearest neighbors
+# D, I = index.search(xb[:5], k) # sanity check
+# print(I)
+# print(D)
+# D, I = index.search(xq, k) # actual search
+# print(I[:5]) # neighbors of the 5 first queries
+# print(I[-5:]) # neighbors of the 5 last queries
\ No newline at end of file
diff --git a/pyengine/engine/retrieval/tests/scheduler_test.py b/pyengine/engine/retrieval/tests/scheduler_test.py
new file mode 100644
index 00000000..71dc6e7b
--- /dev/null
+++ b/pyengine/engine/retrieval/tests/scheduler_test.py
@@ -0,0 +1,7 @@
+from engine.controller import scheduler
+
+# Scheduler is a singleton: constructing it twice yields the same object.
+assert scheduler.Scheduler() is scheduler.Scheduler()
+
+# Search expects index file keys and query vectors, e.g.:
+# scheduler.Scheduler().Search({'raw': [...], 'index': [...]}, vectors, k=10)
\ No newline at end of file
diff --git a/pyengine/tests/basic_test.py b/pyengine/tests/basic_test.py
deleted file mode 100644
index 26291043..00000000
--- a/pyengine/tests/basic_test.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import numpy as np
-
-d = 64 # dimension
-nb = 100000 # database size
-nq = 10000 # nb of queries
-np.random.seed(1234) # make reproducible
-xb = np.random.random((nb, d)).astype('float32')
-xb[:, 0] += np.arange(nb) / 1000.
-xq = np.random.random((nq, d)).astype('float32')
-xq[:, 0] += np.arange(nq) / 1000.
-
-import faiss # make faiss available
-
-res = faiss.StandardGpuResources() # use a single GPU
-
-## Using a flat index
-
-index_flat = faiss.IndexFlatL2(d) # build a flat (CPU) index
-
-# make it a flat GPU index
-gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat)
-
-gpu_index_flat.add(xb) # add vectors to the index
-print(gpu_index_flat.ntotal)
-
-k = 4 # we want to see 4 nearest neighbors
-D, I = gpu_index_flat.search(xq, k) # actual search
-print(I[:5]) # neighbors of the 5 first queries
-print(I[-5:]) # neighbors of the 5 last queries
-
-
-## Using an IVF index
-
-nlist = 100
-quantizer = faiss.IndexFlatL2(d) # the other index
-index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2)
-# here we specify METRIC_L2, by default it performs inner-product search
-
-# make it an IVF GPU index
-gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf)
-
-assert not gpu_index_ivf.is_trained
-gpu_index_ivf.train(xb) # add vectors to the index
-assert gpu_index_ivf.is_trained
-
-gpu_index_ivf.add(xb) # add vectors to the index
-print(gpu_index_ivf.ntotal)
-
-k = 4 # we want to see 4 nearest neighbors
-D, I = gpu_index_ivf.search(xq, k) # actual search
-print(I[:5]) # neighbors of the 5 first queries
-print(I[-5:])
\ No newline at end of file
-- 
GitLab
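
Reviewer note (not part of the patch): a minimal sketch of the build/search path this change introduces. It assumes numpy and faiss are installed and the pyengine modules are importable; Scheduler.GetRawData/GetIndexData are still stubs, so the sketch builds and queries the index directly:

    import numpy as np
    from engine.ingestion import build_index
    from engine.retrieval import search_index

    d = 32
    base_vectors = np.random.random((1000, d)).astype('float32')  # stored vectors
    queries = np.random.random((5, d)).astype('float32')          # query vectors

    IndexCls = build_index.FactoryIndex()       # defaults to DefaultIndex
    index = IndexCls().build(d, base_vectors)   # flat L2 index; no training step

    searcher = search_index.FaissSearch(index)
    result = searcher.search_by_vectors(queries, k=4)
    print(result.vectors)                       # ids of the 4 nearest neighbors per query

This mirrors what Scheduler.__scheduler does for each 'raw' key once GetRawData is implemented.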