提交 eb81c71f 编写于 作者: M milvus-ci-robot

Merge remote-tracking branch 'upstream/0.5.1' into 0.5.1


Former-commit-id: 8c40f7de5789c87f543e680a656c0d4a8e3043b9
......@@ -5,6 +5,8 @@ Please mark all change in change log and use the ticket from JIRA.
# Milvus 0.5.1 (TODO)
## Bug
- \#104 - test_scheduler core dump
## Improvement
- \#64 - Improvement dump function in scheduler
- \#80 - Print version information into log during server start
......
......@@ -49,6 +49,15 @@ JobMgr::Stop() {
}
}
json
JobMgr::Dump() const {
json ret{
{"running", running_},
{"event_queue_length", queue_.size()},
};
return ret;
}
void
JobMgr::Put(const JobPtr& job) {
{
......
......@@ -28,13 +28,14 @@
#include <vector>
#include "ResourceMgr.h"
#include "interface/interfaces.h"
#include "job/Job.h"
#include "task/Task.h"
namespace milvus {
namespace scheduler {
class JobMgr {
class JobMgr : public interface::dumpable {
public:
explicit JobMgr(ResourceMgrPtr res_mgr);
......@@ -44,6 +45,9 @@ class JobMgr {
void
Stop();
json
Dump() const override;
public:
void
Put(const JobPtr& job);
......
......@@ -170,16 +170,20 @@ ResourceMgr::GetNumGpuResource() const {
return num;
}
std::string
ResourceMgr::Dump() {
std::stringstream ss;
ss << "ResourceMgr contains " << resources_.size() << " resources." << std::endl;
json
ResourceMgr::Dump() const {
json resources{};
for (auto& res : resources_) {
ss << res->Dump();
resources.push_back(res->Dump());
}
return ss.str();
json ret{
{"number_of_resource", resources_.size()},
{"number_of_disk_resource", disk_resources_.size()},
{"number_of_cpu_resource", cpu_resources_.size()},
{"number_of_gpu_resource", gpu_resources_.size()},
{"resources", resources},
};
return ret;
}
std::string
......@@ -187,9 +191,9 @@ ResourceMgr::DumpTaskTables() {
std::stringstream ss;
ss << ">>>>>>>>>>>>>>>ResourceMgr::DumpTaskTable<<<<<<<<<<<<<<<" << std::endl;
for (auto& resource : resources_) {
ss << resource->Dump() << std::endl;
ss << resource->task_table().Dump();
ss << resource->Dump() << std::endl << std::endl;
ss << resource->name() << std::endl;
ss << resource->task_table().Dump().dump();
ss << resource->name() << std::endl << std::endl;
}
return ss.str();
}
......
......@@ -25,13 +25,14 @@
#include <utility>
#include <vector>
#include "interface/interfaces.h"
#include "resource/Resource.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
class ResourceMgr {
class ResourceMgr : public interface::dumpable {
public:
ResourceMgr() = default;
......@@ -103,8 +104,8 @@ class ResourceMgr {
public:
/******** Utility Functions ********/
std::string
Dump();
json
Dump() const override;
std::string
DumpTaskTables();
......
......@@ -66,9 +66,13 @@ Scheduler::PostEvent(const EventPtr& event) {
event_cv_.notify_one();
}
std::string
Scheduler::Dump() {
return std::string();
json
Scheduler::Dump() const {
json ret{
{"running", running_},
{"event_queue_length", event_queue_.size()},
};
return ret;
}
void
......
......@@ -25,14 +25,14 @@
#include <unordered_map>
#include "ResourceMgr.h"
#include "interface/interfaces.h"
#include "resource/Resource.h"
#include "utils/Log.h"
namespace milvus {
namespace scheduler {
// TODO(wxyu): refactor, not friendly to unittest, logical in framework code
class Scheduler {
class Scheduler : public interface::dumpable {
public:
explicit Scheduler(ResourceMgrWPtr res_mgr);
......@@ -57,11 +57,8 @@ class Scheduler {
void
PostEvent(const EventPtr& event);
/*
* Dump as string;
*/
std::string
Dump();
json
Dump() const override;
private:
/******** Events ********/
......
......@@ -53,7 +53,7 @@ ToString(TaskTableItemState state) {
}
json
TaskTimestamp::Dump() {
TaskTimestamp::Dump() const {
json ret{
{"start", start}, {"load", load}, {"loaded", loaded}, {"execute", execute},
{"executed", executed}, {"move", move}, {"moved", moved}, {"finish", finish},
......@@ -141,7 +141,7 @@ TaskTableItem::Moved() {
}
json
TaskTableItem::Dump() {
TaskTableItem::Dump() const {
json ret{
{"id", id},
{"task", (int64_t)task.get()},
......@@ -263,7 +263,7 @@ TaskTable::Get(uint64_t index) {
//}
json
TaskTable::Dump() {
TaskTable::Dump() const {
json ret;
for (auto& item : table_) {
ret.push_back(item->Dump());
......
......@@ -54,7 +54,7 @@ struct TaskTimestamp : public interface::dumpable {
uint64_t finish = 0;
json
Dump() override;
Dump() const override;
};
struct TaskTableItem : public interface::dumpable {
......@@ -92,7 +92,7 @@ struct TaskTableItem : public interface::dumpable {
Moved();
json
Dump() override;
Dump() const override;
};
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
......@@ -245,7 +245,7 @@ class TaskTable : public interface::dumpable {
* Dump;
*/
json
Dump() override;
Dump() const override;
private:
std::uint64_t id_ = 0;
......
......@@ -37,7 +37,7 @@ struct dumpable {
}
virtual json
Dump() = 0;
Dump() const = 0;
};
} // namespace interface
......
......@@ -54,5 +54,13 @@ BuildIndexJob::BuildIndexDone(size_t to_index_id) {
SERVER_LOG_DEBUG << "BuildIndexJob " << id() << " finish index file: " << to_index_id;
}
json
BuildIndexJob::Dump() const {
json ret{
{"number_of_to_index_file", to_index_files_.size()},
};
return ret;
}
} // namespace scheduler
} // namespace milvus
......@@ -53,6 +53,9 @@ class BuildIndexJob : public Job {
void
BuildIndexDone(size_t to_index_id);
json
Dump() const override;
public:
Status&
GetStatus() {
......
......@@ -45,5 +45,15 @@ DeleteJob::ResourceDone() {
cv_.notify_one();
}
json
DeleteJob::Dump() const {
json ret{
{"table_id", table_id_},
{"number_of_resource", num_resource_},
{"number_of_done", done_resource},
};
return ret;
}
} // namespace scheduler
} // namespace milvus
......@@ -44,6 +44,9 @@ class DeleteJob : public Job {
void
ResourceDone();
json
Dump() const override;
public:
std::string
table_id() const {
......
......@@ -27,6 +27,8 @@
#include <unordered_map>
#include <vector>
#include "scheduler/interface/interfaces.h"
namespace milvus {
namespace scheduler {
......@@ -39,7 +41,7 @@ enum class JobType {
using JobId = std::uint64_t;
class Job {
class Job : public interface::dumpable {
public:
inline JobId
id() const {
......
......@@ -63,5 +63,15 @@ SearchJob::GetStatus() {
return status_;
}
json
SearchJob::Dump() const {
json ret{
{"topk", topk_},
{"nq", nq_},
{"nprobe", nprobe_},
};
return ret;
}
} // namespace scheduler
} // namespace milvus
......@@ -61,6 +61,9 @@ class SearchJob : public Job {
Status&
GetStatus();
json
Dump() const override;
public:
uint64_t
topk() const {
......
......@@ -24,7 +24,7 @@ namespace scheduler {
std::ostream&
operator<<(std::ostream& out, const CpuResource& resource) {
out << resource.Dump();
out << resource.Dump().dump();
return out;
}
......
......@@ -28,11 +28,6 @@ class CpuResource : public Resource {
public:
explicit CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
return "<CpuResource, name=" + name_ + ">";
}
friend std::ostream&
operator<<(std::ostream& out, const CpuResource& resource);
......
......@@ -28,11 +28,6 @@ class DiskResource : public Resource {
public:
explicit DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
return "<DiskResource, name=" + name_ + ">";
}
friend std::ostream&
operator<<(std::ostream& out, const DiskResource& resource);
......
......@@ -22,7 +22,7 @@ namespace scheduler {
std::ostream&
operator<<(std::ostream& out, const GpuResource& resource) {
out << resource.Dump();
out << resource.Dump().dump();
return out;
}
......
......@@ -29,11 +29,6 @@ class GpuResource : public Resource {
public:
explicit GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
return "<GpuResource, name=" + name_ + ">";
}
friend std::ostream&
operator<<(std::ostream& out, const GpuResource& resource);
......
......@@ -38,15 +38,21 @@ Node::GetNeighbours() {
return ret;
}
std::string
Node::Dump() {
std::stringstream ss;
ss << "<Node, id=" << std::to_string(id_) << ">::neighbours:" << std::endl;
json
Node::Dump() const {
json neighbours;
for (auto& neighbour : neighbours_) {
ss << "\t<Neighbour, id=" << std::to_string(neighbour.first);
ss << ", connection: " << neighbour.second.connection.Dump() << ">" << std::endl;
json n;
n["id"] = neighbour.first;
n["connection"] = neighbour.second.connection.Dump();
neighbours.push_back(n);
}
return ss.str();
json ret{
{"id", id_},
{"neighbours", neighbours},
};
return ret;
}
void
......
......@@ -24,6 +24,7 @@
#include "Connection.h"
#include "scheduler/TaskTable.h"
#include "scheduler/interface/interfaces.h"
namespace milvus {
namespace scheduler {
......@@ -41,7 +42,7 @@ struct Neighbour {
};
// TODO(lxj): return type void -> Status
class Node {
class Node : public interface::dumpable {
public:
Node();
......@@ -52,8 +53,8 @@ class Node {
GetNeighbours();
public:
std::string
Dump();
json
Dump() const override;
private:
std::mutex mutex_;
......
......@@ -32,6 +32,22 @@ operator<<(std::ostream& out, const Resource& resource) {
return out;
}
std::string
ToString(ResourceType type) {
switch (type) {
case ResourceType::DISK: {
return "DISK";
}
case ResourceType::CPU: {
return "CPU";
}
case ResourceType::GPU: {
return "GPU";
}
default: { return "UNKNOWN"; }
}
}
Resource::Resource(std::string name, ResourceType type, uint64_t device_id, bool enable_loader, bool enable_executor)
: name_(std::move(name)),
type_(type),
......@@ -89,6 +105,22 @@ Resource::WakeupExecutor() {
exec_cv_.notify_one();
}
json
Resource::Dump() const {
json ret{
{"device_id", device_id_},
{"name", name_},
{"type", ToString(type_)},
{"task_average_cost", TaskAvgCost()},
{"task_total_cost", total_cost_},
{"total_tasks", total_task_},
{"running", running_},
{"enable_loader", enable_loader_},
{"enable_executor", enable_executor_},
};
return ret;
}
uint64_t
Resource::NumOfTaskToExec() {
uint64_t count = 0;
......
......@@ -77,10 +77,8 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
subscriber_ = std::move(subscriber);
}
inline virtual std::string
Dump() const {
return "<Resource>";
}
json
Dump() const override;
public:
inline std::string
......@@ -121,6 +119,9 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
// TODO(wxyu): need double ?
inline uint64_t
TaskAvgCost() const {
if (total_task_ == 0) {
return 0;
}
return total_cost_ / total_task_;
}
......
......@@ -29,11 +29,6 @@ class TestResource : public Resource {
public:
explicit TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor);
inline std::string
Dump() const override {
return "<TestResource, name=" + name_ + ">";
}
friend std::ostream&
operator<<(std::ostream& out, const TestResource& resource);
......
......@@ -17,9 +17,9 @@ allure-pytest==2.7.0
pytest-print==0.1.2
pytest-level==0.1.1
six==1.12.0
thrift==0.11.0
typed-ast==1.3.5
wcwidth==0.1.7
wrapt==1.11.1
zipp==0.5.1
pymilvus-test>=0.2.0
scikit-learn>=0.19.1
pymilvus-test>=0.2.0
\ No newline at end of file
......@@ -16,9 +16,6 @@ ADD_TIMEOUT = 60
nprobe = 1
epsilon = 0.0001
index_params = random.choice(gen_index_params())
logging.getLogger().info(index_params)
class TestAddBase:
"""
......@@ -26,6 +23,15 @@ class TestAddBase:
The following cases are used to test `add_vectors / index / search / delete` mixed function
******************************************************************
"""
@pytest.fixture(
scope="function",
params=gen_simple_index_params()
)
def get_simple_index_params(self, request, args):
if "internal" not in args:
if request.param["index_type"] == IndexType.IVF_SQ8H:
pytest.skip("sq8h not support in open source")
return request.param
def test_add_vector_create_table(self, connect, table):
'''
......@@ -71,7 +77,7 @@ class TestAddBase:
method: delete table_2 and add vector to table_1
expected: status ok
'''
param = {'table_name': 'test_delete_table_add_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -79,7 +85,6 @@ class TestAddBase:
status = connect.delete_table(table)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(param['table_name'], vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -101,14 +106,13 @@ class TestAddBase:
method: add vector and delete table
expected: status ok
'''
param = {'table_name': 'test_add_vector_delete_another_table',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
status = connect.create_table(param)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
status = connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -131,7 +135,7 @@ class TestAddBase:
method: add vector , sleep, and delete table
expected: status ok
'''
param = {'table_name': 'test_add_vector_sleep_delete_another_table',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -143,86 +147,91 @@ class TestAddBase:
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_create_index_add_vector(self, connect, table):
def test_create_index_add_vector(self, connect, table, get_simple_index_params):
'''
target: test add vector after build index
method: build index and add vector
expected: status ok
'''
status = connect.create_index(table, index_params)
index_param = get_simple_index_params
status = connect.create_index(table, index_param)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_create_index_add_vector_another(self, connect, table):
def test_create_index_add_vector_another(self, connect, table, get_simple_index_params):
'''
target: test add vector to table_2 after build index for table_1
method: build index and add vector
expected: status ok
'''
param = {'table_name': 'test_create_index_add_vector_another',
index_param = get_simple_index_params
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
status = connect.create_table(param)
status = connect.create_index(table, index_params)
status = connect.create_index(table, index_param)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_create_index(self, connect, table):
def test_add_vector_create_index(self, connect, table, get_simple_index_params):
'''
target: test build index add after vector
method: add vector and build index
expected: status ok
'''
index_param = get_simple_index_params
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
status = connect.create_index(table, index_params)
status = connect.create_index(table, index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_create_index_another(self, connect, table):
def test_add_vector_create_index_another(self, connect, table, get_simple_index_params):
'''
target: test add vector to table_2 after build index for table_1
method: build index and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_create_index_another',
index_param = get_simple_index_params
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
status = connect.create_table(param)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
status = connect.create_index(param['table_name'], index_params)
connect.delete_table(param['table_name'])
status = connect.create_index(param['table_name'], index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_sleep_create_index(self, connect, table):
def test_add_vector_sleep_create_index(self, connect, table, get_simple_index_params):
'''
target: test build index add after vector for a while
method: add vector and build index
expected: status ok
'''
index_param = get_simple_index_params
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
time.sleep(1)
status = connect.create_index(table, index_params)
status = connect.create_index(table, index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_sleep_create_index_another(self, connect, table):
def test_add_vector_sleep_create_index_another(self, connect, table, get_simple_index_params):
'''
target: test add vector to table_2 after build index for table_1 for a while
method: build index and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_sleep_create_index_another',
index_param = get_simple_index_params
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -230,8 +239,7 @@ class TestAddBase:
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
time.sleep(1)
status = connect.create_index(param['table_name'], index_params)
connect.delete_table(param['table_name'])
status = connect.create_index(param['table_name'], index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -253,7 +261,7 @@ class TestAddBase:
method: search table and add vector
expected: status ok
'''
param = {'table_name': 'test_search_vector_add_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -261,7 +269,6 @@ class TestAddBase:
vector = gen_single_vector(dim)
status, result = connect.search_vectors(table, 1, nprobe, vector)
status, ids = connect.add_vectors(param['table_name'], vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -283,7 +290,7 @@ class TestAddBase:
method: search table and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_search_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -291,7 +298,6 @@ class TestAddBase:
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(table, vector)
status, result = connect.search_vectors(param['table_name'], 1, nprobe, vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -314,7 +320,7 @@ class TestAddBase:
method: search table , sleep, and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_sleep_search_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -323,7 +329,6 @@ class TestAddBase:
status, ids = connect.add_vectors(table, vector)
time.sleep(1)
status, result = connect.search_vectors(param['table_name'], 1, nprobe, vector)
connect.delete_table(param['table_name'])
assert status.OK()
"""
......@@ -594,6 +599,15 @@ class TestAddIP:
The following cases are used to test `add_vectors / index / search / delete` mixed function
******************************************************************
"""
@pytest.fixture(
scope="function",
params=gen_simple_index_params()
)
def get_simple_index_params(self, request, args):
if "internal" not in args:
if request.param["index_type"] == IndexType.IVF_SQ8H:
pytest.skip("sq8h not support in open source")
return request.param
def test_add_vector_create_table(self, connect, ip_table):
'''
......@@ -639,7 +653,7 @@ class TestAddIP:
method: delete table_2 and add vector to table_1
expected: status ok
'''
param = {'table_name': 'test_delete_table_add_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -647,7 +661,6 @@ class TestAddIP:
status = connect.delete_table(ip_table)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(param['table_name'], vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -699,7 +712,7 @@ class TestAddIP:
method: add vector , sleep, and delete table
expected: status ok
'''
param = {'table_name': 'test_add_vector_sleep_delete_another_table',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -711,86 +724,90 @@ class TestAddIP:
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_create_index_add_vector(self, connect, ip_table):
def test_create_index_add_vector(self, connect, ip_table, get_simple_index_params):
'''
target: test add vector after build index
method: build index and add vector
expected: status ok
'''
status = connect.create_index(ip_table, index_params)
index_param = get_simple_index_params
status = connect.create_index(ip_table, index_param)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(ip_table, vector)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_create_index_add_vector_another(self, connect, ip_table):
def test_create_index_add_vector_another(self, connect, ip_table, get_simple_index_params):
'''
target: test add vector to table_2 after build index for table_1
method: build index and add vector
expected: status ok
'''
param = {'table_name': 'test_create_index_add_vector_another',
index_param = get_simple_index_params
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
status = connect.create_table(param)
status = connect.create_index(ip_table, index_params)
status = connect.create_index(ip_table, index_param)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(ip_table, vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_create_index(self, connect, ip_table):
def test_add_vector_create_index(self, connect, ip_table, get_simple_index_params):
'''
target: test build index add after vector
method: add vector and build index
expected: status ok
'''
index_param = get_simple_index_params
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(ip_table, vector)
status = connect.create_index(ip_table, index_params)
status = connect.create_index(ip_table, index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_create_index_another(self, connect, ip_table):
def test_add_vector_create_index_another(self, connect, ip_table, get_simple_index_params):
'''
target: test add vector to table_2 after build index for table_1
method: build index and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_create_index_another',
index_param = get_simple_index_params
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
status = connect.create_table(param)
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(ip_table, vector)
status = connect.create_index(param['table_name'], index_params)
connect.delete_table(param['table_name'])
status = connect.create_index(param['table_name'], index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_sleep_create_index(self, connect, ip_table):
def test_add_vector_sleep_create_index(self, connect, ip_table, get_simple_index_params):
'''
target: test build index add after vector for a while
method: add vector and build index
expected: status ok
'''
index_param = get_simple_index_params
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(ip_table, vector)
time.sleep(1)
status = connect.create_index(ip_table, index_params)
status = connect.create_index(ip_table, index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
def test_add_vector_sleep_create_index_another(self, connect, ip_table):
def test_add_vector_sleep_create_index_another(self, connect, ip_table, get_simple_index_params):
'''
target: test add vector to table_2 after build index for table_1 for a while
method: build index and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_sleep_create_index_another',
index_param = get_simple_index_params
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -798,8 +815,7 @@ class TestAddIP:
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(ip_table, vector)
time.sleep(1)
status = connect.create_index(param['table_name'], index_params)
connect.delete_table(param['table_name'])
status = connect.create_index(param['table_name'], index_param)
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -821,7 +837,7 @@ class TestAddIP:
method: search table and add vector
expected: status ok
'''
param = {'table_name': 'test_search_vector_add_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -829,7 +845,6 @@ class TestAddIP:
vector = gen_single_vector(dim)
status, result = connect.search_vectors(ip_table, 1, nprobe, vector)
status, ids = connect.add_vectors(param['table_name'], vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -851,7 +866,7 @@ class TestAddIP:
method: search table and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_search_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -859,7 +874,6 @@ class TestAddIP:
vector = gen_single_vector(dim)
status, ids = connect.add_vectors(ip_table, vector)
status, result = connect.search_vectors(param['table_name'], 1, nprobe, vector)
connect.delete_table(param['table_name'])
assert status.OK()
@pytest.mark.timeout(ADD_TIMEOUT)
......@@ -882,7 +896,7 @@ class TestAddIP:
method: search table , sleep, and add vector
expected: status ok
'''
param = {'table_name': 'test_add_vector_sleep_search_vector_another',
param = {'table_name': gen_unique_str(),
'dimension': dim,
'index_file_size': index_file_size,
'metric_type': MetricType.L2}
......@@ -891,7 +905,6 @@ class TestAddIP:
status, ids = connect.add_vectors(ip_table, vector)
time.sleep(1)
status, result = connect.search_vectors(param['table_name'], 1, nprobe, vector)
connect.delete_table(param['table_name'])
assert status.OK()
"""
......@@ -1130,7 +1143,7 @@ class TestAddIP:
nq = 100
vectors = gen_vectors(nq, dim)
table_list = []
for i in range(50):
for i in range(20):
table_name = gen_unique_str('test_add_vector_multi_tables')
table_list.append(table_name)
param = {'table_name': table_name,
......@@ -1140,7 +1153,7 @@ class TestAddIP:
connect.create_table(param)
time.sleep(2)
for j in range(10):
for i in range(50):
for i in range(20):
status, ids = connect.add_vectors(table_name=table_list[i], records=vectors)
assert status.OK()
......
......@@ -8,6 +8,7 @@ import pdb
import threading
from multiprocessing import Pool, Process
import numpy
import sklearn.preprocessing
from milvus import Milvus, IndexType, MetricType
from utils import *
......@@ -15,7 +16,7 @@ nb = 10000
dim = 128
index_file_size = 10
vectors = gen_vectors(nb, dim)
vectors /= numpy.linalg.norm(vectors)
vectors = sklearn.preprocessing.normalize(vectors, axis=1, norm='l2')
vectors = vectors.tolist()
BUILD_TIMEOUT = 60
nprobe = 1
......@@ -218,29 +219,26 @@ class TestIndexBase:
assert status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_no_vectors_then_add_vectors(self, connect, table):
def test_create_index_no_vectors_then_add_vectors(self, connect, table, get_simple_index_params):
'''
target: test create index interface when there is no vectors in table, and does not affect the subsequent process
method: create table and add no vectors in it, and then create index, add vectors in it
expected: return code equals to 0
'''
nlist = 16384
index_param = {"index_type": IndexType.IVF_SQ8, "nlist": nlist}
index_param = get_simple_index_params
status = connect.create_index(table, index_param)
status, ids = connect.add_vectors(table, vectors)
assert status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_same_index_repeatedly(self, connect, table):
def test_create_same_index_repeatedly(self, connect, table, get_simple_index_params):
'''
target: check if index can be created repeatedly, with the same create_index params
method: create index after index have been built
expected: return code success, and search ok
'''
nlist = 16384
status, ids = connect.add_vectors(table, vectors)
index_param = {"index_type": IndexType.IVF_SQ8, "nlist": nlist}
# index_params = get_index_params
index_param = get_simple_index_params
status = connect.create_index(table, index_param)
status = connect.create_index(table, index_param)
assert status.OK()
......@@ -390,9 +388,9 @@ class TestIndexBase:
method: create table and add vectors in it, create index, call drop index
expected: return code 0, and default index param
'''
index_params = get_index_params
index_param = get_index_params
status, ids = connect.add_vectors(table, vectors)
status = connect.create_index(table, index_params)
status = connect.create_index(table, index_param)
assert status.OK()
status, result = connect.describe_index(table)
logging.getLogger().info(result)
......@@ -404,15 +402,15 @@ class TestIndexBase:
assert result._table_name == table
assert result._index_type == IndexType.FLAT
def test_drop_index_repeatly(self, connect, table, get_simple_index_params):
def test_drop_index_repeatly(self, connect, table, get_index_params):
'''
target: test drop index repeatly
method: create index, call drop index, and drop again
expected: return code 0
'''
index_params = get_simple_index_params
index_param = get_index_params
status, ids = connect.add_vectors(table, vectors)
status = connect.create_index(table, index_params)
status = connect.create_index(table, index_param)
assert status.OK()
status, result = connect.describe_index(table)
logging.getLogger().info(result)
......@@ -688,14 +686,13 @@ class TestIndexIP:
assert status.OK()
@pytest.mark.timeout(BUILD_TIMEOUT)
def test_create_index_no_vectors_then_add_vectors(self, connect, ip_table):
def test_create_index_no_vectors_then_add_vectors(self, connect, ip_table, get_simple_index_params):
'''
target: test create index interface when there is no vectors in table, and does not affect the subsequent process
method: create table and add no vectors in it, and then create index, add vectors in it
expected: return code equals to 0
'''
nlist = 16384
index_param = {"index_type": IndexType.IVF_SQ8, "nlist": nlist}
index_param = get_simple_index_params
status = connect.create_index(ip_table, index_param)
status, ids = connect.add_vectors(ip_table, vectors)
assert status.OK()
......
......@@ -6,7 +6,7 @@ import datetime
import logging
from time import sleep
from multiprocessing import Process
import numpy
import sklearn.preprocessing
from milvus import Milvus, IndexType, MetricType
from utils import *
......@@ -15,7 +15,7 @@ index_file_size = 10
table_id = "test_mix"
add_interval_time = 2
vectors = gen_vectors(100000, dim)
vectors /= numpy.linalg.norm(vectors)
vectors = sklearn.preprocessing.normalize(vectors, axis=1, norm='l2')
vectors = vectors.tolist()
top_k = 1
nprobe = 1
......@@ -26,9 +26,9 @@ index_params = {'index_type': IndexType.IVFLAT, 'nlist': 16384}
class TestMixBase:
# TODO: enable
def _test_search_during_createIndex(self, args):
def test_search_during_createIndex(self, args):
loops = 100000
table = "test_search_during_createIndex"
table = gen_unique_str()
query_vecs = [vectors[0], vectors[1]]
uri = "tcp://%s:%s" % (args["ip"], args["port"])
id_0 = 0; id_1 = 0
......@@ -54,6 +54,7 @@ class TestMixBase:
status, ids = milvus_instance.add_vectors(table, vectors)
logging.getLogger().info(status)
def search(milvus_instance):
logging.getLogger().info("In search vectors")
for i in range(loops):
status, result = milvus_instance.search_vectors(table, top_k, nprobe, query_vecs)
logging.getLogger().info(status)
......@@ -69,6 +70,7 @@ class TestMixBase:
p_create.start()
p_create.join()
@pytest.mark.level(2)
def test_mix_multi_tables(self, connect):
'''
target: test functions with multiple tables of different metric_types and index_types
......@@ -77,6 +79,7 @@ class TestMixBase:
expected: status ok
'''
nq = 10000
nlist= 16384
vectors = gen_vectors(nq, dim)
table_list = []
idx = []
......@@ -112,17 +115,17 @@ class TestMixBase:
#create index
for i in range(10):
index_params = {'index_type': IndexType.FLAT, 'nlist': 16384}
index_params = {'index_type': IndexType.FLAT, 'nlist': nlist}
status = connect.create_index(table_list[i], index_params)
assert status.OK()
status = connect.create_index(table_list[30 + i], index_params)
assert status.OK()
index_params = {'index_type': IndexType.IVFLAT, 'nlist': 16384}
index_params = {'index_type': IndexType.IVFLAT, 'nlist': nlist}
status = connect.create_index(table_list[10 + i], index_params)
assert status.OK()
status = connect.create_index(table_list[40 + i], index_params)
assert status.OK()
index_params = {'index_type': IndexType.IVF_SQ8, 'nlist': 16384}
index_params = {'index_type': IndexType.IVF_SQ8, 'nlist': nlist}
status = connect.create_index(table_list[20 + i], index_params)
assert status.OK()
status = connect.create_index(table_list[50 + i], index_params)
......
......@@ -54,7 +54,7 @@ class TestSearchBase:
"""
@pytest.fixture(
scope="function",
params=[1, 99, 101, 1024, 2048, 2049]
params=[1, 99, 1024, 2048, 2049]
)
def get_top_k(self, request):
yield request.param
......@@ -220,7 +220,6 @@ class TestSearchBase:
scope="function",
params=[
(get_last_day(2), get_last_day(1)),
(get_last_day(2), get_current_day()),
(get_next_day(1), get_next_day(2))
]
)
......@@ -482,8 +481,9 @@ class TestSearchBase:
"""
class TestSearchParamsInvalid(object):
index_params = random.choice(gen_index_params())
logging.getLogger().info(index_params)
nlist = 16384
index_param = {"index_type": IndexType.IVF_SQ8, "nlist": nlist}
logging.getLogger().info(index_param)
def init_data(self, connect, table, nb=100):
'''
......@@ -528,7 +528,7 @@ class TestSearchParamsInvalid(object):
def get_top_k(self, request):
yield request.param
@pytest.mark.level(2)
@pytest.mark.level(1)
def test_search_with_invalid_top_k(self, connect, table, get_top_k):
'''
target: test search fuction, with the wrong top_k
......@@ -539,9 +539,12 @@ class TestSearchParamsInvalid(object):
logging.getLogger().info(top_k)
nprobe = 1
query_vecs = gen_vectors(1, dim)
with pytest.raises(Exception) as e:
if isinstance(top_k, int):
status, result = connect.search_vectors(table, top_k, nprobe, query_vecs)
res = connect.server_version()
assert not status.OK()
else:
with pytest.raises(Exception) as e:
status, result = connect.search_vectors(table, top_k, nprobe, query_vecs)
@pytest.mark.level(2)
def test_search_with_invalid_top_k_ip(self, connect, ip_table, get_top_k):
......@@ -554,10 +557,12 @@ class TestSearchParamsInvalid(object):
logging.getLogger().info(top_k)
nprobe = 1
query_vecs = gen_vectors(1, dim)
with pytest.raises(Exception) as e:
if isinstance(top_k, int):
status, result = connect.search_vectors(ip_table, top_k, nprobe, query_vecs)
res = connect.server_version()
assert not status.OK()
else:
with pytest.raises(Exception) as e:
status, result = connect.search_vectors(ip_table, top_k, nprobe, query_vecs)
"""
Test search table with invalid nprobe
"""
......@@ -568,7 +573,7 @@ class TestSearchParamsInvalid(object):
def get_nprobes(self, request):
yield request.param
@pytest.mark.level(2)
@pytest.mark.level(1)
def test_search_with_invalid_nrpobe(self, connect, table, get_nprobes):
'''
target: test search fuction, with the wrong top_k
......@@ -579,7 +584,7 @@ class TestSearchParamsInvalid(object):
nprobe = get_nprobes
logging.getLogger().info(nprobe)
query_vecs = gen_vectors(1, dim)
if isinstance(nprobe, int) and nprobe > 0:
if isinstance(nprobe, int):
status, result = connect.search_vectors(table, top_k, nprobe, query_vecs)
assert not status.OK()
else:
......@@ -597,7 +602,7 @@ class TestSearchParamsInvalid(object):
nprobe = get_nprobes
logging.getLogger().info(nprobe)
query_vecs = gen_vectors(1, dim)
if isinstance(nprobe, int) and nprobe > 0:
if isinstance(nprobe, int):
status, result = connect.search_vectors(ip_table, top_k, nprobe, query_vecs)
assert not status.OK()
else:
......@@ -614,7 +619,7 @@ class TestSearchParamsInvalid(object):
def get_query_ranges(self, request):
yield request.param
@pytest.mark.level(2)
@pytest.mark.level(1)
def test_search_flat_with_invalid_query_range(self, connect, table, get_query_ranges):
'''
target: test search fuction, with the wrong query_range
......
......@@ -178,6 +178,7 @@ class TestTable:
assert res.table_name == table_name
assert res.metric_type == MetricType.L2
@pytest.mark.level(2)
def test_table_describe_table_name_ip(self, connect):
'''
target: test describe table created with correct params
......@@ -266,6 +267,7 @@ class TestTable:
status = connect.delete_table(table)
assert not assert_has_table(connect, table)
@pytest.mark.level(2)
def test_delete_table_ip(self, connect, ip_table):
'''
target: test delete table created with correct params
......@@ -335,7 +337,6 @@ class TestTable:
time.sleep(2)
assert status.OK()
@pytest.mark.level(2)
def test_delete_create_table_repeatedly_ip(self, connect):
'''
target: test delete and create the same table repeatedly
......@@ -587,25 +588,25 @@ class TestTable:
"""
@pytest.fixture(
scope="function",
params=gen_index_params()
params=gen_simple_index_params()
)
def get_index_params(self, request, args):
def get_simple_index_params(self, request, args):
if "internal" not in args:
if request.param["index_type"] == IndexType.IVF_SQ8H:
pytest.skip("sq8h not support in open source")
return request.param
@pytest.mark.level(1)
def test_preload_table(self, connect, table, get_index_params):
index_params = get_index_params
def test_preload_table(self, connect, table, get_simple_index_params):
index_params = get_simple_index_params
status, ids = connect.add_vectors(table, vectors)
status = connect.create_index(table, index_params)
status = connect.preload_table(table)
assert status.OK()
@pytest.mark.level(1)
def test_preload_table_ip(self, connect, ip_table, get_index_params):
index_params = get_index_params
def test_preload_table_ip(self, connect, ip_table, get_simple_index_params):
index_params = get_simple_index_params
status, ids = connect.add_vectors(ip_table, vectors)
status = connect.create_index(ip_table, index_params)
status = connect.preload_table(ip_table)
......@@ -613,19 +614,21 @@ class TestTable:
@pytest.mark.level(1)
def test_preload_table_not_existed(self, connect, table):
table_name = gen_unique_str("test_preload_table_not_existed")
index_params = random.choice(gen_index_params())
table_name = gen_unique_str()
nlist = 16384
index_param = {"index_type": IndexType.IVF_SQ8, "nlist": nlist}
status, ids = connect.add_vectors(table, vectors)
status = connect.create_index(table, index_params)
status = connect.create_index(table, index_param)
status = connect.preload_table(table_name)
assert not status.OK()
@pytest.mark.level(1)
@pytest.mark.level(2)
def test_preload_table_not_existed_ip(self, connect, ip_table):
table_name = gen_unique_str("test_preload_table_not_existed")
index_params = random.choice(gen_index_params())
table_name = gen_unique_str()
nlist = 16384
index_param = {"index_type": IndexType.IVF_SQ8, "nlist": nlist}
status, ids = connect.add_vectors(ip_table, vectors)
status = connect.create_index(ip_table, index_params)
status = connect.create_index(ip_table, index_param)
status = connect.preload_table(table_name)
assert not status.OK()
......@@ -634,7 +637,7 @@ class TestTable:
status = connect.preload_table(table)
assert status.OK()
@pytest.mark.level(1)
@pytest.mark.level(2)
def test_preload_table_no_vectors_ip(self, connect, ip_table):
status = connect.preload_table(ip_table)
assert status.OK()
......@@ -728,7 +731,7 @@ class TestCreateTableIndexSizeInvalid(object):
'dimension': dim,
'index_file_size': file_size,
'metric_type': MetricType.L2}
if isinstance(file_size, int) and file_size > 0:
if isinstance(file_size, int):
status = connect.create_table(param)
assert not status.OK()
else:
......@@ -779,7 +782,7 @@ def preload_table(connect, **params):
return status
def has(connect, **params):
status = assert_has_table(connect, params["table_name"])
status, result = connect.has_table(params["table_name"])
return status
def show(connect, **params):
......@@ -803,7 +806,7 @@ def create_index(connect, **params):
return status
func_map = {
# 0:has,
0:has,
1:show,
10:create_table,
11:describe,
......
......@@ -23,7 +23,7 @@ class TestTableCount:
@pytest.fixture(
scope="function",
params=[
100,
1,
5000,
100000,
],
......@@ -36,9 +36,9 @@ class TestTableCount:
"""
@pytest.fixture(
scope="function",
params=gen_index_params()
params=gen_simple_index_params()
)
def get_index_params(self, request, args):
def get_simple_index_params(self, request, args):
if "internal" not in args:
if request.param["index_type"] == IndexType.IVF_SQ8H:
pytest.skip("sq8h not support in open source")
......@@ -58,14 +58,14 @@ class TestTableCount:
status, res = connect.get_table_row_count(table)
assert res == nb
def test_table_rows_count_after_index_created(self, connect, table, get_index_params):
def test_table_rows_count_after_index_created(self, connect, table, get_simple_index_params):
'''
target: test get_table_row_count, after index have been created
method: add vectors in db, and create index, then calling get_table_row_count with correct params
expected: get_table_row_count raise exception
'''
nb = 100
index_params = get_index_params
index_params = get_simple_index_params
vectors = gen_vectors(nb, dim)
res = connect.add_vectors(table_name=table, records=vectors)
time.sleep(add_time_interval)
......@@ -91,7 +91,7 @@ class TestTableCount:
assert the value returned by get_table_row_count method is equal to 0
expected: the count is equal to 0
'''
table_name = gen_unique_str("test_table")
table_name = gen_unique_str()
param = {'table_name': table_name,
'dimension': dim,
'index_file_size': index_file_size}
......@@ -142,8 +142,8 @@ class TestTableCount:
nq = 100
vectors = gen_vectors(nq, dim)
table_list = []
for i in range(50):
table_name = gen_unique_str('test_table_rows_count_multi_tables')
for i in range(20):
table_name = gen_unique_str()
table_list.append(table_name)
param = {'table_name': table_name,
'dimension': dim,
......@@ -152,7 +152,7 @@ class TestTableCount:
connect.create_table(param)
res = connect.add_vectors(table_name=table_name, records=vectors)
time.sleep(2)
for i in range(50):
for i in range(20):
status, res = connect.get_table_row_count(table_list[i])
assert status.OK()
assert res == nq
......@@ -166,7 +166,7 @@ class TestTableCountIP:
@pytest.fixture(
scope="function",
params=[
100,
1,
5000,
100000,
],
......@@ -180,9 +180,9 @@ class TestTableCountIP:
@pytest.fixture(
scope="function",
params=gen_index_params()
params=gen_simple_index_params()
)
def get_index_params(self, request, args):
def get_simple_index_params(self, request, args):
if "internal" not in args:
if request.param["index_type"] == IndexType.IVF_SQ8H:
pytest.skip("sq8h not support in open source")
......@@ -202,14 +202,14 @@ class TestTableCountIP:
status, res = connect.get_table_row_count(ip_table)
assert res == nb
def test_table_rows_count_after_index_created(self, connect, ip_table, get_index_params):
def test_table_rows_count_after_index_created(self, connect, ip_table, get_simple_index_params):
'''
target: test get_table_row_count, after index have been created
method: add vectors in db, and create index, then calling get_table_row_count with correct params
expected: get_table_row_count raise exception
'''
nb = 100
index_params = get_index_params
index_params = get_simple_index_params
vectors = gen_vectors(nb, dim)
res = connect.add_vectors(table_name=ip_table, records=vectors)
time.sleep(add_time_interval)
......@@ -243,10 +243,8 @@ class TestTableCountIP:
status, res = connect.get_table_row_count(ip_table)
assert res == 0
# TODO: enable
@pytest.mark.level(2)
@pytest.mark.timeout(20)
def _test_table_rows_count_multiprocessing(self, connect, ip_table, args):
@pytest.mark.timeout(60)
def test_table_rows_count_multiprocessing(self, connect, ip_table, args):
'''
target: test table rows_count is correct or not with multiprocess
method: create table and add vectors in it,
......@@ -286,7 +284,7 @@ class TestTableCountIP:
nq = 100
vectors = gen_vectors(nq, dim)
table_list = []
for i in range(50):
for i in range(20):
table_name = gen_unique_str('test_table_rows_count_multi_tables')
table_list.append(table_name)
param = {'table_name': table_name,
......@@ -296,7 +294,7 @@ class TestTableCountIP:
connect.create_table(param)
res = connect.add_vectors(table_name=table_name, records=vectors)
time.sleep(2)
for i in range(50):
for i in range(20):
status, res = connect.get_table_row_count(table_list[i])
assert status.OK()
assert res == nq
\ No newline at end of file
......@@ -26,9 +26,9 @@ def gen_vector(nb, d, seed=np.random.RandomState(1234)):
return xb.tolist()
def gen_unique_str(str=None):
def gen_unique_str(str_value=None):
prefix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
return prefix if str is None else str + "_" + prefix
return "test_"+prefix if str_value is None else str_value+"_"+prefix
def get_current_day():
......@@ -449,10 +449,11 @@ def gen_index_params():
return gen_params(index_types, nlists)
def gen_simple_index_params():
index_params = []
index_types = [IndexType.FLAT, IndexType.IVFLAT, IndexType.IVF_SQ8, IndexType.IVF_SQ8H]
nlists = [16384]
nlists = [1024]
def gen_params(index_types, nlists):
return [ {"index_type": index_type, "nlist": nlist} \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册