Commit cfbe86df authored by xj.lin

Merge branch 'develop' into jinhai

from engine.retrieval import search_index
from engine.ingestion import build_index
from engine.ingestion import serialize
+import numpy as np


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
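
The metaclass above caches one instance per class, so repeated construction returns the same object; a minimal usage sketch (class name hypothetical):

class Config(metaclass=Singleton):
    pass

assert Config() is Config()  # both calls return the single cached instance
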
@@ -12,7 +14,7 @@ class Singleton(type):

class Scheduler(metaclass=Singleton):
-    def Search(self, index_file_key, vectors, k):
+    def search(self, index_file_key, vectors, k):
        # assert index_file_key
        # assert vectors
        assert k != 0
@@ -20,7 +22,6 @@ class Scheduler(metaclass=Singleton):

        query_vectors = serialize.to_array(vectors)
        return self.__scheduler(index_file_key, query_vectors, k)

    def __scheduler(self, index_data_key, vectors, k):
        result_list = []
@@ -36,18 +37,35 @@ class Scheduler(metaclass=Singleton):

        if 'index' in index_data_key:
            index_data_list = index_data_key['index']
            for key in index_data_list:
-                index = GetIndexData(key)
+                index = get_index_data(key)
                searcher = search_index.FaissSearch(index)
                result_list.append(searcher.search_by_vectors(vectors, k))

        if len(result_list) == 1:
            return result_list[0].vectors

-        total_result = []
-        # result = search_index.top_k(result_list, k)
-        return result_list
+        return result_list  # TODO(linxj): add topk
+        # d_list = np.array([])
+        # v_list = np.array([])
+        # for result in result_list:
+        #     rd = result.distance
+        #     rv = result.vectors
+        #
+        #     td_list = np.array([])
+        #     tv_list = np.array([])
+        #     for d, v in zip(rd, rv):
+        #         td_list = np.append(td_list, d)
+        #         tv_list = np.append(tv_list, v)
+        #     d_list = np.add(d_list, td_list)
+        #     v_list = np.add(v_list, td_list)
+        #
+        #     print(d_list)
+        #     print(v_list)
+        # result_map = [d_list, v_list]
+        # top_k_result = search_index.top_k(result_map, k)
+        # return top_k_result
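
For reference, a compact version of the merge the TODO above leaves open. It assumes, as the commented draft suggests, that each SearchResult carries (nq, k) arrays in .distance and .vectors, and it relies on the np import added at the top of the file (helper name hypothetical):

def merge_top_k(result_list, k):
    # Stack candidates from every index file along the k axis, then keep
    # the k smallest distances per query row.
    d = np.concatenate([r.distance for r in result_list], axis=1)
    v = np.concatenate([r.vectors for r in result_list], axis=1)
    order = np.argsort(d, axis=1)[:, :k]
    return np.take_along_axis(d, order, axis=1), np.take_along_axis(v, order, axis=1)
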
-def GetIndexData(key):
+def get_index_data(key):
    return serialize.read_index(key)
@@ -9,11 +9,10 @@ class TestScheduler(unittest.TestCase):

    def test_schedule(self):
        d = 64
        nb = 10000
-        nq = 100
+        nq = 2
        nt = 5000
        xt, xb, xq = get_dataset(d, nb, nt, nq)

-        file_name = "/tmp/faiss/tempfile_1"
+        file_name = "/tmp/tempfile_1"
        index = faiss.IndexFlatL2(d)
        print(index.is_trained)
@@ -61,5 +60,6 @@ def get_dataset(d, nb, nt, nq):

    x = x.astype('float32')
    return x[:nt], x[nt:-nq], x[-nq:]


if __name__ == "__main__":
    unittest.main()
@@ -156,7 +156,7 @@ class VectorEngine(object):

        scheduler_instance = Scheduler()
        vectors = []
        vectors.append(vector)
-        result = scheduler_instance.Search(index_map, vectors, limit)
+        result = scheduler_instance.search(index_map, vectors, limit)
        vector_id = [0]
......
@@ -3,7 +3,7 @@ from enum import Enum, unique

@unique
-class INDEX_DEVICES(Enum):
+class INDEXDEVICES(Enum):
    CPU = 0
    GPU = 1
    MULTI_GPU = 2
@@ -15,7 +15,7 @@ def FactoryIndex(index_name="DefaultIndex"):

class Index():
-    def build(self, d, vectors, vector_ids, DEVICE=INDEX_DEVICES.CPU):
+    def build(self, d, vectors, vector_ids, DEVICE=INDEXDEVICES.CPU):
        pass

    @staticmethod
@@ -35,7 +35,7 @@ class DefaultIndex(Index):

        # maybe need to specify parameters
        pass

-    def build(self, d, vectors, vector_ids, DEVICE=INDEX_DEVICES.CPU):
+    def build(self, d, vectors, vector_ids, DEVICE=INDEXDEVICES.CPU):
        index = faiss.IndexFlatL2(d)  # trained
        index.add(vectors)
        return index
@@ -47,7 +47,7 @@ class LowMemoryIndex(Index):

        self.__bytes_per_vector = 8
        self.__bits_per_sub_vector = 8

-    def build(d, vectors, vector_ids, DEVICE=INDEX_DEVICES.CPU):
+    def build(d, vectors, vector_ids, DEVICE=INDEXDEVICES.CPU):
        # quantizer = faiss.IndexFlatL2(d)
        # index = faiss.IndexIVFPQ(quantizer, d, self.nlist,
        #                          self.__bytes_per_vector, self.__bits_per_sub_vector)
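
For reference, a self-contained sketch of the IVF-PQ construction those commented lines point at; nlist is assumed to be 100 since the field is not shown in this hunk:

import faiss
import numpy as np

d, nlist = 64, 100                    # dimension and coarse-cluster count (nlist assumed)
bytes_per_vector = 8                  # sub-quantizers, as in self.__bytes_per_vector
bits_per_sub_vector = 8               # bits per sub-quantizer code
quantizer = faiss.IndexFlatL2(d)
index = faiss.IndexIVFPQ(quantizer, d, nlist, bytes_per_vector, bits_per_sub_vector)
xb = np.random.random((10000, d)).astype('float32')
index.train(xb)                       # IVF-PQ must be trained before vectors are added
index.add(xb)
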
......
import faiss
import numpy as np


def write_index(index, file_name):
    faiss.write_index(index, file_name)


def read_index(file_name):
    return faiss.read_index(file_name)


def to_array(vec):
    return np.asarray(vec).astype('float32')


def to_str_array(vec):
    return np.asarray(vec).astype('str')
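
A quick round-trip through these helpers (path and sizes illustrative):

index = faiss.IndexFlatL2(64)
index.add(np.random.random((1000, 64)).astype('float32'))
write_index(index, "/tmp/tempfile_1")
restored = read_index("/tmp/tempfile_1")
assert restored.ntotal == index.ntotal
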
@@ -65,7 +65,6 @@ class TestBuildIndex(unittest.TestCase):

        assert np.all(Dnew == Dref) and np.all(Inew == Iref)


def get_dataset(d, nb, nt, nq):
    """A dataset that is not completely random but still challenging to
    index
@@ -83,6 +82,5 @@ def get_dataset(d, nb, nt, nq):

    return x[:nt], x[nt:-nq], x[-nq:]


if __name__ == "__main__":
    unittest.main()
import faiss
+import numpy as np


class SearchResult():

@@ -32,7 +33,9 @@ class FaissSearch():

        D, I = self.__index.search(vector_list, k)
        return SearchResult(D, I)
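
The call pattern, as the scheduler above uses it (query_vectors is a float32 array of shape (nq, d); index comes from serialize.read_index):

searcher = FaissSearch(index)
result = searcher.search_by_vectors(query_vectors, k)
print(result.vectors)                 # SearchResult wrapping faiss's D, I output
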
-import heapq
+# import heapq


def top_k(input, k):
-    # sorted = heapq.nsmallest(k, input, key=input.key)
    pass
+    # sorted = heapq.nsmallest(k, input, key=np.sum(input.get()))
+    # return sorted
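
One way to flesh out the stub along the lines of the commented heapq call, assuming input is an iterable of (distance, id) pairs (function name hypothetical):

import heapq

def top_k_sketch(pairs, k):
    # Keep the k entries with the smallest distance.
    return heapq.nsmallest(k, pairs, key=lambda p: p[0])
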
# import numpy as np
# d = 64 # dimension
# nb = 100000 # database size
# nq = 10000 # nb of queries
# np.random.seed(1234) # make reproducible
# xb = np.random.random((nb, d)).astype('float32')
# xb[:, 0] += np.arange(nb) / 1000.
# xq = np.random.random((nq, d)).astype('float32')
# xq[:, 0] += np.arange(nq) / 1000.
#
# import faiss # make faiss available
#
# res = faiss.StandardGpuResources() # use a single GPU
#
# ## Using a flat index
#
# index_flat = faiss.IndexFlatL2(d) # build a flat (CPU) index
#
# # make it a flat GPU index
# gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat)
#
# gpu_index_flat.add(xb) # add vectors to the index
# print(gpu_index_flat.ntotal)
#
# k = 4 # we want to see 4 nearest neighbors
# D, I = gpu_index_flat.search(xq, k) # actual search
# print(I[:5]) # neighbors of the 5 first queries
# print(I[-5:]) # neighbors of the 5 last queries
#
#
# ## Using an IVF index
#
# nlist = 100
# quantizer = faiss.IndexFlatL2(d) # the other index
# index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2)
# # here we specify METRIC_L2, by default it performs inner-product search
#
# # make it an IVF GPU index
# gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf)
#
# assert not gpu_index_ivf.is_trained
# gpu_index_ivf.train(xb)              # train on the database vectors
# assert gpu_index_ivf.is_trained
#
# gpu_index_ivf.add(xb) # add vectors to the index
# print(gpu_index_ivf.ntotal)
#
# k = 4 # we want to see 4 nearest neighbors
# D, I = gpu_index_ivf.search(xq, k) # actual search
# print(I[:5]) # neighbors of the 5 first queries
# print(I[-5:])
import numpy as np
import pytest

@pytest.mark.skip(reason="Not for pytest")
def basic_test():
    d = 64                             # dimension
    nb = 100000                        # database size
    nq = 10000                         # nb of queries
    np.random.seed(1234)               # make reproducible
    xb = np.random.random((nb, d)).astype('float32')
    xb[:, 0] += np.arange(nb) / 1000.
    xc = np.random.random((nb, d)).astype('float32')
    xc[:, 0] += np.arange(nb) / 1000.
    xq = np.random.random((nq, d)).astype('float32')
    xq[:, 0] += np.arange(nq) / 1000.

    import faiss                       # make faiss available
    index = faiss.IndexFlatL2(d)       # build the index
    print(index.is_trained)
    index.add(xb)                      # add vectors to the index
    print(index.ntotal)

    # faiss.write_index(index, "/tmp/faiss/tempfile_1")
    writer = faiss.VectorIOWriter()
    faiss.write_index(index, writer)
    ar_data = faiss.vector_to_array(writer.data)

    import pickle
    pickle.dump(ar_data, open("/tmp/faiss/ser_1", "wb"))

    # index_3 = pickle.load("/tmp/faiss/ser_1")
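    # Sketch of the reverse path (assumes faiss.VectorIOReader and
    # faiss.copy_array_to_vector, the inverse of the VectorIOWriter step above):
    reader = faiss.VectorIOReader()
    faiss.copy_array_to_vector(ar_data, reader.data)
    index_restored = faiss.read_index(reader)
    print(index_restored.ntotal)       # should match index.ntotal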
    # index_2 = faiss.IndexFlatL2(d)   # build the index
    # print(index_2.is_trained)
    # index_2.add(xc)                  # add vectors to the index
    # print(index_2.ntotal)
    # faiss.write_index(index, "/tmp/faiss/tempfile_2")
    #
    # index_3 = faiss.read_index

    # k = 4                            # we want to see 4 nearest neighbors
    # D, I = index.search(xb[:5], k)   # sanity check
    # print(I)
    # print(D)
    # D, I = index.search(xq, k)       # actual search
    # print(I[:5])                     # neighbors of the 5 first queries
    # print(I[-5:])                    # neighbors of the 5 last queries

if __name__ == '__main__':
    basic_test()
@@ -3,6 +3,7 @@ from ..search_index import *

import unittest
import numpy as np


class TestSearchSingleThread(unittest.TestCase):
    def test_search_by_vectors(self):
        d = 64
......