BaiXuePrincess / milvus
Commit f2182dd4
Authored March 24, 2019 by jinhai

Merge branch 'develop' into jinhai

Parents: c6a1b300, 584c60d3

Showing 8 changed files with 266 additions and 77 deletions (+266 -77):
pyengine/engine/controller/scheduler.py            +50   -0
pyengine/engine/controller/vector_engine.py        +18  -25
pyengine/engine/ingestion/build_index.py           +55   -0
pyengine/engine/retrieval/search_index.py          +36   -0
pyengine/engine/retrieval/tests/__init__.py         +0   -0
pyengine/engine/retrieval/tests/basic_test.py     +104   -0
pyengine/engine/retrieval/tests/scheduler_test.py   +3   -0
pyengine/tests/basic_test.py                        +0  -52
pyengine/engine/controller/scheduler.py (new file, mode 100644)

```python
from engine.retrieval import search_index
from engine.ingestion import build_index


class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class Scheduler(metaclass=Singleton):
    def Search(self, index_file_key, vectors, k):
        assert index_file_key
        assert vectors
        assert k

        return self.__scheduler(index_file_key, vectors, k)

    def __scheduler(self, index_data_key, vectors, k):
        result_list = []

        raw_data_list = index_data_key['raw']
        index_data_list = index_data_key['index']

        for key in raw_data_list:
            raw_data, d = self.GetRawData(key)
            index_builder = build_index.FactoryIndex()
            index = index_builder().build(d, raw_data)
            searcher = search_index.FaissSearch(index)  # silly
            result_list.append(searcher.search_by_vectors(vectors, k))

        for key in index_data_list:
            index = self.GetIndexData(key)
            searcher = search_index.FaissSearch(index)
            result_list.append(searcher.search_by_vectors(vectors, k))

        if len(result_list) == 1:
            return result_list[0].vectors

        result = search_index.top_k(sum(result_list), k)
        return result

    def GetIndexData(self, key):
        pass

    def GetRawData(self, key):
        pass
```
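For orientation, a minimal sketch of how this class is meant to be driven; the index_map layout mirrors the one vector_engine.py builds later in this diff, the filenames are hypothetical, and the Search call stays commented out because GetRawData/GetIndexData are still stubs at this commit.

```python
from engine.controller.scheduler import Scheduler

# Key layout expected by Search(): {'raw': [...], 'index': [...]}
index_map = {'raw': ['group1_file'], 'index': ['group1_file_index']}

s1 = Scheduler()
s2 = Scheduler()
assert s1 is s2  # the Singleton metaclass hands back one shared instance
# s1.Search(index_map, query_vectors, k)  # usable once the Get*Data stubs are implemented
```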
pyengine/engine/controller/vector_engine.py

```diff
@@ -6,6 +6,8 @@ from engine.controller.index_file_handler import IndexFileHandler
 from engine.settings import ROW_LIMIT
 from flask import jsonify
 from engine import db
+from engine.ingestion import build_index
+from engine.controller.scheduler import Scheduler
 import sys, os
 
 class VectorEngine(object):
@@ -83,6 +85,7 @@ class VectorEngine(object):
             return VectorEngine.GROUP_NOT_EXIST
 
         file = FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').first()
+        group = GroupTable.query.filter(GroupTable.group_name == group_id).first()
         if file:
             print('insert into exist file')
             # insert into raw file
@@ -90,14 +93,17 @@ class VectorEngine(object):
             # check if the file can be indexed
             if file.row_number + 1 >= ROW_LIMIT:
-                # read data from raw file
-                data = GetVectorsFromRawFile()
+                raw_data = GetVectorListFromRawFile(group_id)
+                d = group.dimension
 
                 # create index
+                index_builder = build_index.FactoryIndex()
+                index = index_builder().build(d, raw_data)
+
+                # TODO(jinhai): store index into Cache
                 index_filename = file.filename + '_index'
-                CreateIndex(group_id, index_filename, data)
 
-                # update record into database
+                # TODO(jinhai): Update raw_file_name => index_file_name
                 FileTable.query.filter(FileTable.group_name == group_id).filter(FileTable.type == 'raw').update({'row_number': file.row_number + 1, 'type': 'index'})
                 pass
@@ -130,22 +136,15 @@ class VectorEngine(object):
         # find all files
         files = FileTable.query.filter(FileTable.group_name == group_id).all()
 
-        for file in files:
-            if (file.type == 'raw'):
-                # create index
-                # add vector list
-                # train
-                # get topk
-                print('search in raw file: ', file.filename)
-            else:
-                # get topk
-                print('search in index file: ', file.filename)
-                data = IndexFileHandler.Read(file.filename, file.type)
-
-        # according to difference files get topk of each
-        # reduce the topk from them
-        # construct response and send back
+        raw_keys = [i.filename for i in files if i.type == 'raw']
+        index_keys = [i.filename for i in files if i.type == 'index']
+
+        index_map = {}
+        index_map['raw'] = raw_keys
+        index_map['index'] = index_keys
+        # {raw:[key1, key2], index:[key3, key4]}
+
+        scheduler_instance = Scheduler()
+        result = scheduler_instance.Search(index_map, vector, limit)
 
         vector_id = 0
         return VectorEngine.SUCCESS_CODE, vector_id
@@ -179,17 +178,11 @@ class VectorEngine(object):
         VectorEngine.group_dict[group_id].append(vector)
         print('InsertVectorIntoRawFile: ', VectorEngine.group_dict[group_id])
 
-        # if filename exist
-        #   append
-        # if filename not exist
-        #   create file
-        #   append
         return filename
 
     @staticmethod
-    def GetVectorListFromRawFile(group_id, filename):
+    def GetVectorListFromRawFile(group_id, filename="todo"):
         return VectorEngine.group_dict[group_id]
 
     @staticmethod
```
pyengine/engine/ingestion/build_index.py (new file, mode 100644)

```python
import faiss
from enum import Enum, unique


@unique
class INDEX_DEVICES(Enum):
    CPU = 0
    GPU = 1
    MULTI_GPU = 2


def FactoryIndex(index_name="DefaultIndex"):
    cls = globals()[index_name]
    return cls  # invoke __init__() by user


class Index():
    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
        pass

    @staticmethod
    def increase(trained_index, vectors):
        trained_index.add((vectors))

    @staticmethod
    def serialize(index):
        writer = faiss.VectorIOWriter()
        faiss.write_index(index, writer)
        array_data = faiss.vector_to_array(writer.data)
        return array_data


class DefaultIndex(Index):
    def __init__(self, *args, **kwargs):
        # maybe need to specify parameters
        pass

    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
        index = faiss.IndexFlatL2(d)  # trained
        index.add(vectors)
        return index


class LowMemoryIndex(Index):
    def __init__(self, *args, **kwargs):
        self.__nlist = 100
        self.__bytes_per_vector = 8
        self.__bits_per_sub_vector = 8

    def build(self, d, vectors, DEVICE=INDEX_DEVICES.CPU):
        # quantizer = faiss.IndexFlatL2(d)
        # index = faiss.IndexIVFPQ(quantizer, d, self.__nlist,
        #                          self.__bytes_per_vector, self.__bits_per_sub_vector)
        # return index
        pass
```
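A minimal usage sketch of the factory contract as scheduler.py consumes it: FactoryIndex returns the index class, the caller instantiates it and calls build. The dimension and random vectors below are illustrative, and faiss is assumed to be installed.

```python
import numpy as np
from engine.ingestion import build_index

d = 64
vectors = np.random.random((1000, d)).astype('float32')

IndexCls = build_index.FactoryIndex()   # resolves "DefaultIndex" from module globals
index = IndexCls().build(d, vectors)    # IndexFlatL2 needs no training before add()
print(index.ntotal)                     # -> 1000
```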
pyengine/engine/retrieval/scheduler.py → pyengine/engine/retrieval/search_index.py (renamed)

```diff
 import faiss
+import numpy as np
+
+
+class SearchResult():
+    def __init__(self, D, I):
+        self.distance = D
+        self.vectors = I
+
+    def __add__(self, other):
+        self.distance += other.distance
+        self.vectors += other.vectors
+        return self
 
 
 class FaissSearch():
-    def __init__(self, index, id_to_vector_map=None):
-        self.__index = index
+    def __init__(self, index_data, id_to_vector_map=None):
+        self.__index = index_data
         if id_to_vector_map is None:
             self.__id_to_vector_map = []
 
     # def search_by_ids(self, id_list, k):
     #     pass
 
-    def search_by_vectors(self, vector_list, k):
-        id_list = [None] * len(vector_list)
-        result = self.__search(id_list, vector_list, k)
+    def search_by_vectors(self, query_vectors, k):
+        id_list = [None] * len(query_vectors)
+        result = self.__search(id_list, query_vectors, k)
         return result
 
     def __search(self, id_list, vector_list, k):
         D, I = self.__index.search(vector_list, k)
-        return I
+        return SearchResult(D, I)
 
-# class FaissIndex():
-#     def build_index(self, vector_list, dimension):
-#         # return index
-#         pass
-#
-#     def build_index_cpu(self):
-#         pass
-#
-#     def build_index_gpu(self):
-#         pass
+
+def top_k(input, k):
+    pass
```
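A short, self-contained sketch of the renamed module's API; the random NumPy data is illustrative, and faiss is assumed available.

```python
import faiss
import numpy as np

from engine.retrieval.search_index import FaissSearch

d = 64
xb = np.random.random((1000, d)).astype('float32')  # database vectors
xq = np.random.random((5, d)).astype('float32')     # query vectors

index = faiss.IndexFlatL2(d)
index.add(xb)

searcher = FaissSearch(index)
result = searcher.search_by_vectors(xq, 4)  # SearchResult wrapping faiss's (D, I)
print(result.vectors)   # ids of the 4 nearest neighbors per query
print(result.distance)  # corresponding L2 distances
```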
pyengine/engine/retrieval/tests/__init__.py (new file, mode 100644, empty)
pyengine/engine/retrieval/tests/basic_test.py (new file, mode 100644)

```python
# import numpy as np
# d = 64                           # dimension
# nb = 100000                      # database size
# nq = 10000                       # nb of queries
# np.random.seed(1234)             # make reproducible
# xb = np.random.random((nb, d)).astype('float32')
# xb[:, 0] += np.arange(nb) / 1000.
# xq = np.random.random((nq, d)).astype('float32')
# xq[:, 0] += np.arange(nq) / 1000.
#
# import faiss                     # make faiss available
#
# res = faiss.StandardGpuResources()  # use a single GPU
#
# ## Using a flat index
#
# index_flat = faiss.IndexFlatL2(d)  # build a flat (CPU) index
#
# # make it a flat GPU index
# gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat)
#
# gpu_index_flat.add(xb)           # add vectors to the index
# print(gpu_index_flat.ntotal)
#
# k = 4                            # we want to see 4 nearest neighbors
# D, I = gpu_index_flat.search(xq, k)  # actual search
# print(I[:5])                     # neighbors of the 5 first queries
# print(I[-5:])                    # neighbors of the 5 last queries
#
#
# ## Using an IVF index
#
# nlist = 100
# quantizer = faiss.IndexFlatL2(d)  # the other index
# index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2)
# # here we specify METRIC_L2, by default it performs inner-product search
#
# # make it an IVF GPU index
# gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf)
#
# assert not gpu_index_ivf.is_trained
# gpu_index_ivf.train(xb)          # add vectors to the index
# assert gpu_index_ivf.is_trained
#
# gpu_index_ivf.add(xb)            # add vectors to the index
# print(gpu_index_ivf.ntotal)
#
# k = 4                            # we want to see 4 nearest neighbors
# D, I = gpu_index_ivf.search(xq, k)  # actual search
# print(I[:5])                     # neighbors of the 5 first queries
# print(I[-5:])

import numpy as np
import pytest


@pytest.mark.skip(reason="Not for pytest")
def basic_test():
    d = 64                           # dimension
    nb = 100000                      # database size
    nq = 10000                       # nb of queries
    np.random.seed(1234)             # make reproducible
    xb = np.random.random((nb, d)).astype('float32')
    xb[:, 0] += np.arange(nb) / 1000.
    xc = np.random.random((nb, d)).astype('float32')
    xc[:, 0] += np.arange(nb) / 1000.
    xq = np.random.random((nq, d)).astype('float32')
    xq[:, 0] += np.arange(nq) / 1000.

    import faiss                     # make faiss available
    index = faiss.IndexFlatL2(d)     # build the index
    print(index.is_trained)
    index.add(xb)                    # add vectors to the index
    print(index.ntotal)

    #faiss.write_index(index, "/tmp/faiss/tempfile_1")
    writer = faiss.VectorIOWriter()
    faiss.write_index(index, writer)
    ar_data = faiss.vector_to_array(writer.data)
    import pickle
    pickle.dump(ar_data, open("/tmp/faiss/ser_1", "wb"))
    #index_3 = pickle.load("/tmp/faiss/ser_1")

    # index_2 = faiss.IndexFlatL2(d) # build the index
    # print(index_2.is_trained)
    # index_2.add(xc)                # add vectors to the index
    # print(index_2.ntotal)
    # faiss.write_index(index, "/tmp/faiss/tempfile_2")
    #
    # index_3 = faiss.read_index

    # k = 4                          # we want to see 4 nearest neighbors
    # D, I = index.search(xb[:5], k) # sanity check
    # print(I)
    # print(D)
    # D, I = index.search(xq, k)     # actual search
    # print(I[:5])                   # neighbors of the 5 first queries
    # print(I[-5:])                  # neighbors of the 5 last queries


if __name__ == '__main__':
    basic_test()
```
pyengine/engine/retrieval/tests/scheduler_test.py (new file, mode 100644)

```python
from engine.controller import scheduler

# scheduler.Scheduler.Search()
```
pyengine/tests/basic_test.py (deleted, mode 100644 → 0; last version at c6a1b300)

```python
import numpy as np

d = 64                           # dimension
nb = 100000                      # database size
nq = 10000                       # nb of queries
np.random.seed(1234)             # make reproducible
xb = np.random.random((nb, d)).astype('float32')
xb[:, 0] += np.arange(nb) / 1000.
xq = np.random.random((nq, d)).astype('float32')
xq[:, 0] += np.arange(nq) / 1000.

import faiss                     # make faiss available

res = faiss.StandardGpuResources()  # use a single GPU

## Using a flat index

index_flat = faiss.IndexFlatL2(d)  # build a flat (CPU) index

# make it a flat GPU index
gpu_index_flat = faiss.index_cpu_to_gpu(res, 0, index_flat)

gpu_index_flat.add(xb)           # add vectors to the index
print(gpu_index_flat.ntotal)

k = 4                            # we want to see 4 nearest neighbors
D, I = gpu_index_flat.search(xq, k)  # actual search
print(I[:5])                     # neighbors of the 5 first queries
print(I[-5:])                    # neighbors of the 5 last queries

## Using an IVF index

nlist = 100
quantizer = faiss.IndexFlatL2(d)  # the other index
index_ivf = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_L2)
# here we specify METRIC_L2, by default it performs inner-product search

# make it an IVF GPU index
gpu_index_ivf = faiss.index_cpu_to_gpu(res, 0, index_ivf)

assert not gpu_index_ivf.is_trained
gpu_index_ivf.train(xb)          # add vectors to the index
assert gpu_index_ivf.is_trained

gpu_index_ivf.add(xb)            # add vectors to the index
print(gpu_index_ivf.ntotal)

k = 4                            # we want to see 4 nearest neighbors
D, I = gpu_index_ivf.search(xq, k)  # actual search
print(I[:5])                     # neighbors of the 5 first queries
print(I[-5:])
```