Unverified commit 457fb887, authored by del-zhenwu, committed by GitHub

[skip ci] add argo.yaml and update readme for benchmark (#7094)

Signed-off-by: del-zhenwu <zhenxiang.li@zilliz.com>
Parent 9551dc3b
......@@ -14,7 +14,7 @@ FROM python:3.6.8-jessie
SHELL ["/bin/bash", "-o", "pipefail", "-c"]
RUN apt-get update && apt-get install -y --no-install-recommends wget apt-transport-https && \
wget -qO- "https://get.helm.sh/helm-v3.0.2-linux-amd64.tar.gz" | tar --strip-components=1 -xz -C /usr/local/bin linux-amd64/helm && \
wget -qO- "https://get.helm.sh/helm-v3.6.1-linux-amd64.tar.gz" | tar --strip-components=1 -xz -C /usr/local/bin linux-amd64/helm && \
wget -P /tmp https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg && \
apt-key add /tmp/apt-key.gpg && \
sh -c 'echo deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main > /etc/apt/sources.list.d/kubernetes.list' && \
......@@ -25,6 +25,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends wget apt-transp
COPY requirements.txt /requirements.txt
RUN python3 -m pip install -r /requirements.txt
RUN python3 -m pip install --no-cache-dir -r /requirements.txt
WORKDIR /root
\ No newline at end of file
WORKDIR /root
`milvus_benchmark` is a non-functional testing tool or service that allows users to run tests on a k8s cluster or locally. Its primary use cases are performance, load, and stability testing, and its objective is to expose problems in the Milvus project.
## Quick start
### Description
- Test cases in `milvus_benchmark` are organized with `yaml` files
- Tests can run in local mode or helm mode
  - local: install and start your local server, then pass the host/port params when starting the tests
  - helm: install the server with helm, which manages Milvus in the k8s cluster; you can integrate the test stage into an argo workflow or jenkins pipeline
### Usage
- Using jenkins:
  Use `ci/main_jenkinsfile` as the jenkins pipeline file
- Using argo:
  Example argo workflow yaml configuration: `ci/argo.yaml`
- Local test:
  1. Set PYTHONPATH:
     `export PYTHONPATH=/your/project/path/milvus_benchmark`
  2. Prepare data:
     If the sift/deep dataset is used as the raw data input, mount NAS and update `RAW_DATA_DIR` in `config.py`. Example mount command:
     `sudo mount -t cifs -o username=test,vers=1.0 //172.16.70.249/test /test`
  3. Install requirements:
     `pip install -r requirements.txt`
  4. Write a test yaml and run with the yaml param:
     `cd milvus-benchmark/ && python main.py --local --host=* --port=19530 --suite=suites/2_insert_data.yaml`
### Test suite
#### Description
A test suite yaml defines the test process; users need to add a test suite yaml when adding a customized test to the current test framework.
#### Example
Take the test file `2_insert_data.yaml` as an example:
```
insert_performance:
collections:
-
milvus:
db_config.primary_path: /test/milvus/db_data_2/cluster/sift_1m_128_l2
wal_enable: true
collection_name: sift_1m_128_l2
ni_per: 50000
build_index: false
index_type: ivf_sq8
index_param:
nlist: 1024
```
- `insert_performance`
  The top level is the runner type; other test types include `search_performance/build_performance/insert_performance/accuracy/locust_insert/...`, and each test type corresponds to a different runner component defined in the `runners` directory
- other fields under the runner type
  The other parts of the test yaml are the params passed to the runner (a minimal sketch of how a runner might consume them follows this list), such as:
  - The field `collection_name` specifies which kind of collection will be created in Milvus
  - The field `ni_per` specifies the batch size
  - The field `build_index` specifies whether to create the index during inserting
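The following is a minimal, illustrative sketch (not the framework's actual code) of how a runner could load a suite yaml like the one above and consume these params; `FakeRunner` and `run_suite_file` are hypothetical names, and only `collection_name`, `ni_per`, and `build_index` are read.

```
import yaml


class FakeRunner:
    """Hypothetical runner: receives the params defined under the runner type."""

    def run(self, collection):
        # `collection_name` decides which collection is created in Milvus,
        # `ni_per` is the batch size for each insert request.
        name = collection["collection_name"]
        batch_size = collection["ni_per"]
        build_index = collection.get("build_index", False)
        print("insert into %s, batch size %d, build_index=%s" % (name, batch_size, build_index))


def run_suite_file(path):
    with open(path) as f:
        suite = yaml.safe_load(f)
    # The top-level key is the runner type, e.g. "insert_performance";
    # in the real framework it is used to look up the runner class.
    run_type, body = next(iter(suite.items()))
    runner = FakeRunner()
    for collection in body["collections"]:
        runner.run(collection)


run_suite_file("suites/2_insert_data.yaml")
```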
When using an argo workflow as the benchmark pipeline, the test suite is made of both a `client` and a `server` configmap. An example looks like this (a sketch for creating such a configmap programmatically follows the two examples):
`server`
```
kind: ConfigMap
apiVersion: v1
metadata:
name: server-cluster-8c16m
namespace: qa
uid: 3752f85c-c840-40c6-a5db-ae44146ad8b5
resourceVersion: '42213135'
creationTimestamp: '2021-05-14T07:00:53Z'
managedFields:
- manager: dashboard
operation: Update
apiVersion: v1
time: '2021-05-14T07:00:53Z'
fieldsType: FieldsV1
fieldsV1:
'f:data':
.: {}
'f:config.yaml': {}
data:
config.yaml: |
server:
server_tag: "8c16m"
milvus:
deploy_mode: "cluster"
```
`client`
```
kind: ConfigMap
apiVersion: v1
metadata:
name: client-insert-batch-1000
namespace: qa
uid: 8604c277-f00f-47c7-8fcb-9b3bc97efa74
resourceVersion: '42988547'
creationTimestamp: '2021-07-09T08:33:02Z'
managedFields:
- manager: dashboard
operation: Update
apiVersion: v1
fieldsType: FieldsV1
fieldsV1:
'f:data':
.: {}
'f:config.yaml': {}
data:
config.yaml: |
insert_performance:
collections:
-
milvus:
wal_enable: true
collection_name: sift_1m_128_l2
ni_per: 1000
build_index: false
index_type: ivf_sq8
index_param:
nlist: 1024
```
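If these configmaps need to be created programmatically, one possible (illustrative) approach uses the kubernetes Python client; the configmap name, namespace, and suite file path below are taken from the examples above and are not required by the framework itself:

```
from kubernetes import client, config

# Illustrative only: create the client-side suite configmap shown above
# from a local suite yaml file.
config.load_kube_config()
with open("suites/2_insert_data.yaml") as f:
    suite_yaml = f.read()

body = client.V1ConfigMap(
    metadata=client.V1ObjectMeta(name="client-insert-batch-1000", namespace="qa"),
    data={"config.yaml": suite_yaml},
)
client.CoreV1Api().create_namespaced_config_map(namespace="qa", body=body)
```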
## Overview of the benchmark
### Components
- `main.py`
  The entry file: parses the input params and initializes the other components: `metric`, `env`, `runner`
- `metric`
  The test result can be used to analyze regressions or improvements of the Milvus system, so we upload the metrics of the test result when a test suite run finishes, and then use `redash` to make sense of our data
- `db`
  Currently we use `mongodb` to store the test results
- `env`
  The `env` component defines the server environment and environment management; the `env` instance corresponds to the run mode of the benchmark
  - `local`: Only defines the host and port for testing
  - `helm/docker`: Installs and uninstalls the server in the benchmark stage
- `runner`
  The actual executor of the benchmark: each test type defined in the test suite generates a corresponding runner instance, and there are three stages in `runner` (see the sketch after this list):
  - `extract_cases`: There are several test cases defined in each test suite yaml, and each case shares the same server environment and the same `prepare` stage, but the `metric` for each case is different, so we need to extract cases from the test suite before the cases run
  - `prepare`: Prepare the data and operations; for example, before running a search, the index needs to be created and the data needs to be loaded
  - `run_case`: Do the core operation and set the `metric` value
- `suites`: There are two ways to pass the content to be tested as input parameters:
- Test suite files under `suites` directory
- Test suite configmap name including `server_config_map` and `client_config_map` if using argo workflow
- `update.py`: When using an argo workflow as the benchmark pipeline, there are two steps in the workflow template: `install-milvus` and `client-test` (a sketch of the idea follows this list)
  - In the `install-milvus` stage, `update.py` is used to generate a new `values.yaml`, which is passed to the `helm install` operation
  - In the `client-test` stage, it runs `main.py` in the `local` run mode and receives the milvus host and port as cmd params
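As referenced under `runner` above, the following is a minimal sketch of the assumed three-stage runner interface; `SketchInsertRunner` is a hypothetical class used only for illustration, not the framework's actual base class.

```
import copy


class SketchInsertRunner:
    """Assumed three-stage runner interface: extract_cases, prepare, run_case."""
    name = "insert_performance"

    def __init__(self, env, metric):
        self.env = env          # server environment (host/port or helm release)
        self.metric = metric    # template metric shared by all cases

    def extract_cases(self, collection):
        # One suite entry may expand into several cases that share the same
        # server and prepare stage but report separate metrics.
        case_param = {"collection_name": collection["collection_name"],
                      "ni_per": collection.get("ni_per", 50000)}
        case_metric = copy.deepcopy(self.metric)
        return [case_param], [case_metric]

    def prepare(self, **case_param):
        # Create the collection / build the index / load data before running.
        print("prepare collection:", case_param["collection_name"])

    def run_case(self, case_metric, **case_param):
        # Core operation: insert in batches of `ni_per` and record the result.
        print("insert with batch size:", case_param["ni_per"])
        return {"ni_per": case_param["ni_per"]}
```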
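As referenced under `update.py` above, the sketch below shows only the idea of merging deploy params into `values.yaml` before `helm install`; the chart key names (`standalone.resources`, `cluster.enabled`) are assumptions, not a copy of the real `update.py`.

```
import yaml


def update_values(src_values_file="values.yaml", deploy_params_file="config.yaml"):
    # Illustrative only: read the chart values and the server configmap's
    # deploy params, apply them, and write values.yaml back for helm install.
    with open(src_values_file) as f:
        values = yaml.safe_load(f) or {}
    with open(deploy_params_file) as f:
        deploy = yaml.safe_load(f) or {}

    server = deploy.get("server", {})
    milvus = deploy.get("milvus", {})
    # Hypothetical mapping: an "8c16m" server_tag becomes resource limits.
    if server.get("server_tag") == "8c16m":
        values.setdefault("standalone", {})["resources"] = {
            "limits": {"cpu": "8", "memory": "16Gi"}
        }
    # Cluster vs standalone deployment flag (key name is an assumption).
    values["cluster"] = {"enabled": milvus.get("deploy_mode") == "cluster"}

    with open(src_values_file, "w") as f:
        yaml.safe_dump(values, f, default_flow_style=False)


update_values()
```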
### Conceptual overview
The following diagram shows the runtime execution graph of the benchmark (local mode based on an argo workflow):
<img src="asserts/uml.jpg" />
metadata:
name: benchmark
namespace: qa
uid: e8a51212-9b27-441d-b357-e73c63854ccf
resourceVersion: '63697833'
generation: 41
creationTimestamp: '2021-05-25T11:04:40Z'
labels:
workflows.argoproj.io/creator: system-serviceaccount-argo-argo-server
managedFields:
- manager: argo
operation: Update
apiVersion: argoproj.io/v1alpha1
fieldsType: FieldsV1
fieldsV1:
'f:metadata':
'f:labels':
.: {}
'f:workflows.argoproj.io/creator': {}
'f:spec':
.: {}
'f:arguments':
.: {}
'f:parameters': {}
'f:entrypoint': {}
'f:nodeSelector':
.: {}
'f:node-role.kubernetes.io/benchmark': {}
'f:onExit': {}
'f:serviceAccountName': {}
'f:templates': {}
'f:tolerations': {}
'f:volumes': {}
spec:
templates:
- name: benchmark-loop
inputs: {}
outputs: {}
metadata: {}
steps:
- - name: call-benchmark-test
template: benchmark
arguments:
parameters:
- name: server-instance
value: '{{workflow.name}}-{{item.instanceId}}'
- name: server-configmap
value: '{{item.server-configmap}}'
- name: client-configmap
value: '{{item.client-configmap}}'
withParam: '{{workflow.parameters.configmaps}}'
- name: uninstall-all
inputs: {}
outputs: {}
metadata: {}
steps:
- - name: uninstall-milvus
template: uninstall-milvus
arguments:
parameters:
- name: server-instance
value: '{{workflow.name}}-{{item.instanceId}}'
withParam: '{{workflow.parameters.configmaps}}'
- name: benchmark
inputs:
parameters:
- name: server-instance
- name: server-configmap
- name: client-configmap
outputs: {}
metadata: {}
steps:
- - name: install-milvus
template: install-milvus
arguments:
parameters:
- name: server-instance
value: '{{inputs.parameters.server-instance}}'
- name: server-configmap
value: '{{inputs.parameters.server-configmap}}'
- - name: client-test
template: client-test
arguments:
parameters:
- name: server-instance
value: '{{inputs.parameters.server-instance}}'
- name: server-configmap
value: '{{inputs.parameters.server-configmap}}'
- name: client-configmap
value: '{{inputs.parameters.client-configmap}}'
- name: uninstall-milvus
inputs:
parameters:
- name: server-instance
outputs: {}
metadata: {}
container:
name: ''
image: 'registry.zilliz.com/milvus/milvus-test-env:v0.5'
command:
- /bin/sh
- '-c'
args:
- ' helm uninstall -n qa-milvus {{inputs.parameters.server-instance}} && kubectl delete pvc -l app.kubernetes.io/instance={{inputs.parameters.server-instance}} -n qa-milvus '
resources: {}
volumeMounts:
- name: kube-config
mountPath: /root/.kube
- name: install-milvus
inputs:
parameters:
- name: server-instance
- name: server-configmap
artifacts:
- name: charts
path: /src/helm
git:
repo: 'git@github.com:milvus-io/milvus-helm.git'
revision: master
sshPrivateKeySecret:
name: github-key
key: ssh-private-key
- name: benchmark-src
path: /src/benchmark
git:
repo: 'git@github.com:zilliztech/milvus_benchmark.git'
revision: '{{workflow.parameters.test-client-branch}}'
sshPrivateKeySecret:
name: github-key
key: ssh-private-key
outputs: {}
metadata: {}
container:
name: ''
image: 'registry.zilliz.com/milvus/milvus-test-env:v0.5'
command:
- /bin/sh
- '-c'
args:
- ' cd /src/helm/charts/milvus && cp -r /src/benchmark/milvus_benchmark/* . && cp /configmap-server/config.yaml . && python update.py --src-values=values.yaml --deploy-params=config.yaml && cat values.yaml && helm install -n qa-milvus --set image.all.repository={{workflow.parameters.milvus-image-repository}} --set image.all.tag={{workflow.parameters.milvus-image-tag}} --set image.all.pullPolicy=Always --set etcd.persistence.enabled=false --set servicemonitor.enabled=true --wait --timeout 15m {{inputs.parameters.server-instance}} . && kubectl get pods -n qa-milvus -l app.kubernetes.io/instance={{inputs.parameters.server-instance}} '
resources: {}
volumeMounts:
- name: kube-config
readOnly: true
mountPath: /root/.kube
- name: benchmark-server-configmap
mountPath: /configmap-server
volumes:
- name: benchmark-server-configmap
configMap:
name: '{{inputs.parameters.server-configmap}}'
- name: client-test
inputs:
parameters:
- name: server-instance
- name: server-configmap
- name: client-configmap
artifacts:
- name: source
path: /src
git:
repo: 'git@github.com:zilliztech/milvus_benchmark.git'
revision: '{{workflow.parameters.test-client-branch}}'
sshPrivateKeySecret:
name: github-key
key: ssh-private-key
outputs: {}
metadata: {}
container:
name: ''
image: 'registry.zilliz.com/milvus/milvus-test-env:v0.5'
command:
- /bin/sh
- '-c'
args:
- ' cd /src && pip install -r requirements.txt -i https://pypi.doubanio.com/simple/ --trusted-host pypi.doubanio.com && pip install -i https://test.pypi.org/simple/ pymilvus=={{workflow.parameters.test-sdk-version}} && cd milvus_benchmark && export PYTHONPATH=/src && python main.py --host={{inputs.parameters.server-instance}}-milvus.qa-milvus.svc.cluster.local --local --suite=/configmap-client/config.yaml --server-config=/configmap-server/config.yaml'
resources:
limits:
cpu: '4'
memory: 4Gi
volumeMounts:
- name: kube-config
readOnly: true
mountPath: /root/.kube
- name: benchmark-server-configmap
mountPath: /configmap-server
- name: benchmark-client-configmap
mountPath: /configmap-client
- name: db-data-path
mountPath: /test
volumes:
- name: benchmark-server-configmap
configMap:
name: '{{inputs.parameters.server-configmap}}'
- name: benchmark-client-configmap
configMap:
name: '{{inputs.parameters.client-configmap}}'
- name: db-data-path
flexVolume:
driver: fstab/cifs
fsType: cifs
secretRef:
name: cifs-test-secret
options:
mountOptions: vers=1.0
networkPath: //172.16.70.249/test
activeDeadlineSeconds: 21600
entrypoint: benchmark-loop
arguments:
parameters:
- name: milvus-image-repository
value: harbor.zilliz.cc/dockerhub/milvusdb/milvus-dev
- name: milvus-image-tag
value: master-latest
- name: test-client-branch
value: master
- name: test-sdk-version
value: 2.0.0rc4.dev1
- name: configmaps
value: ' [ {"instanceId":"1", "server-configmap": "server-single-8c16m", "client-configmap": "client-acc-sift-ivf-flat" } ]'
serviceAccountName: qa-admin
volumes:
- name: kube-config
secret:
secretName: qa-admin-config
nodeSelector:
node-role.kubernetes.io/benchmark: ''
tolerations:
- key: node-role.kubernetes.io/benchmark
operator: Exists
effect: NoSchedule
onExit: uninstall-all
......@@ -6,11 +6,8 @@ from kubernetes import client, config
from kubernetes.client.rest import ApiException
from milvus_benchmark import config as cf
config.load_kube_config()
api_instance = client.CustomObjectsApi()
logger = logging.getLogger("milvus_benchmark.chaos.chaosOpt")
class ChaosOpt(object):
def __init__(self, kind, group=cf.DEFAULT_GROUP, version=cf.DEFAULT_VERSION, namespace=cf.CHAOS_NAMESPACE):
self.group = group
......@@ -25,6 +22,8 @@ class ChaosOpt(object):
# body = create_chaos_config(self.plural, self.metadata_name, spec_params)
# logger.info(body)
pretty = 'true'
config.load_kube_config()
api_instance = client.CustomObjectsApi()
try:
api_response = api_instance.create_namespaced_custom_object(self.group, self.version, self.namespace,
plural=self.plural, body=body, pretty=pretty)
......@@ -37,6 +36,8 @@ class ChaosOpt(object):
def delete_chaos_object(self, metadata_name):
print(metadata_name)
try:
config.load_kube_config()
api_instance = client.CustomObjectsApi()
data = api_instance.delete_namespaced_custom_object(self.group, self.version, self.namespace, self.plural,
metadata_name)
logger.info(data)
......@@ -46,6 +47,8 @@ class ChaosOpt(object):
def list_chaos_object(self):
try:
config.load_kube_config()
api_instance = client.CustomObjectsApi()
data = api_instance.list_namespaced_custom_object(self.group, self.version, self.namespace,
plural=self.plural)
# pprint(data)
......
......@@ -4,7 +4,6 @@ from operator import methodcaller
from kubernetes import client, config
from milvus_benchmark import config as cf
logger = logging.getLogger("milvus_benchmark.chaos.utils")
......
......@@ -145,10 +145,10 @@ class MilvusClient(object):
self._milvus.create_partition(collection_name, tag)
@time_wrapper
def insert(self, entities, collection_name=None):
def insert(self, entities, collection_name=None, timeout=None):
tmp_collection_name = self._collection_name if collection_name is None else collection_name
try:
insert_res = self._milvus.insert(tmp_collection_name, entities)
insert_res = self._milvus.insert(tmp_collection_name, entities, timeout=timeout)
return insert_res.primary_keys
except Exception as e:
logger.error(str(e))
......@@ -234,9 +234,9 @@ class MilvusClient(object):
# raise Exception("Error occured")
@time_wrapper
def flush(self,_async=False, collection_name=None):
def flush(self, _async=False, collection_name=None, timeout=None):
tmp_collection_name = self._collection_name if collection_name is None else collection_name
self._milvus.flush([tmp_collection_name], _async=_async)
self._milvus.flush([tmp_collection_name], _async=_async, timeout=timeout)
@time_wrapper
def compact(self, collection_name=None):
......@@ -246,11 +246,11 @@ class MilvusClient(object):
# only support "in" in expr
@time_wrapper
def get(self, ids, collection_name=None):
def get(self, ids, collection_name=None, timeout=None):
tmp_collection_name = self._collection_name if collection_name is None else collection_name
# res = self._milvus.get(tmp_collection_name, ids, output_fields=None, partition_names=None)
ids_expr = "id in %s" % (str(ids))
res = self._milvus.query(tmp_collection_name, ids_expr, output_fields=None, partition_names=None)
res = self._milvus.query(tmp_collection_name, ids_expr, output_fields=None, partition_names=None, timeout=timeout)
return res
@time_wrapper
......@@ -343,7 +343,7 @@ class MilvusClient(object):
ids.append(res.ids)
return ids
def query_rand(self, nq_max=100):
def query_rand(self, nq_max=100, timeout=None):
# for ivf search
dimension = 128
top_k = random.randint(1, 100)
......@@ -360,9 +360,9 @@ class MilvusClient(object):
"metric_type": utils.metric_type_trans(metric_type),
"params": search_param}
}}
self.query(vector_query)
self.query(vector_query, timeout=timeout)
def load_query_rand(self, nq_max=100):
def load_query_rand(self, nq_max=100, timeout=None):
# for ivf search
dimension = 128
top_k = random.randint(1, 100)
......@@ -379,7 +379,7 @@ class MilvusClient(object):
"metric_type": utils.metric_type_trans(metric_type),
"params": search_param}
}}
self.load_and_query(vector_query)
self.load_and_query(vector_query, timeout=timeout)
# TODO: need to check
def count(self, collection_name=None):
......
import os
import sys
import time
import argparse
import logging
import traceback
......@@ -45,16 +46,17 @@ def get_image_tag(image_version):
# back_scheduler.shutdown(wait=False)
def run_suite(run_type, suite, env_mode, env_params):
def run_suite(run_type, suite, env_mode, env_params, timeout=None):
try:
start_status = False
metric = api.Metric()
deploy_mode = env_params["deploy_mode"]
deploy_opology = env_params["deploy_opology"] if "deploy_opology" in env_params else None
env = get_env(env_mode, deploy_mode)
metric.set_run_id()
metric.set_mode(env_mode)
metric.env = Env()
metric.server = Server(version=config.SERVER_VERSION, mode=deploy_mode)
metric.server = Server(version=config.SERVER_VERSION, mode=deploy_mode, deploy_opology=deploy_opology)
logger.info(env_params)
if env_mode == "local":
metric.hardware = Hardware("")
......@@ -228,7 +230,8 @@ def main():
"host": args.host,
"port": args.port,
"deploy_mode": deploy_mode,
"server_tag": server_tag
"server_tag": server_tag,
"deploy_opology": deploy_params_dict
}
suite_file = args.suite
with open(suite_file) as f:
......@@ -242,8 +245,9 @@ def main():
# ensure there is only one case in suite
# suite = {"run_type": run_type, "run_params": collections[0]}
suite = collections[0]
timeout = suite["timeout"] if "timeout" in suite else None
env_mode = "local"
return run_suite(run_type, suite, env_mode, env_params)
return run_suite(run_type, suite, env_mode, env_params, timeout=timeout)
# job = back_scheduler.add_job(run_suite, args=[run_type, suite, env_mode, env_params], misfire_grace_time=36000)
# logger.info(job)
# logger.info(job.id)
......
......@@ -13,12 +13,13 @@ class Server:
}
"""
def __init__(self, version=None, mode=None, build_commit=None):
def __init__(self, version=None, mode=None, build_commit=None, deploy_opology=None):
self._version = '0.1'
self._type = 'server'
self.version = version
self.mode = mode
self.build_commit = build_commit
self.deploy_opology = deploy_opology
# self.md5 = md5
def json_md5(self):
......
from .insert import InsertRunner
from .insert import InsertRunner, BPInsertRunner
from .locust import LocustInsertRunner, LocustSearchRunner, LocustRandomRunner
from .search import SearchRunner, InsertSearchRunner
from .build import BuildRunner, InsertBuildRunner
......@@ -11,6 +11,7 @@ from .chaos import SimpleChaosRunner
def get_runner(name, env, metric):
return {
"insert_performance": InsertRunner(env, metric),
"bp_insert_performance": BPInsertRunner(env, metric),
"search_performance": SearchRunner(env, metric),
"insert_search_performance": InsertSearchRunner(env, metric),
"locust_insert_performance": LocustInsertRunner(env, metric),
......
......@@ -90,7 +90,7 @@ class AccuracyRunner(BaseRunner):
self.milvus.set_collection(collection_name)
if not self.milvus.exists_collection():
logger.info("collection not exist")
self.milvus.load_collection()
self.milvus.load_collection(timeout=600)
def run_case(self, case_metric, **case_param):
collection_size = case_param["collection_size"]
......
......@@ -116,4 +116,6 @@ class InsertGetRunner(GetRunner):
flush_time = round(time.time() - start_time, 2)
logger.debug({"collection count": self.milvus.count()})
logger.debug({"flush_time": flush_time})
self.milvus.load_collection()
logger.debug("Start load collection")
self.milvus.load_collection(timeout=1200)
logger.debug("Load collection end")
......@@ -123,3 +123,121 @@ class InsertRunner(BaseRunner):
build_time = round(time.time()-start_time, 2)
tmp_result.update({"flush_time": flush_time, "build_time": build_time})
return tmp_result
class BPInsertRunner(BaseRunner):
"""run insert"""
name = "bp_insert_performance"
def __init__(self, env, metric):
super(BPInsertRunner, self).__init__(env, metric)
def extract_cases(self, collection):
collection_name = collection["collection_name"] if "collection_name" in collection else None
(data_type, collection_size, dimension, metric_type) = parser.collection_parser(collection_name)
ni_pers = collection["ni_pers"]
build_index = collection["build_index"] if "build_index" in collection else False
index_info = None
vector_type = utils.get_vector_type(data_type)
other_fields = collection["other_fields"] if "other_fields" in collection else None
index_field_name = None
index_type = None
index_param = None
if build_index is True:
index_type = collection["index_type"]
index_param = collection["index_param"]
index_info = {
"index_type": index_type,
"index_param": index_param
}
index_field_name = utils.get_default_field_name(vector_type)
flush = True
if "flush" in collection and collection["flush"] == "no":
flush = False
case_metrics = list()
case_params = list()
for ni_per in ni_pers:
collection_info = {
"dimension": dimension,
"metric_type": metric_type,
"dataset_name": collection_name,
"collection_size": collection_size,
"other_fields": other_fields,
"ni_per": ni_per
}
self.init_metric(self.name, collection_info, index_info, None)
case_metric = copy.deepcopy(self.metric)
case_metric.set_case_metric_type()
case_metrics.append(case_metric)
case_param = {
"collection_name": collection_name,
"data_type": data_type,
"dimension": dimension,
"collection_size": collection_size,
"ni_per": ni_per,
"metric_type": metric_type,
"vector_type": vector_type,
"other_fields": other_fields,
"build_index": build_index,
"flush_after_insert": flush,
"index_field_name": index_field_name,
"index_type": index_type,
"index_param": index_param,
}
case_params.append(case_param)
return case_params, case_metrics
def prepare(self, **case_param):
collection_name = case_param["collection_name"]
dimension = case_param["dimension"]
vector_type = case_param["vector_type"]
other_fields = case_param["other_fields"]
index_field_name = case_param["index_field_name"]
build_index = case_param["build_index"]
self.milvus.set_collection(collection_name)
if self.milvus.exists_collection():
logger.debug("Start drop collection")
self.milvus.drop()
time.sleep(utils.DELETE_INTERVAL_TIME)
self.milvus.create_collection(dimension, data_type=vector_type,
other_fields=other_fields)
# TODO: update fields in collection_info
# fields = self.get_fields(self.milvus, collection_name)
# collection_info = {
# "dimension": dimension,
# "metric_type": metric_type,
# "dataset_name": collection_name,
# "fields": fields
# }
if build_index is True:
if case_param["index_type"]:
self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"])
logger.debug(self.milvus.describe_index(index_field_name))
else:
build_index = False
logger.warning("Please specify the index_type")
# TODO: error handler
def run_case(self, case_metric, **case_param):
collection_name = case_param["collection_name"]
dimension = case_param["dimension"]
index_field_name = case_param["index_field_name"]
build_index = case_param["build_index"]
# TODO:
tmp_result = self.insert(self.milvus, collection_name, case_param["data_type"], dimension, case_param["collection_size"], case_param["ni_per"])
flush_time = 0.0
build_time = 0.0
if case_param["flush_after_insert"] is True:
start_time = time.time()
self.milvus.flush()
flush_time = round(time.time()-start_time, 2)
logger.debug(self.milvus.count())
if build_index is True:
logger.debug("Start build index for last file")
start_time = time.time()
self.milvus.create_index(index_field_name, case_param["index_type"], case_param["metric_type"], index_param=case_param["index_param"])
build_time = round(time.time()-start_time, 2)
tmp_result.update({"flush_time": flush_time, "build_time": build_time})
return tmp_result
......@@ -269,13 +269,14 @@ class LocustSearchRunner(LocustRunner):
load_start_time = time.time()
self.milvus.load_collection()
logger.debug({"load_time": round(time.time()-load_start_time, 2)})
search_param = None
for op in case_param["task"]["types"]:
if op["type"] == "query":
search_param = op["params"]["search_param"]
break
logger.info("index_field_name: {}".format(index_field_name))
self.milvus.warm_query(index_field_name, search_param, metric_type, times=2)
# search_param = None
# for op in case_param["task"]["types"]:
# if op["type"] == "query":
# search_param = op["params"]["search_param"]
# break
# logger.info("index_field_name: {}".format(index_field_name))
# TODO: enable warm query
# self.milvus.warm_query(index_field_name, search_param, metric_type, times=2)
class LocustRandomRunner(LocustRunner):
......
......@@ -28,20 +28,20 @@ class Tasks(TaskSet):
if isinstance(filter, dict) and "term" in filter:
filter_query.append(eval(filter["term"]))
# logger.debug(filter_query)
self.client.query(vector_query, filter_query=filter_query, log=False, timeout=120)
self.client.query(vector_query, filter_query=filter_query, log=False, timeout=30)
@task
def flush(self):
self.client.flush(log=False)
self.client.flush(log=False, timeout=30)
@task
def load(self):
self.client.load_collection()
self.client.load_collection(timeout=30)
@task
def release(self):
self.client.release_collection()
self.client.load_collection()
self.client.load_collection(timeout=30)
# @task
# def release_index(self):
......
......@@ -96,7 +96,7 @@ class SearchRunner(BaseRunner):
return False
logger.debug(self.milvus.count())
logger.info("Start load collection")
self.milvus.load_collection()
self.milvus.load_collection(timeout=1200)
# TODO: enable warm query
# self.milvus.warm_query(index_field_name, search_params[0], times=2)
......@@ -262,7 +262,7 @@ class InsertSearchRunner(BaseRunner):
logger.info(self.milvus.count())
logger.info("Start load collection")
load_start_time = time.time()
self.milvus.load_collection()
self.milvus.load_collection(timeout=1200)
logger.debug({"load_time": round(time.time()-load_start_time, 2)})
def run_case(self, case_metric, **case_param):
......@@ -284,7 +284,7 @@ class InsertSearchRunner(BaseRunner):
tmp_result = {"insert": self.insert_result, "build_time": self.build_time, "search_time": min_query_time, "avc_search_time": avg_query_time}
#
# logger.info("Start load collection")
# self.milvus.load_collection()
# self.milvus.load_collection(timeout=1200)
# logger.info("Release load collection")
# self.milvus.release_collection()
return tmp_result
\ No newline at end of file
import math
from locust import User, TaskSet, task, constant
from locust import LoadTestShape
class StepLoadShape(LoadTestShape):
"""
A step load shape
Keyword arguments:
step_time -- Time between steps
step_load -- User increase amount at each step
spawn_rate -- Users to stop/start per second at every step
time_limit -- Time limit in seconds
"""
step_time = 30
step_load = 10
spawn_rate = 10
time_limit = 600
def tick(self):
run_time = self.get_run_time()
if run_time > self.time_limit:
return None
current_step = math.floor(run_time / self.step_time) + 1
return (current_step * self.step_load, self.spawn_rate)
class UserTasks(TaskSet):
@task
def get_root(self):
print("in usertasks")
class WebsiteUser(User):
wait_time = constant(0.5)
tasks = [UserTasks]
shape = StepLoadShape
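A quick, illustrative sanity check of the load shape defined above (assuming it runs in the same module, with locust installed): it evaluates `tick()` at a few run times by overriding `get_run_time` on the instance.

```
# User count grows by step_load every step_time seconds; the test stops
# (tick returns None) once the run time exceeds time_limit.
shape = StepLoadShape()
for t in (0, 29, 30, 95, 601):
    shape.get_run_time = lambda t=t: t
    print(t, shape.tick())
# expected: (10, 10), (10, 10), (20, 10), (40, 10), None
```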
......@@ -113,7 +113,7 @@ def update_values(src_values_file, deploy_params_file):
logging.info("TODO: Need to schedule pod on GPU server")
logging.debug("Add tolerations into standalone server")
values_dict['standalone']['tolerations'] = perf_tolerations
# values_dict['minio']['tolerations'] = perf_tolerations
values_dict['minio']['tolerations'] = perf_tolerations
values_dict['etcd']['tolerations'] = perf_tolerations
else:
# TODO: mem limits on distributed mode
......@@ -146,7 +146,7 @@ def update_values(src_values_file, deploy_params_file):
values_dict['indexNode']['tolerations'] = perf_tolerations
values_dict['dataNode']['tolerations'] = perf_tolerations
values_dict['etcd']['tolerations'] = perf_tolerations
# values_dict['minio']['tolerations'] = perf_tolerations
values_dict['minio']['tolerations'] = perf_tolerations
values_dict['pulsarStandalone']['tolerations'] = perf_tolerations
# TODO: for distributed deployment
# values_dict['pulsar']['autoRecovery']['tolerations'] = perf_tolerations
......
# pymilvus==0.2.14
# pymilvus-distributed>=0.0.61
--extra-index-url https://test.pypi.org/simple/
pymilvus==2.0.0rc2.dev12
# for local install
# --extra-index-url https://test.pypi.org/simple/
# pymilvus==2.0.0rc3.dev8
grpcio==1.37.1
grpcio-testing==1.37.1
grpcio-tools==1.37.1
scipy==1.3.1
scikit-learn==0.19.1
......