Commit 2967e58a authored by J jinhai

MS-279 Merge from Branch-0.3.1


Former-commit-id: 3b7a93e4c647b3234e885500e2e96301a863fbc1
......@@ -17,3 +17,5 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-1 - Add CHANGELOG.md
- MS-161 - Add CI / CD Module to Milvus Project
- MS-202 - Add Milvus Jenkins project email notification
- MS-215 - Add Milvus cluster CI/CD groovy file
- MS-277 - Update CUDA Version to V10.1
try {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
if (currentBuild.result == 'ABORTED') {
throw new hudson.AbortException("Dev Test Aborted !")
} else if (currentBuild.result == 'FAILURE') {
error("Dev Test Failure !")
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
}
} catch (exc) {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
}
throw exc
}
try {
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
}
} catch (exc) {
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
}
throw exc
}
try {
sh 'helm init --client-only --skip-refresh --stable-repo-url https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
sh 'helm repo add milvus https://registry.zilliz.com/chartrepo/milvus'
sh 'helm repo update'
dir ("milvus-helm") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus-helm.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
dir ("milvus/milvus-cluster") {
sh "helm install --wait --timeout 300 --set roServers.image.tag=${DOCKER_VERSION} --set woServers.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP -f ci/values.yaml --name ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster --namespace milvus-cluster --version 0.3.1 . "
}
}
/*
timeout(time: 2, unit: 'MINUTES') {
waitUntil {
def result = sh script: "nc -z -w 3 ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster-milvus-cluster-proxy.milvus-cluster.svc.cluster.local 19530", returnStatus: true
return !result
}
}
*/
} catch (exc) {
echo 'Helm deployment failed!'
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
throw exc
}
timeout(time: 10, unit: 'MINUTES') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements_cluster.txt'
sh "pytest . --alluredir=cluster_test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster-milvus-cluster-proxy.milvus-cluster.svc.cluster.local"
}
} catch (exc) {
echo 'Milvus Cluster Test Failed !'
throw exc
}
}
......@@ -2,10 +2,15 @@ try {
sh 'helm init --client-only --skip-refresh --stable-repo-url https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
sh 'helm repo add milvus https://registry.zilliz.com/chartrepo/milvus'
sh 'helm repo update'
sh "helm install --set engine.image.repository=registry.zilliz.com/${PROJECT_NAME}/engine --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.JOB_NAME}-${env.BUILD_NUMBER} --version 0.3.0 milvus/milvus-gpu"
dir ("milvus-helm") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus-helm.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
dir ("milvus/milvus-gpu") {
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.JOB_NAME}-${env.BUILD_NUMBER} -f ci/values.yaml --namespace milvus-1 --version 0.3.1 ."
}
}
} catch (exc) {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo 'Helm deployment failed!'
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
throw exc
}
container('milvus-testframework') {
timeout(time: 10, unit: 'MINUTES') {
gitlabCommitStatus(name: 'Dev Test') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git"]]])
sh 'python3 -m pip install -r requirements.txt'
sh "pytest . --alluredir=test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.kube-opt.svc.cluster.local"
}
} catch (exc) {
updateGitlabCommitStatus name: 'Dev Test', state: 'failed'
currentBuild.result = 'FAILURE'
echo 'Milvus Test Failed !'
timeout(time: 20, unit: 'MINUTES') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements.txt'
sh "pytest . --alluredir=test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-1.svc.cluster.local"
}
// mysql database backend test
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
if (!fileExists('milvus-helm')) {
dir ("milvus-helm") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus-helm.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
}
}
dir ("milvus-helm") {
dir ("milvus/milvus-gpu") {
sh "helm install --wait --timeout 300 --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.JOB_NAME}-${env.BUILD_NUMBER} -f ci/db_backend/mysql_values.yaml --namespace milvus-2 --version 0.3.1 ."
}
}
dir ("${PROJECT_NAME}_test") {
sh "pytest . --alluredir=test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.milvus-2.svc.cluster.local"
}
} catch (exc) {
echo 'Milvus Test Failed !'
throw exc
}
}
container('milvus-build-env') {
timeout(time: 20, unit: 'MINUTES') {
timeout(time: 30, unit: 'MINUTES') {
gitlabCommitStatus(name: 'Build Engine') {
dir ("milvus_engine") {
try {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git"]]])
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
dir ("cpp") {
sh "git config --global user.email \"test@zilliz.com\""
sh "git config --global user.name \"test\""
......
container('milvus-build-env') {
timeout(time: 20, unit: 'MINUTES') {
timeout(time: 30, unit: 'MINUTES') {
gitlabCommitStatus(name: 'Build Engine') {
dir ("milvus_engine") {
try {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git"]]])
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
dir ("cpp") {
sh "git config --global user.email \"test@zilliz.com\""
sh "git config --global user.name \"test\""
sh "./build.sh -t ${params.BUILD_TYPE}"
}
} catch (exc) {
......
......@@ -3,7 +3,7 @@ container('publish-docker') {
gitlabCommitStatus(name: 'Publish Engine Docker') {
try {
dir ("${PROJECT_NAME}_build") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:build/milvus_build.git"]]])
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:build/milvus_build.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
dir ("docker/deploy/ubuntu16.04/free_version") {
sh "curl -O -u anonymous: ftp://192.168.1.126/data/${PROJECT_NAME}/engine/${JOB_NAME}-${BUILD_ID}/${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
sh "tar zxvf ${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
......@@ -12,7 +12,10 @@ container('publish-docker') {
def customImage = docker.build("${PROJECT_NAME}/engine:${DOCKER_VERSION}")
customImage.push()
}
echo "Docker Pull Command: docker pull registry.zilliz.com/${PROJECT_NAME}/engine:${DOCKER_VERSION}"
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
updateGitlabCommitStatus name: 'Publish Engine Docker', state: 'success'
echo "Docker Pull Command: docker pull registry.zilliz.com/${PROJECT_NAME}/engine:${DOCKER_VERSION}"
}
} catch (exc) {
updateGitlabCommitStatus name: 'Publish Engine Docker', state: 'canceled'
throw exc
......@@ -29,3 +32,4 @@ container('publish-docker') {
}
}
}
timeout(time: 5, unit: 'MINUTES') {
dir ("${PROJECT_NAME}_test") {
if (fileExists('cluster_test_out')) {
def fileTransfer = load "${env.WORKSPACE}/ci/function/file_transfer.groovy"
fileTransfer.FileTransfer("cluster_test_out/", "${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}", 'nas storage')
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
echo "Milvus Dev Test Out Viewer \"ftp://192.168.1.126/data/${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}\""
}
} else {
error("Milvus Dev Test Out directory don't exists!")
}
}
}
container('milvus-testframework') {
timeout(time: 5, unit: 'MINUTES') {
dir ("${PROJECT_NAME}_test") {
gitlabCommitStatus(name: 'Upload Dev Test Out') {
if (fileExists('test_out')) {
try {
def fileTransfer = load "${env.WORKSPACE}/ci/function/file_transfer.groovy"
fileTransfer.FileTransfer("test_out/", "${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}", 'nas storage')
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
echo "Milvus Dev Test Out Viewer \"ftp://192.168.1.126/data/${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}\""
}
} catch (hudson.AbortException ae) {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'canceled'
currentBuild.result = 'ABORTED'
} catch (exc) {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'failed'
currentBuild.result = 'FAILURE'
}
} else {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'failed'
echo "Milvus Dev Test Out directory don't exists!"
}
timeout(time: 5, unit: 'MINUTES') {
dir ("${PROJECT_NAME}_test") {
if (fileExists('test_out')) {
def fileTransfer = load "${env.WORKSPACE}/ci/function/file_transfer.groovy"
fileTransfer.FileTransfer("test_out/", "${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}", 'nas storage')
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
echo "Milvus Dev Test Out Viewer \"ftp://192.168.1.126/data/${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}\""
}
} else {
error("Milvus Dev Test Out directory don't exists!")
}
}
}
......@@ -35,7 +35,7 @@ pipeline {
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-build-env'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.10'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.12'
ttyEnabled true
command 'cat'
}
......@@ -130,97 +130,187 @@ spec:
}
stage("Deploy to Development") {
stages {
stage("Deploy to Dev") {
parallel {
stage("Single Node") {
agent {
kubernetes {
label 'jenkins-slave'
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage('Deploy') {
stage("Deploy to Dev") {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
}
}
}
}
stage("Dev Test") {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
}
}
}
}
}
stage ("Cleanup Dev") {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Single Node CI/CD success !"
}
}
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
echo "Milvus Single Node CI/CD aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
echo "Milvus Single Node CI/CD failure !"
}
}
}
}
stage("Dev Test") {
stage("Cluster") {
agent {
kubernetes {
label 'test'
label 'dev-test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage('Test') {
stage("Deploy to Dev") {
steps {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
}
}
}
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
stage("Dev Test") {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_cluster_test_out.groovy"
}
}
}
}
}
}
stages {
stage('Cleanup') {
stage ("Cleanup Dev") {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Cluster CI/CD success !"
}
}
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
echo "Milvus Cluster CI/CD aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
echo "Milvus Cluster CI/CD failure !"
}
}
}
......@@ -234,6 +324,7 @@ spec:
post {
always {
script {
<<<<<<< HEAD
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
......@@ -244,6 +335,20 @@ spec:
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
=======
if (env.gitlabAfter != null) {
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
body: '$DEFAULT_CONTENT',
recipientProviders: [
[$class: 'DevelopersRecipientProvider'],
[$class: 'RequesterRecipientProvider']
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
}
>>>>>>> branch-0.3.1
}
}
}
......@@ -270,3 +375,4 @@ spec:
}
}
}
......@@ -35,7 +35,7 @@ pipeline {
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-build-env'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.10'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.12'
ttyEnabled true
command 'cat'
}
......@@ -130,97 +130,187 @@ spec:
}
stage("Deploy to Development") {
stages {
stage("Deploy to Dev") {
parallel {
stage("Single Node") {
agent {
kubernetes {
label 'jenkins-slave'
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage('Deploy') {
stage("Deploy to Dev") {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
}
}
}
}
stage("Dev Test") {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
}
}
}
}
}
stage ("Cleanup Dev") {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Single Node CI/CD success !"
}
}
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
echo "Milvus Single Node CI/CD aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
echo "Milvus Single Node CI/CD failure !"
}
}
}
}
stage("Dev Test") {
stage("Cluster") {
agent {
kubernetes {
label 'test'
label 'dev-test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage('Test') {
stage("Deploy to Dev") {
steps {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
}
}
}
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
stage("Dev Test") {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_cluster_test_out.groovy"
}
}
}
}
}
}
stages {
stage('Cleanup') {
stage ("Cleanup Dev") {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Cluster CI/CD success !"
}
}
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
echo "Milvus Cluster CI/CD aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
echo "Milvus Cluster CI/CD failure !"
}
}
}
......@@ -232,6 +322,24 @@ spec:
}
post {
always {
script {
if (env.gitlabAfter != null) {
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
body: '$DEFAULT_CONTENT',
recipientProviders: [
[$class: 'DevelopersRecipientProvider'],
[$class: 'RequesterRecipientProvider']
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
}
}
}
}
success {
script {
updateGitlabCommitStatus name: 'CI/CD', state: 'success'
......@@ -254,3 +362,4 @@ spec:
}
}
}
......@@ -35,7 +35,11 @@ pipeline {
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-build-env'
<<<<<<< HEAD
image 'registry.zilliz.com/milvus/milvus-build-env:v0.10'
=======
image 'registry.zilliz.com/milvus/milvus-build-env:v0.12'
>>>>>>> branch-0.3.1
ttyEnabled true
command 'cat'
}
......@@ -130,6 +134,7 @@ spec:
}
stage("Deploy to Development") {
<<<<<<< HEAD
stages {
stage("Deploy to Dev") {
agent {
......@@ -144,12 +149,78 @@ spec:
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
=======
parallel {
stage("Single Node") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
}
}
}
}
stage("Dev Test") {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
}
}
}
}
}
stage ("Cleanup Dev") {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
>>>>>>> branch-0.3.1
}
}
}
}
}
post {
<<<<<<< HEAD
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
......@@ -161,11 +232,34 @@ spec:
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
=======
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Single Node CI/CD success !"
}
}
aborted {
script {
echo "Milvus Single Node CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Single Node CI/CD failure !"
>>>>>>> branch-0.3.1
}
}
}
}
<<<<<<< HEAD
stage("Dev Test") {
agent {
kubernetes {
......@@ -204,12 +298,76 @@ spec:
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
=======
stage("Cluster") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
}
}
}
}
stage("Dev Test") {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_cluster_test_out.groovy"
}
}
}
}
}
stage ("Cleanup Dev") {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
>>>>>>> branch-0.3.1
}
}
}
}
}
post {
<<<<<<< HEAD
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
......@@ -221,6 +379,28 @@ spec:
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
=======
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Cluster CI/CD success !"
}
}
aborted {
script {
echo "Milvus Cluster CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Cluster CI/CD failure !"
>>>>>>> branch-0.3.1
}
}
}
......@@ -232,6 +412,25 @@ spec:
}
post {
<<<<<<< HEAD
=======
always {
script {
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
body: '$DEFAULT_CONTENT',
recipientProviders: [
[$class: 'DevelopersRecipientProvider'],
[$class: 'RequesterRecipientProvider']
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
}
}
}
>>>>>>> branch-0.3.1
success {
script {
updateGitlabCommitStatus name: 'CI/CD', state: 'success'
......@@ -254,3 +453,7 @@ spec:
}
}
}
<<<<<<< HEAD
=======
>>>>>>> branch-0.3.1
......@@ -9,18 +9,40 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-148 - Disable cleanup if mode is read only
- MS-149 - Fixed searching only one index file issue in distributed mode
- MS-153 - fix c_str error when connecting to MySQL
- MS-157 - fix changelog
- MS-190 - use env variable to switch mem manager and fix cmake
- MS-153 - Fix c_str error when connecting to MySQL
- MS-157 - Fix changelog
- MS-190 - Use env variable to switch mem manager and fix cmake
- MS-217 - Fix SQ8 row count bug
- MS-224 - Return AlreadyExist status in MySQLMetaImpl::CreateTable if table already exists
- MS-232 - Add MySQLMetaImpl::UpdateTableFilesToIndex and set maximum_memory to default if config value = 0
- MS-233 - Remove mem manager log
- MS-230 - Change parameter name: Maximum_memory to insert_buffer_size
- MS-234 - Some cases cause the background merge thread to stop
- MS-235 - Some test cases fail randomly
- MS-236 - Add MySQLMetaImpl::HasNonIndexFiles
- MS-257 - Update bzip2 download url
## Improvement
- MS-156 - Add unittest for merge result functions
- MS-152 - Delete assert in MySQLMetaImpl and change MySQLConnectionPool impl
- MS-204 - Support multi db_path
- MS-206 - Support SQ8 index type
- MS-208 - Add buildindex interface for C++ SDK
- MS-212 - Support Inner product metric type
- MS-241 - Build Faiss with MKL if using Intel CPU; else build with OpenBlas
- MS-242 - Clean up cmake and change MAKE_BUILD_ARGS to be user defined variable
- MS-245 - Improve search result transfer performance
- MS-248 - Support AddVector/SearchVector profiling
- MS-256 - Add more cache config
- MS-260 - Refine log
- MS-249 - Check machine hardware during initialization
- MS-261 - Update faiss version to 1.5.3 and add BUILD_FAISS_WITH_MKL as an option
- MS-278 - add IndexStatsHelper
## New Feature
- MS-137 - Integrate knowhere
- MS-180 - Add new mem manager
- MS-195 - Add nlist and use_blas_threshold conf
- MS-137 - Integrate knowhere
## Task
......@@ -75,6 +97,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-144 - Add nprobe config
- MS-147 - Enable IVF
- MS-130 - Add prometheus_test
## Task
- MS-74 - Change README.md in cpp
- MS-88 - Add support for arm architecture
......
......@@ -116,6 +116,11 @@ set(MILVUS_ENGINE_SRC ${PROJECT_SOURCE_DIR}/src)
add_compile_definitions(PROFILER=${PROFILER})
message("MILVUS_ENABLE_PROFILING = ${MILVUS_ENABLE_PROFILING}")
if (MILVUS_ENABLE_PROFILING STREQUAL "ON")
ADD_DEFINITIONS(-DMILVUS_ENABLE_PROFILING)
endif()
include_directories(${MILVUS_ENGINE_INCLUDE})
include_directories(${MILVUS_ENGINE_SRC})
......
### Compilation
#### Step 1: install necessary tools
centos7 :
yum install gfortran qt4 flex bison mysql-devel mysql
ubuntu16.04 :
sudo apt-get install gfortran qt4-qmake flex bison libmysqlclient-dev mysql-client
cd scripts && sudo ./requirements.sh
If `libmysqlclient_r.so` does not exist after installing MySQL Development Files, you need to create a symbolic link:
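A minimal sketch of such a link, assuming the client library sits under /usr/lib/x86_64-linux-gnu (adjust both paths to your MySQL installation; this path is not taken from the commit):
sudo ln -s /usr/lib/x86_64-linux-gnu/libmysqlclient.so /usr/lib/x86_64-linux-gnu/libmysqlclient_r.so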
......
......@@ -7,8 +7,10 @@ INSTALL_PREFIX=$(pwd)/milvus
MAKE_CLEAN="OFF"
BUILD_COVERAGE="OFF"
DB_PATH="/opt/milvus"
PROFILING="OFF"
BUILD_FAISS_WITH_MKL="OFF"
while getopts "p:d:t:uhlrc" arg
while getopts "p:d:t:uhlrcgm" arg
do
case $arg in
t)
......@@ -36,6 +38,12 @@ do
c)
BUILD_COVERAGE="ON"
;;
g)
PROFILING="ON"
;;
m)
BUILD_FAISS_WITH_MKL="ON"
;;
h) # help
echo "
......@@ -47,9 +55,11 @@ parameter:
-l: build license version(default: OFF)
-r: remove previous build directory(default: OFF)
-c: code coverage(default: OFF)
-g: profiling(default: OFF)
-m: build faiss with MKL(default: OFF)
usage:
./build.sh -t \${BUILD_TYPE} [-u] [-h] [-g] [-r] [-c]
./build.sh -t \${BUILD_TYPE} [-u] [-h] [-g] [-r] [-c] [-m]
"
exit 0
;;
......@@ -77,6 +87,8 @@ if [[ ${MAKE_CLEAN} == "ON" ]]; then
-DCMAKE_LICENSE_CHECK=${LICENSE_CHECK} \
-DBUILD_COVERAGE=${BUILD_COVERAGE} \
-DMILVUS_DB_PATH=${DB_PATH} \
-DMILVUS_ENABLE_PROFILING=${PROFILING} \
-DBUILD_FAISS_WITH_MKL=${BUILD_FAISS_WITH_MKL} \
$@ ../"
echo ${CMAKE_CMD}
......
......@@ -57,8 +57,6 @@ define_option(MILVUS_VERBOSE_THIRDPARTY_BUILD
define_option(MILVUS_WITH_ARROW "Build with ARROW" OFF)
define_option(MILVUS_BOOST_USE_SHARED "Rely on boost shared libraries where relevant" OFF)
define_option(MILVUS_BOOST_VENDORED "Use vendored Boost instead of existing Boost. \
Note that this requires linking Boost statically" ON)
......@@ -110,6 +108,11 @@ define_option(MILVUS_WITH_ZSTD "Build with zstd compression" ${MILVUS_WITH_ZSTD_
define_option(MILVUS_WITH_AWS "Build with AWS SDK" ON)
if (MILVUS_ENABLE_PROFILING STREQUAL "ON")
define_option(MILVUS_WITH_LIBUNWIND "Build with libunwind" ON)
define_option(MILVUS_WITH_GPERFTOOLS "Build with gperftools" ON)
endif()
#----------------------------------------------------------------------
if(MSVC)
set_option_category("MSVC")
......
This diff is collapsed.
......@@ -6,7 +6,7 @@
TO_STANDARD_OUTPUT = false
SUBSECOND_PRECISION = 3
PERFORMANCE_TRACKING = false
MAX_LOG_FILE_SIZE = 2097152 ## Throw log files away after 2MB
MAX_LOG_FILE_SIZE = 209715200 ## Throw log files away after 200MB
* DEBUG:
FILENAME = "@MILVUS_DB_PATH@/logs/milvus-%datetime{%H:%m}-debug.log"
ENABLED = true
......
......@@ -6,6 +6,9 @@ server_config:
db_config:
db_path: @MILVUS_DB_PATH@ # milvus data storage path
db_slave_path: # secondary data storage path, split by semicolon
parallel_reduce: false # use multi-threads to reduce topk result
# URI format: dialect://username:password@host:port/database
# All parts except dialect are optional, but you MUST include the delimiters
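# For a MySQL backend the URI might look like the line below; host, port and credentials are placeholders, not values from this commit:
# mysql://user:password@127.0.0.1:3306/milvus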
......@@ -13,10 +16,10 @@ db_config:
db_backend_url: sqlite://:@:/
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB
archive_days_threshold: 30 # files older than x days will be archived, unit: day
maximum_memory: 4 # maximum memory allowed, default: 4, unit: GB, should be at least 1 GB.
# the sum of maximum_memory and cpu_cache_capacity should be less than total memory
archive_disk_threshold: 0 # trigger archive action if storage size exceeds this value, 0 means no limit, unit: GB
archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day
insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB.
# the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory, unit: GB
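# illustrative arithmetic (not from the original file): with the defaults insert_buffer_size: 4 and cpu_cache_capacity: 16, the host needs more than 20 GB of memory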
metric_config:
is_startup: off # if monitoring start: on, off
......@@ -33,6 +36,11 @@ license_config: # license configure
cache_config: # cache configure
cpu_cache_capacity: 16 # how much memory is used as cache, unit: GB, range: 0 ~ less than total memory
cache_free_percent: 0.85 # old data will be erased from cache when cache is full, this value specifies how much memory should be kept, range: greater than zero ~ 1.0
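# illustrative arithmetic (not from the original file): with cpu_cache_capacity: 16 and cache_free_percent: 0.85, eviction frees items until usage drops to roughly 16 * 0.85 = 13.6 GB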
insert_cache_immediately: false # inserted data will be loaded into cache immediately for hot queries
engine_config:
nprobe: 10
\ No newline at end of file
nprobe: 10
nlist: 16384
use_blas_threshold: 20
metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP
......@@ -13,6 +13,27 @@ DIR_LCOV_OUTPUT="lcov_out"
DIR_GCNO="cmake_build"
DIR_UNITTEST="milvus/bin"
MYSQL_USER_NAME=root
MYSQL_PASSWORD=Fantast1c
MYSQL_HOST='192.168.1.194'
MYSQL_PORT='3306'
MYSQL_DB_NAME=milvus_`date +%s%N`
function mysql_exc()
{
cmd=$1
mysql -h${MYSQL_HOST} -u${MYSQL_USER_NAME} -p${MYSQL_PASSWORD} -e "${cmd}"
if [ $? -ne 0 ]; then
echo "mysql $cmd run failed"
fi
}
mysql_exc "CREATE DATABASE IF NOT EXISTS ${MYSQL_DB_NAME};"
mysql_exc "GRANT ALL PRIVILEGES ON ${MYSQL_DB_NAME}.* TO '${MYSQL_USER_NAME}'@'%';"
mysql_exc "FLUSH PRIVILEGES;"
mysql_exc "USE ${MYSQL_DB_NAME};"
MYSQL_USER_NAME=root
MYSQL_PASSWORD=Fantast1c
......
#!/usr/bin/env bash
wget -P /tmp https://apt.repos.intel.com/intel-gpg-keys/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB
apt-key add /tmp/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list'
apt -y update && apt-get -y install intel-mkl-gnu-2019.4-243 intel-mkl-core-2019.4-243
#sh -c 'echo export LD_LIBRARY_PATH=/opt/intel/compilers_and_libraries_2019.4.243/linux/mkl/lib/intel64:\$LD_LIBRARY_PATH > /etc/profile.d/mkl.sh'
#source /etc/profile
......@@ -49,7 +49,6 @@ set(engine_files
${db_files}
${db_scheduler_files}
${wrapper_files}
# metrics/Metrics.cpp
${metrics_files}
${knowhere_files}
)
......@@ -93,10 +92,26 @@ set(third_party_libs
cublas
mysqlpp
${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so
cudart
)
if (MEGASEARCH_WITH_ARROW STREQUAL "ON")
set(third_party_libs ${third_party_libs} arrow)
endif()
endif()
if(${BUILD_FAISS_WITH_MKL} STREQUAL "ON")
set(third_party_libs ${third_party_libs}
${MKL_LIBS})
else()
set(third_party_libs ${third_party_libs}
lapack
openblas)
endif()
if (MILVUS_ENABLE_PROFILING STREQUAL "ON")
set(third_party_libs ${third_party_libs}
gperftools
libunwind)
endif()
if (GPU_VERSION STREQUAL "ON")
link_directories("${CUDA_TOOLKIT_ROOT_DIR}/lib64")
......@@ -147,6 +162,8 @@ if (ENABLE_LICENSE STREQUAL "ON")
endif ()
set(metrics_lib
easyloggingpp
yaml-cpp
prometheus-cpp-push
prometheus-cpp-pull
prometheus-cpp-core
......@@ -213,6 +230,6 @@ install(FILES
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3.2.4
DESTINATION lib) #need to copy libmysqlpp.so
DESTINATION lib)
#add_subdirectory(sdk)
add_subdirectory(sdk)
......@@ -13,9 +13,12 @@ namespace zilliz {
namespace milvus {
namespace cache {
constexpr double DEFAULT_THRESHHOLD_PERCENT = 0.85;
Cache::Cache(int64_t capacity, uint64_t cache_max_count)
: usage_(0),
capacity_(capacity),
freemem_percent_(DEFAULT_THRESHHOLD_PERCENT),
lru_(cache_max_count) {
// AGENT_LOG_DEBUG << "Construct Cache with capacity " << std::to_string(mem_capacity)
}
......@@ -64,15 +67,14 @@ void Cache::insert(const std::string& key, const DataObjPtr& data_ptr) {
usage_ += data_ptr->size();
}
// AGENT_LOG_DEBUG << "Insert into LRU(" << (capacity_ > 0 ? std::to_string(usage_ * 100 / capacity_) : "Nan")
// << "%, +" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "):"
// << " " << key;
SERVER_LOG_DEBUG << "Insert " << key << " size:" << data_ptr->size()
<< " bytes into cache, usage: " << usage_ << " bytes";
}
if (usage_ > capacity_) {
// AGENT_LOG_TRACE << "Current usage " << usage_
// << " exceeds cache capacity " << capacity_
// << ", start free memory";
SERVER_LOG_DEBUG << "Current usage " << usage_
<< " exceeds cache capacity " << capacity_
<< ", start free memory";
free_memory();
}
}
......@@ -86,12 +88,9 @@ void Cache::erase(const std::string& key) {
const CacheObjPtr& obj_ptr = lru_.get(key);
const DataObjPtr& data_ptr = obj_ptr->data_;
usage_ -= data_ptr->size();
// AGENT_LOG_DEBUG << "Erase from LRU(" << (capacity_ > 0 ? std::to_string(usage_*100/capacity_) : "Nan")
// << "%, -" << data_ptr->size() << ", " << usage_ << ", " << lru_.size() << "): "
// << (data_ptr->flags().get_flag(DataObjAttr::kPinned) ? "Pinned " : "")
// << (data_ptr->flags().get_flag(DataObjAttr::kValid) ? "Valid " : "")
// << "(ref:" << obj_ptr->ref_ << ") "
// << key;
SERVER_LOG_DEBUG << "Erase " << key << " size: " << data_ptr->size();
lru_.erase(key);
}
......@@ -99,7 +98,7 @@ void Cache::clear() {
std::lock_guard<std::mutex> lock(mutex_);
lru_.clear();
usage_ = 0;
// AGENT_LOG_DEBUG << "Clear LRU !";
SERVER_LOG_DEBUG << "Clear cache !";
}
#if 0 /* caiyd 20190221, need more testing before enable */
......@@ -162,8 +161,11 @@ void Cache::restore_from_file(const std::string& key, const CacheObjPtr& obj_ptr
void Cache::free_memory() {
if (usage_ <= capacity_) return;
int64_t threshhold = capacity_ * THRESHHOLD_PERCENT;
int64_t threshhold = capacity_ * freemem_percent_;
int64_t delta_size = usage_ - threshhold;
if(delta_size <= 0) {
delta_size = 1;//ensure at least one item erased
}
std::set<std::string> key_array;
int64_t released_size = 0;
......@@ -183,7 +185,7 @@ void Cache::free_memory() {
}
}
// AGENT_LOG_DEBUG << "to be released memory size: " << released_size;
SERVER_LOG_DEBUG << "to be released memory size: " << released_size;
for (auto& key : key_array) {
erase(key);
......@@ -193,28 +195,15 @@ void Cache::free_memory() {
}
void Cache::print() {
int64_t still_pinned_count = 0;
int64_t total_pinned_size = 0;
int64_t total_valid_empty_size = 0;
size_t cache_count = 0;
{
std::lock_guard<std::mutex> lock(mutex_);
for (auto it = lru_.begin(); it != lru_.end(); ++it) {
auto& obj_ptr = it->second;
const auto& data_ptr = obj_ptr->data_;
if (data_ptr != nullptr) {
total_pinned_size += data_ptr->size();
++still_pinned_count;
} else {
total_valid_empty_size += data_ptr->size();
}
}
cache_count = lru_.size();
}
SERVER_LOG_DEBUG << "[Still Pinned count]: " << still_pinned_count;
SERVER_LOG_DEBUG << "[Pinned Memory total size(byte)]: " << total_pinned_size;
SERVER_LOG_DEBUG << "[valid_empty total size(byte)]: " << total_valid_empty_size;
SERVER_LOG_DEBUG << "[free memory size(byte)]: " << capacity_ - total_pinned_size - total_valid_empty_size;
SERVER_LOG_DEBUG << "[Cache item count]: " << cache_count;
SERVER_LOG_DEBUG << "[Cache usage]: " << usage_ << " bytes";
SERVER_LOG_DEBUG << "[Cache capacity]: " << capacity_ << " bytes";
}
} // cache
......
......@@ -18,7 +18,6 @@ namespace milvus {
namespace cache {
const std::string SWAP_DIR = ".CACHE";
const float THRESHHOLD_PERCENT = 0.75;
class Cache {
private:
......@@ -45,6 +44,9 @@ public:
int64_t capacity() const { return capacity_; } //unit: BYTE
void set_capacity(int64_t capacity); //unit: BYTE
double freemem_percent() const { return freemem_percent_; };
void set_freemem_percent(double percent) { freemem_percent_ = percent; }
size_t size() const;
bool exists(const std::string& key);
DataObjPtr get(const std::string& key);
......@@ -57,6 +59,7 @@ public:
private:
int64_t usage_;
int64_t capacity_;
double freemem_percent_;
LRU<std::string, CacheObjPtr> lru_;
mutable std::mutex mutex_;
......
......@@ -4,6 +4,7 @@
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include "utils/Log.h"
#include "CacheMgr.h"
#include "metrics/Metrics.h"
......@@ -20,6 +21,7 @@ CacheMgr::~CacheMgr() {
uint64_t CacheMgr::ItemCount() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
......@@ -28,6 +30,7 @@ uint64_t CacheMgr::ItemCount() const {
bool CacheMgr::ItemExists(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return false;
}
......@@ -36,6 +39,7 @@ bool CacheMgr::ItemExists(const std::string& key) {
DataObjPtr CacheMgr::GetItem(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return nullptr;
}
server::Metrics::GetInstance().CacheAccessTotalIncrement();
......@@ -53,6 +57,7 @@ engine::Index_ptr CacheMgr::GetIndex(const std::string& key) {
void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
......@@ -62,6 +67,7 @@ void CacheMgr::InsertItem(const std::string& key, const DataObjPtr& data) {
void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
......@@ -72,6 +78,7 @@ void CacheMgr::InsertItem(const std::string& key, const engine::Index_ptr& index
void CacheMgr::EraseItem(const std::string& key) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
......@@ -81,6 +88,7 @@ void CacheMgr::EraseItem(const std::string& key) {
void CacheMgr::PrintInfo() {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
......@@ -89,6 +97,7 @@ void CacheMgr::PrintInfo() {
void CacheMgr::ClearCache() {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
......@@ -97,6 +106,7 @@ void CacheMgr::ClearCache() {
int64_t CacheMgr::CacheUsage() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
......@@ -105,6 +115,7 @@ int64_t CacheMgr::CacheUsage() const {
int64_t CacheMgr::CacheCapacity() const {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return 0;
}
......@@ -113,6 +124,7 @@ int64_t CacheMgr::CacheCapacity() const {
void CacheMgr::SetCapacity(int64_t capacity) {
if(cache_ == nullptr) {
SERVER_LOG_ERROR << "Cache doesn't exist";
return;
}
cache_->set_capacity(capacity);
......
......@@ -6,16 +6,29 @@
#include "CpuCacheMgr.h"
#include "server/ServerConfig.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace cache {
namespace {
constexpr int64_t unit = 1024 * 1024 * 1024;
}
CpuCacheMgr::CpuCacheMgr() {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
int64_t cap = config.GetInt64Value(server::CONFIG_CPU_CACHE_CAPACITY, 16);
cap *= 1024*1024*1024;
cap *= unit;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
double free_percent = config.GetDoubleValue(server::CACHE_FREE_PERCENT, 0.85);
if(free_percent > 0.0 && free_percent <= 1.0) {
cache_->set_freemem_percent(free_percent);
} else {
SERVER_LOG_ERROR << "Invalid cache_free_percent: " << free_percent <<
", defaultly set to " << cache_->freemem_percent();
}
}
}
......
......@@ -20,6 +20,11 @@ public:
: index_(index)
{}
DataObj(const engine::Index_ptr& index, int64_t size)
: index_(index),
size_(size)
{}
engine::Index_ptr data() { return index_; }
const engine::Index_ptr& data() const { return index_; }
......@@ -28,11 +33,16 @@ public:
return 0;
}
if(size_ > 0) {
return size_;
}
return index_->Count() * index_->Dimension() * sizeof(float);
}
private:
engine::Index_ptr index_ = nullptr;
int64_t size_ = 0;
};
using DataObjPtr = std::shared_ptr<DataObj>;
......
......@@ -11,10 +11,14 @@ namespace zilliz {
namespace milvus {
namespace cache {
namespace {
constexpr int64_t unit = 1024 * 1024 * 1024;
}
GpuCacheMgr::GpuCacheMgr() {
server::ConfigNode& config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_CACHE);
int64_t cap = config.GetInt64Value(server::CONFIG_GPU_CACHE_CAPACITY, 1);
cap *= 1024*1024*1024;
cap *= unit;
cache_ = std::make_shared<Cache>(cap, 1UL<<32);
}
......
......@@ -94,7 +94,7 @@ double
ConfigNode::GetDoubleValue(const std::string &param_key, double default_val) const {
std::string val = GetValue(param_key);
if (!val.empty()) {
return std::strtold(val.c_str(), nullptr);
return std::strtod(val.c_str(), nullptr);
} else {
return default_val;
}
......
......@@ -9,14 +9,14 @@ namespace zilliz {
namespace milvus {
namespace engine {
const size_t K = 1024UL;
const size_t M = K * K;
const size_t G = K * M;
const size_t T = K * G;
constexpr size_t K = 1024UL;
constexpr size_t M = K * K;
constexpr size_t G = K * M;
constexpr size_t T = K * G;
const size_t MAX_TABLE_FILE_MEM = 128 * M;
constexpr size_t MAX_TABLE_FILE_MEM = 128 * M;
const int VECTOR_TYPE_SIZE = sizeof(float);
constexpr int VECTOR_TYPE_SIZE = sizeof(float);
} // namespace engine
} // namespace milvus
......
......@@ -12,11 +12,10 @@ namespace zilliz {
namespace milvus {
namespace engine {
DB::~DB() {}
DB::~DB() = default;
void DB::Open(const Options& options, DB** dbptr) {
*dbptr = DBFactory::Build(options);
return;
}
} // namespace engine
......
......@@ -44,13 +44,15 @@ public:
virtual Status Size(uint64_t& result) = 0;
virtual Status BuildIndex(const std::string& table_id) = 0;
virtual Status DropAll() = 0;
DB() = default;
DB(const DB&) = delete;
DB& operator=(const DB&) = delete;
virtual ~DB();
virtual ~DB() = 0;
}; // DB
} // namespace engine
......
......@@ -89,8 +89,11 @@ DBImpl::DBImpl(const Options& options)
meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
if (options.mode != Options::MODE::READ_ONLY) {
ENGINE_LOG_TRACE << "StartTimerTasks";
StartTimerTasks();
}
}
Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
......@@ -99,6 +102,7 @@ Status DBImpl::CreateTable(meta::TableSchema& table_schema) {
Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& dates) {
//dates partly delete files of the table but currently we don't support
ENGINE_LOG_DEBUG << "Prepare to delete table " << table_id;
mem_mgr_->EraseMemVector(table_id); //not allow insert
meta_ptr_->DeleteTable(table_id); //soft delete table
......@@ -129,6 +133,7 @@ Status DBImpl::GetTableRowCount(const std::string& table_id, uint64_t& row_count
Status DBImpl::InsertVectors(const std::string& table_id_,
uint64_t n, const float* vectors, IDNumbers& vector_ids_) {
ENGINE_LOG_DEBUG << "Insert " << n << " vectors to cache";
auto start_time = METRICS_NOW_TIME;
Status status = mem_mgr_->InsertVectors(table_id_, n, vectors, vector_ids_);
......@@ -137,6 +142,8 @@ Status DBImpl::InsertVectors(const std::string& table_id_,
// std::chrono::microseconds time_span = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
// double average_time = double(time_span.count()) / n;
ENGINE_LOG_DEBUG << "Insert vectors to cache finished";
CollectInsertMetrics(total_time, n, status.ok());
return status;
......@@ -157,6 +164,8 @@ Status DBImpl::Query(const std::string &table_id, uint64_t k, uint64_t nq,
Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
const float* vectors, const meta::DatesT& dates, QueryResults& results) {
ENGINE_LOG_DEBUG << "Query by vectors";
//get all table files from table
meta::DatePartionedTableFilesSchema files;
auto status = meta_ptr_->FilesToSearch(table_id, dates, files);
......@@ -169,12 +178,17 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
}
}
return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
return status;
}
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
uint64_t k, uint64_t nq, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
ENGINE_LOG_DEBUG << "Query by file ids";
//get specified files
std::vector<size_t> ids;
for (auto &id : file_ids) {
......@@ -194,15 +208,19 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
return Status::Error("Invalid file id");
}
return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
status = QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
return status;
}
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
uint64_t k, uint64_t nq, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
server::TimeRecorder rc("");
//step 1: get files to search
ENGINE_LOG_DEBUG << "Search DateT Size=" << files.size();
ENGINE_LOG_DEBUG << "Engine query begin, index file count:" << files.size() << " date range count:" << dates.size();
SearchContextPtr context = std::make_shared<SearchContext>(k, nq, vectors);
for (auto &file : files) {
TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
......@@ -215,8 +233,31 @@ Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSch
context->WaitResult();
//step 3: construct results
//step 3: print time cost information
double load_cost = context->LoadCost();
double search_cost = context->SearchCost();
double reduce_cost = context->ReduceCost();
std::string load_info = server::TimeRecorder::GetTimeSpanStr(load_cost);
std::string search_info = server::TimeRecorder::GetTimeSpanStr(search_cost);
std::string reduce_info = server::TimeRecorder::GetTimeSpanStr(reduce_cost);
if(search_cost > 0.0 || reduce_cost > 0.0) {
double total_cost = load_cost + search_cost + reduce_cost;
double load_percent = load_cost/total_cost;
double search_percent = search_cost/total_cost;
double reduce_percent = reduce_cost/total_cost;
ENGINE_LOG_DEBUG << "Engine load index totally cost:" << load_info << " percent: " << load_percent*100 << "%";
ENGINE_LOG_DEBUG << "Engine search index totally cost:" << search_info << " percent: " << search_percent*100 << "%";
ENGINE_LOG_DEBUG << "Engine reduce topk totally cost:" << reduce_info << " percent: " << reduce_percent*100 << "%";
} else {
ENGINE_LOG_DEBUG << "Engine load cost:" << load_info
<< " search cost: " << search_info
<< " reduce cost: " << reduce_info;
}
//step 4: construct results
results = context->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
return Status::OK();
}
......@@ -236,6 +277,8 @@ void DBImpl::BackgroundTimerTask() {
for(auto& iter : index_thread_results_) {
iter.wait();
}
ENGINE_LOG_DEBUG << "DB background thread exit";
break;
}
......@@ -254,6 +297,8 @@ void DBImpl::StartMetricTask() {
return;
}
ENGINE_LOG_TRACE << "Start metric task";
server::Metrics::GetInstance().KeepingAliveCounterIncrement(METRIC_ACTION_INTERVAL);
int64_t cache_usage = cache::CpuCacheMgr::GetInstance()->CacheUsage();
int64_t cache_total = cache::CpuCacheMgr::GetInstance()->CacheCapacity();
......@@ -266,17 +311,14 @@ void DBImpl::StartMetricTask() {
server::Metrics::GetInstance().GPUPercentGaugeSet();
server::Metrics::GetInstance().GPUMemoryUsageGaugeSet();
server::Metrics::GetInstance().OctetsSet();
ENGINE_LOG_TRACE << "Metric task finished";
}
void DBImpl::StartCompactionTask() {
// static int count = 0;
// count++;
// std::cout << "StartCompactionTask: " << count << std::endl;
// std::cout << "c: " << count++ << std::endl;
static uint64_t compact_clock_tick = 0;
compact_clock_tick++;
if(compact_clock_tick%COMPACT_ACTION_INTERVAL != 0) {
// std::cout << "c r: " << count++ << std::endl;
return;
}
......@@ -287,6 +329,10 @@ void DBImpl::StartCompactionTask() {
compact_table_ids_.insert(id);
}
if(!temp_table_ids.empty()) {
SERVER_LOG_DEBUG << "Insert cache serialized";
}
//has compaction finished?
if(!compact_thread_results_.empty()) {
std::chrono::milliseconds span(10);
......@@ -305,13 +351,15 @@ void DBImpl::StartCompactionTask() {
Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
const meta::TableFilesSchema& files) {
ENGINE_LOG_DEBUG << "Merge files for table" << table_id;
meta::TableFileSchema table_file;
table_file.table_id_ = table_id;
table_file.date_ = date;
Status status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_INFO << status.ToString() << std::endl;
ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
return status;
}
......@@ -350,10 +398,11 @@ Status DBImpl::MergeFiles(const std::string& table_id, const meta::DateT& date,
updated.push_back(table_file);
status = meta_ptr_->UpdateTableFiles(updated);
ENGINE_LOG_DEBUG << "New merged file " << table_file.file_id_ <<
" of size=" << index->PhysicalSize()/(1024*1024) << " M";
" of size " << index->PhysicalSize() << " bytes";
//current disable this line to avoid memory
//index->Cache();
if(options_.insert_cache_immediately_) {
index->Cache();
}
return status;
}
......@@ -362,6 +411,7 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
meta::DatePartionedTableFilesSchema raw_files;
auto status = meta_ptr_->FilesToMerge(table_id, raw_files);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to get merge files for table: " << table_id;
return status;
}
......@@ -383,16 +433,14 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
// static int b_count = 0;
// b_count++;
// std::cout << "BackgroundCompaction: " << b_count << std::endl;
ENGINE_LOG_TRACE << " Background compaction thread start";
Status status;
for (auto& table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
return;
continue;//let other table get chance to merge
}
}
......@@ -401,15 +449,16 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
int ttl = 1;
if (options_.mode == Options::MODE::CLUSTER) {
ttl = meta::D_SEC;
// ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds.";
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
ENGINE_LOG_TRACE << " Background compaction thread exit";
}
void DBImpl::StartBuildIndexTask() {
void DBImpl::StartBuildIndexTask(bool force) {
static uint64_t index_clock_tick = 0;
index_clock_tick++;
if(index_clock_tick%INDEX_ACTION_INTERVAL != 0) {
if(!force && (index_clock_tick%INDEX_ACTION_INTERVAL != 0)) {
return;
}
......@@ -428,22 +477,42 @@ void DBImpl::StartBuildIndexTask() {
}
}
Status DBImpl::BuildIndex(const std::string& table_id) {
bool has = false;
meta_ptr_->HasNonIndexFiles(table_id, has);
int times = 1;
while (has) {
ENGINE_LOG_DEBUG << "Non index files detected! Will build index " << times;
meta_ptr_->UpdateTableFilesToIndex(table_id);
/* StartBuildIndexTask(true); */
std::this_thread::sleep_for(std::chrono::milliseconds(std::min(10*1000, times*100)));
meta_ptr_->HasNonIndexFiles(table_id, has);
times++;
}
return Status::OK();
/* return BuildIndexByTable(table_id); */
}
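// A minimal sketch of the backoff used by the wait loop above: the sleep grows by
// 100 ms per polling round and is capped at 10 seconds.
//   std::min(10*1000, 1*100)   -> 100 ms
//   std::min(10*1000, 50*100)  -> 5000 ms
//   std::min(10*1000, 200*100) -> 10000 ms (cap reached)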
Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
ExecutionEnginePtr to_index = EngineFactory::Build(file.dimension_, file.location_, (EngineType)file.engine_type_);
if(to_index == nullptr) {
ENGINE_LOG_ERROR << "Invalid engine type";
return Status::Error("Invalid engine type");
}
try {
//step 1: load index
to_index->Load();
to_index->Load(options_.insert_cache_immediately_);
//step 2: create table file
meta::TableFileSchema table_file;
table_file.table_id_ = file.table_id_;
table_file.date_ = file.date_;
table_file.file_type_ = meta::TableFileSchema::INDEX; //for multi-db-path, distribute index files evenly to each path
Status status = meta_ptr_->CreateTableFile(table_file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Failed to create table: " << status.ToString();
return status;
}
......@@ -476,25 +545,49 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
meta_ptr_->UpdateTableFiles(update_files);
ENGINE_LOG_DEBUG << "New index file " << table_file.file_id_ << " of size "
<< index->PhysicalSize()/(1024*1024) << " M"
<< index->PhysicalSize() << " bytes"
<< " from file " << to_remove.file_id_;
//currently disabled to avoid excessive memory usage
//index->Cache();
if(options_.insert_cache_immediately_) {
index->Cache();
}
} catch (std::exception& ex) {
return Status::Error("Build index encounter exception", ex.what());
std::string msg = "Build index encounter exception" + std::string(ex.what());
ENGINE_LOG_ERROR << msg;
return Status::Error(msg);
}
return Status::OK();
}
Status DBImpl::BuildIndexByTable(const std::string& table_id) {
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
status = BuildIndex(file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
return status;
}
ENGINE_LOG_DEBUG << "Sync building index for " << file.id_ << " passed";
}
return status;
}
void DBImpl::BackgroundBuildIndex() {
ENGINE_LOG_TRACE << " Background build index thread start";
std::unique_lock<std::mutex> lock(build_index_mutex_);
meta::TableFilesSchema to_index_files;
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
/* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
status = BuildIndex(file);
if (!status.ok()) {
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
......@@ -505,7 +598,8 @@ void DBImpl::BackgroundBuildIndex() {
break;
}
}
/* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
ENGINE_LOG_TRACE << " Background build index thread exit";
}
Status DBImpl::DropAll() {
......
......@@ -82,6 +82,8 @@ class DBImpl : public DB {
Status Size(uint64_t &result) override;
Status BuildIndex(const std::string& table_id) override;
~DBImpl() override;
private:
......@@ -107,9 +109,11 @@ class DBImpl : public DB {
Status BackgroundMergeFiles(const std::string &table_id);
void BackgroundCompaction(std::set<std::string> table_ids);
void StartBuildIndexTask();
void StartBuildIndexTask(bool force=false);
void BackgroundBuildIndex();
Status
BuildIndexByTable(const std::string& table_id);
Status
BuildIndex(const meta::TableFileSchema &);
......@@ -130,6 +134,8 @@ class DBImpl : public DB {
server::ThreadPool index_thread_pool_;
std::list<std::future<void>> index_thread_results_;
std::mutex build_index_mutex_;
}; // DBImpl
......
......@@ -83,26 +83,6 @@ using ConnectorT = decltype(StoragePrototype(""));
static std::unique_ptr<ConnectorT> ConnectorPtr;
using ConditionT = decltype(c(&TableFileSchema::id_) == 1UL);
std::string DBMetaImpl::GetTablePath(const std::string &table_id) {
return options_.path + "/tables/" + table_id;
}
std::string DBMetaImpl::GetTableDatePartitionPath(const std::string &table_id, DateT &date) {
std::stringstream ss;
ss << GetTablePath(table_id) << "/" << date;
return ss.str();
}
void DBMetaImpl::GetTableFilePath(TableFileSchema &group_file) {
if (group_file.date_ == EmptyDate) {
group_file.date_ = Meta::GetDate();
}
std::stringstream ss;
ss << GetTableDatePartitionPath(group_file.table_id_, group_file.date_)
<< "/" << group_file.file_id_;
group_file.location_ = ss.str();
}
Status DBMetaImpl::NextTableId(std::string &table_id) {
std::stringstream ss;
SimpleIDGenerator g;
......@@ -196,7 +176,8 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
if(TableSchema::TO_DELETE == std::get<0>(table[0])) {
return Status::Error("Table already exists and it is in delete state, please wait a second");
} else {
return Status::OK();//table already exists, no error
// Change from no error to already exist.
return Status::AlreadyExist("Table already exists");
}
}
}
......@@ -212,15 +193,7 @@ Status DBMetaImpl::CreateTable(TableSchema &table_schema) {
return Status::DBTransactionError("Add Table Error");
}
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
if (!boost::filesystem::is_directory(table_path)) {
auto ret = boost::filesystem::create_directories(table_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
return utils::CreateTablePath(options_, table_schema.table_id_);
} catch (std::exception &e) {
return HandleException("Encounter exception when create table", e);
......@@ -306,9 +279,6 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
return Status::NotFound("Table " + table_schema.table_id_ + " not found");
}
auto table_path = GetTablePath(table_schema.table_id_);
table_schema.location_ = table_path;
} catch (std::exception &e) {
return HandleException("Encounter exception when describe table", e);
}
......@@ -316,6 +286,30 @@ Status DBMetaImpl::DescribeTable(TableSchema &table_schema) {
return Status::OK();
}
Status DBMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
has = false;
try {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_),
where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
or
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
or
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX)
and c(&TableFileSchema::table_id_) == table_id
));
if (selected.size() >= 1) {
has = true;
} else {
has = false;
}
} catch (std::exception &e) {
return HandleException("Encounter exception when check non index files", e);
}
return Status::OK();
}
Status DBMetaImpl::HasTable(const std::string &table_id, bool &has_or_not) {
has_or_not = false;
......@@ -388,21 +382,11 @@ Status DBMetaImpl::CreateTableFile(TableFileSchema &file_schema) {
file_schema.created_on_ = utils::GetMicroSecTimeStamp();
file_schema.updated_time_ = file_schema.created_on_;
file_schema.engine_type_ = table_schema.engine_type_;
ENGINE_LOG_DEBUG << "CreateTableFile EngineTypee: " << table_schema.engine_type_;
GetTableFilePath(file_schema);
auto id = ConnectorPtr->insert(file_schema);
file_schema.id_ = id;
auto partition_path = GetTableDatePartitionPath(file_schema.table_id_, file_schema.date_);
if (!boost::filesystem::is_directory(partition_path)) {
auto ret = boost::filesystem::create_directory(partition_path);
if (!ret) {
ENGINE_LOG_ERROR << "Create directory " << partition_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
}
return utils::CreateTableFilePath(options_, file_schema);
} catch (std::exception& ex) {
return HandleException("Encounter exception when create table file", ex);
......@@ -439,7 +423,7 @@ Status DBMetaImpl::FilesToIndex(TableFilesSchema &files) {
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto groupItr = groups.find(table_file.table_id_);
if (groupItr == groups.end()) {
TableSchema table_schema;
......@@ -502,7 +486,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
......@@ -544,7 +528,7 @@ Status DBMetaImpl::FilesToSearch(const std::string &table_id,
table_file.date_ = std::get<5>(file);
table_file.engine_type_ = std::get<6>(file);
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
......@@ -594,7 +578,7 @@ Status DBMetaImpl::FilesToMerge(const std::string &table_id,
table_file.size_ = std::get<4>(file);
table_file.date_ = std::get<5>(file);
table_file.dimension_ = table_schema.dimension_;
GetTableFilePath(table_file);
utils::GetTableFilePath(options_, table_file);
auto dateItr = files.find(table_file.date_);
if (dateItr == files.end()) {
files[table_file.date_] = TableFilesSchema();
......@@ -640,7 +624,7 @@ Status DBMetaImpl::GetTableFiles(const std::string& table_id,
file_schema.date_ = std::get<4>(file);
file_schema.engine_type_ = std::get<5>(file);
file_schema.dimension_ = table_schema.dimension_;
GetTableFilePath(file_schema);
utils::GetTableFilePath(options_, file_schema);
table_files.emplace_back(file_schema);
}
......@@ -692,16 +676,22 @@ Status DBMetaImpl::Archive() {
Status DBMetaImpl::Size(uint64_t &result) {
result = 0;
try {
auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size_)),
where(
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
auto files = ConnectorPtr->select(columns(&TableFileSchema::size_,
&TableFileSchema::file_type_,
&TableFileSchema::engine_type_),
where(
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
for (auto &sub_query : selected) {
if (!std::get<0>(sub_query)) {
continue;
for (auto &file : files) {
auto file_size = std::get<0>(file);
auto file_type = std::get<1>(file);
auto engine_type = std::get<2>(file);
if(file_type == (int)TableFileSchema::INDEX && engine_type == (int)EngineType::FAISS_IVFSQ8) {
result += (uint64_t)file_size/4; //hard-coded: an SQ8 index is roughly 1/4 the size of the raw float data
} else {
result += (uint64_t)file_size;
}
result += (uint64_t) (*std::get<0>(sub_query));
}
} catch (std::exception &e) {
return HandleException("Encounter exception when calculte db size", e);
......@@ -792,6 +782,23 @@ Status DBMetaImpl::UpdateTableFile(TableFileSchema &file_schema) {
return Status::OK();
}
Status DBMetaImpl::UpdateTableFilesToIndex(const std::string& table_id) {
try {
ConnectorPtr->update_all(
set(
c(&TableFileSchema::file_type_) = (int) TableFileSchema::TO_INDEX
),
where(
c(&TableFileSchema::table_id_) == table_id and
c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
));
} catch (std::exception &e) {
return HandleException("Encounter exception when update table files to to_index", e);
}
return Status::OK();
}
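// Roughly the SQL generated by the update_all above, assuming the schema is mapped
// to a TableFiles table (the actual table name comes from the storage prototype):
//   UPDATE TableFiles SET file_type = <TO_INDEX> WHERE table_id = ? AND file_type = <RAW>;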
Status DBMetaImpl::UpdateTableFiles(TableFilesSchema &files) {
try {
MetricCollector metric;
......@@ -856,10 +863,9 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
table_file.table_id_ = std::get<1>(file);
table_file.file_id_ = std::get<2>(file);
table_file.date_ = std::get<3>(file);
GetTableFilePath(table_file);
ENGINE_LOG_DEBUG << "Removing deleted id =" << table_file.id_ << " location = " << table_file.location_ << std::endl;
boost::filesystem::remove(table_file.location_);
utils::DeleteTableFilePath(options_, table_file);
ENGINE_LOG_DEBUG << "Removing file id:" << table_file.id_ << " location:" << table_file.location_;
ConnectorPtr->remove<TableFileSchema>(table_file.id_);
}
......@@ -883,10 +889,7 @@ Status DBMetaImpl::CleanUpFilesWithTTL(uint16_t seconds) {
auto commited = ConnectorPtr->transaction([&]() mutable {
for (auto &table : tables) {
auto table_path = GetTablePath(std::get<1>(table));
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
boost::filesystem::remove_all(table_path);
utils::DeleteTablePath(options_, std::get<1>(table));
ConnectorPtr->remove<TableSchema>(std::get<0>(table));
}
......
......@@ -8,67 +8,89 @@
#include "Meta.h"
#include "Options.h"
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
auto StoragePrototype(const std::string& path);
auto StoragePrototype(const std::string &path);
class DBMetaImpl : public Meta {
public:
DBMetaImpl(const DBMetaOptions& options_);
public:
explicit DBMetaImpl(const DBMetaOptions &options_);
Status
CreateTable(TableSchema &table_schema) override;
Status
DescribeTable(TableSchema &group_info_) override;
Status
HasTable(const std::string &table_id, bool &has_or_not) override;
Status
AllTables(std::vector<TableSchema> &table_schema_array) override;
Status
DeleteTable(const std::string &table_id) override;
Status
DeleteTableFiles(const std::string &table_id) override;
Status
CreateTableFile(TableFileSchema &file_schema) override;
virtual Status CreateTable(TableSchema& table_schema) override;
virtual Status DescribeTable(TableSchema& group_info_) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) override;
Status
DropPartitionsByDates(const std::string &table_id, const DatesT &dates) override;
virtual Status DeleteTable(const std::string& table_id) override;
virtual Status DeleteTableFiles(const std::string& table_id) override;
Status
GetTableFiles(const std::string &table_id, const std::vector<size_t> &ids, TableFilesSchema &table_files) override;
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) override;
Status
HasNonIndexFiles(const std::string &table_id, bool &has) override;
virtual Status GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) override;
Status
UpdateTableFilesToIndex(const std::string &table_id) override;
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
Status
UpdateTableFile(TableFileSchema &file_schema) override;
virtual Status UpdateTableFiles(TableFilesSchema& files) override;
Status
UpdateTableFiles(TableFilesSchema &files) override;
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedTableFilesSchema& files) override;
Status
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) override;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) override;
Status
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) override;
virtual Status FilesToIndex(TableFilesSchema&) override;
Status
FilesToIndex(TableFilesSchema &) override;
virtual Status Archive() override;
Status
Archive() override;
virtual Status Size(uint64_t& result) override;
Status
Size(uint64_t &result) override;
virtual Status CleanUp() override;
Status
CleanUp() override;
virtual Status CleanUpFilesWithTTL(uint16_t seconds) override;
Status
CleanUpFilesWithTTL(uint16_t seconds) override;
virtual Status DropAll() override;
Status
DropAll() override;
virtual Status Count(const std::string& table_id, uint64_t& result) override;
Status Count(const std::string &table_id, uint64_t &result) override;
virtual ~DBMetaImpl();
~DBMetaImpl() override;
private:
Status NextFileId(std::string& file_id);
Status NextTableId(std::string& table_id);
private:
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
Status DiscardFiles(long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize();
const DBMetaOptions options_;
......
......@@ -23,13 +23,19 @@ EngineFactory::Build(uint16_t dimension,
switch (type) {
case EngineType::FAISS_IDMAP: {
execution_engine_ptr =
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, "IDMap", "IDMap,Flat"));
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, BUILD_INDEX_TYPE_IDMAP, "IDMap,Flat"));
break;
}
case EngineType::FAISS_IVFFLAT_GPU: {
execution_engine_ptr =
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, "IVF", "IDMap,Flat"));
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, BUILD_INDEX_TYPE_IVF, "IDMap,Flat"));
break;
}
case EngineType::FAISS_IVFSQ8: {
execution_engine_ptr =
ExecutionEnginePtr(new FaissExecutionEngine(dimension, location, BUILD_INDEX_TYPE_IVFSQ8, "IDMap,Flat"));
break;
}
......
......@@ -17,9 +17,9 @@ namespace engine {
enum class EngineType {
INVALID = 0,
FAISS_IDMAP = 1,
FAISS_IVFFLAT_GPU,
FAISS_IVFFLAT_CPU,
SPTAG_KDT_RNT_CPU,
FAISS_IVFFLAT,
FAISS_IVFSQ8,
MAX_VALUE = FAISS_IVFSQ8,
};
class ExecutionEngine {
......@@ -39,7 +39,7 @@ public:
virtual Status Serialize() = 0;
virtual Status Load() = 0;
virtual Status Load(bool to_cache = true) = 0;
virtual Status Merge(const std::string& location) = 0;
......
......@@ -8,6 +8,7 @@
#include <src/server/ServerConfig.h>
#include <src/metrics/Metrics.h>
#include "Log.h"
#include "utils/CommonUtil.h"
#include "src/cache/CpuCacheMgr.h"
#include "ExecutionEngineImpl.h"
......@@ -51,16 +52,12 @@ VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
index = GetVecIndexFactory(IndexType::FAISS_IDMAP);
break;
}
case EngineType::FAISS_IVFFLAT_GPU: {
case EngineType::FAISS_IVFFLAT: {
index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX);
break;
}
case EngineType::FAISS_IVFFLAT_CPU: {
index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_CPU);
break;
}
case EngineType::SPTAG_KDT_RNT_CPU: {
index = GetVecIndexFactory(IndexType::SPTAG_KDT_RNT_CPU);
case EngineType::FAISS_IVFSQ8: {
index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX);
break;
}
default: {
......@@ -92,7 +89,7 @@ size_t ExecutionEngineImpl::Dimension() const {
}
size_t ExecutionEngineImpl::PhysicalSize() const {
return (size_t) (Count() * Dimension()) * sizeof(float);
return server::CommonUtil::GetFileSize(location_);
}
Status ExecutionEngineImpl::Serialize() {
......@@ -103,14 +100,13 @@ Status ExecutionEngineImpl::Serialize() {
return Status::OK();
}
Status ExecutionEngineImpl::Load() {
Status ExecutionEngineImpl::Load(bool to_cache) {
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool to_cache = false;
bool already_in_cache = (index_ != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index_) {
try {
index_ = read_index(location_);
to_cache = true;
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
} catch (knowhere::KnowhereException &e) {
ENGINE_LOG_ERROR << e.what();
......@@ -120,16 +116,16 @@ Status ExecutionEngineImpl::Load() {
}
}
if (to_cache) {
if (!already_in_cache && to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double total_size = Size();
double physical_size = PhysicalSize();
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(total_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(total_size / double(total_time));
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size);
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time));
}
return Status::OK();
}
......@@ -215,9 +211,8 @@ Status ExecutionEngineImpl::Init() {
gpu_num = server_config.GetInt32Value("gpu_index", 0);
switch (build_type) {
case EngineType::FAISS_IVFFLAT_GPU: {
}
case EngineType::FAISS_IVFFLAT_CPU: {
case EngineType::FAISS_IVFSQ8:
case EngineType::FAISS_IVFFLAT: {
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
nprobe_ = engine_config.GetInt32Value(CONFIG_NPROBE, 1);
break;
......
......@@ -40,7 +40,7 @@ class ExecutionEngineImpl : public ExecutionEngine {
Status Serialize() override;
Status Load() override;
Status Load(bool to_cache) override;
Status Merge(const std::string &location) override;
......
......@@ -6,6 +6,7 @@
#if 0
#include "FaissExecutionEngine.h"
#include "Log.h"
#include "utils/CommonUtil.h"
#include <faiss/AutoTune.h>
#include <faiss/MetaIndexes.h>
......@@ -22,21 +23,52 @@ namespace zilliz {
namespace milvus {
namespace engine {
namespace {
std::string GetMetricType() {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode engine_config = config.GetConfig(server::CONFIG_ENGINE);
return engine_config.GetValue(server::CONFIG_METRICTYPE, "L2");
}
}
std::string IndexStatsHelper::ToString(const std::string &prefix) const {
return "";
}
void IndexStatsHelper::Reset() const {
faiss::indexIVF_stats.reset();
}
std::string FaissIndexIVFStatsHelper::ToString(const std::string &prefix) const {
std::stringstream ss;
ss << prefix;
ss << identifier_ << ":";
ss << " NQ=" << faiss::indexIVF_stats.nq;
ss << " NL=" << faiss::indexIVF_stats.nlist;
ss << " ND=" << faiss::indexIVF_stats.ndis;
ss << " NH=" << faiss::indexIVF_stats.nheap_updates;
ss << " Q=" << faiss::indexIVF_stats.quantization_time;
ss << " S=" << faiss::indexIVF_stats.search_time;
return ss.str();
}
FaissExecutionEngine::FaissExecutionEngine(uint16_t dimension,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type)
: pIndex_(faiss::index_factory(dimension, raw_index_type.c_str())),
location_(location),
const std::string &location,
const std::string &build_index_type,
const std::string &raw_index_type)
: location_(location),
build_index_type_(build_index_type),
raw_index_type_(raw_index_type) {
std::string metric_type = GetMetricType();
faiss::MetricType faiss_metric_type = (metric_type == "L2") ? faiss::METRIC_L2 : faiss::METRIC_INNER_PRODUCT;
pIndex_.reset(faiss::index_factory(dimension, raw_index_type.c_str(), faiss_metric_type));
}
FaissExecutionEngine::FaissExecutionEngine(std::shared_ptr<faiss::Index> index,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type)
const std::string &location,
const std::string &build_index_type,
const std::string &raw_index_type)
: pIndex_(index),
location_(location),
build_index_type_(build_index_type),
......@@ -49,11 +81,11 @@ Status FaissExecutionEngine::AddWithIds(long n, const float *xdata, const long *
}
size_t FaissExecutionEngine::Count() const {
return (size_t)(pIndex_->ntotal);
return (size_t) (pIndex_->ntotal);
}
size_t FaissExecutionEngine::Size() const {
return (size_t)(Count() * pIndex_->d)*sizeof(float);
return (size_t) (Count() * pIndex_->d) * sizeof(float);
}
size_t FaissExecutionEngine::Dimension() const {
......@@ -61,7 +93,7 @@ size_t FaissExecutionEngine::Dimension() const {
}
size_t FaissExecutionEngine::PhysicalSize() const {
return (size_t)(Count() * pIndex_->d)*sizeof(float);
return server::CommonUtil::GetFileSize(location_);
}
Status FaissExecutionEngine::Serialize() {
......@@ -69,18 +101,17 @@ Status FaissExecutionEngine::Serialize() {
return Status::OK();
}
Status FaissExecutionEngine::Load() {
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool to_cache = false;
Status FaissExecutionEngine::Load(bool to_cache) {
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index != nullptr);
auto start_time = METRICS_NOW_TIME;
if (!index) {
index = read_index(location_);
to_cache = true;
ENGINE_LOG_DEBUG << "Disk io from: " << location_;
}
pIndex_ = index->data();
if (to_cache) {
if (!already_in_cache && to_cache) {
Cache();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
......@@ -88,44 +119,44 @@ Status FaissExecutionEngine::Load() {
server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
double total_size = (pIndex_->d) * (pIndex_->ntotal) * 4;
server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(total_size);
// server::Metrics::GetInstance().FaissDiskLoadIOSpeedHistogramObserve(total_size/double(total_time));
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(total_size/double(total_time));
server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(total_size / double(total_time));
}
return Status::OK();
}
Status FaissExecutionEngine::Merge(const std::string& location) {
Status FaissExecutionEngine::Merge(const std::string &location) {
if (location == location_) {
return Status::Error("Cannot Merge Self");
}
ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
ENGINE_LOG_DEBUG << "Merge raw file: " << location << " to: " << location_;
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
to_merge = read_index(location);
}
auto file_index = dynamic_cast<faiss::IndexIDMap*>(to_merge->data().get());
pIndex_->add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat*>(file_index->index)->xb.data(),
file_index->id_map.data());
auto file_index = dynamic_cast<faiss::IndexIDMap *>(to_merge->data().get());
pIndex_->add_with_ids(file_index->ntotal, dynamic_cast<faiss::IndexFlat *>(file_index->index)->xb.data(),
file_index->id_map.data());
return Status::OK();
}
ExecutionEnginePtr
FaissExecutionEngine::BuildIndex(const std::string& location) {
FaissExecutionEngine::BuildIndex(const std::string &location) {
ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;
auto opd = std::make_shared<Operand>();
opd->d = pIndex_->d;
opd->index_type = build_index_type_;
opd->metric_type = GetMetricType();
IndexBuilderPtr pBuilder = GetIndexBuilder(opd);
auto from_index = dynamic_cast<faiss::IndexIDMap*>(pIndex_.get());
auto from_index = dynamic_cast<faiss::IndexIDMap *>(pIndex_.get());
auto index = pBuilder->build_all(from_index->ntotal,
dynamic_cast<faiss::IndexFlat*>(from_index->index)->xb.data(),
from_index->id_map.data());
dynamic_cast<faiss::IndexFlat *>(from_index->index)->xb.data(),
from_index->id_map.data());
ExecutionEnginePtr new_ee(new FaissExecutionEngine(index->data(), location, build_index_type_, raw_index_type_));
return new_ee;
......@@ -139,38 +170,44 @@ Status FaissExecutionEngine::Search(long n,
auto start_time = METRICS_NOW_TIME;
std::shared_ptr<faiss::IndexIVF> ivf_index = std::dynamic_pointer_cast<faiss::IndexIVF>(pIndex_);
if(ivf_index) {
ENGINE_LOG_DEBUG << "Index type: IVFFLAT nProbe: " << nprobe_;
if (ivf_index) {
std::string stats_prefix = "K=" + std::to_string(k) + ":";
ENGINE_LOG_DEBUG << "Searching index type: " << build_index_type_ << " nProbe: " << nprobe_;
ivf_index->nprobe = nprobe_;
ivf_stats_helper_.Reset();
ivf_index->search(n, data, k, distances, labels);
ENGINE_LOG_INFO << ivf_stats_helper_.ToString(stats_prefix);
} else {
ENGINE_LOG_DEBUG << "Searching raw file";
pIndex_->search(n, data, k, distances, labels);
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time,end_time);
server::Metrics::GetInstance().QueryIndexTypePerSecondSet(build_index_type_, double(n)/double(total_time));
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().QueryIndexTypePerSecondSet(build_index_type_, double(n) / double(total_time));
return Status::OK();
}
Status FaissExecutionEngine::Cache() {
zilliz::milvus::cache::CpuCacheMgr::GetInstance(
)->InsertItem(location_, std::make_shared<Index>(pIndex_));
auto index = std::make_shared<Index>(pIndex_);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(index, PhysicalSize());
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, data_obj);
return Status::OK();
}
Status FaissExecutionEngine::Init() {
if(build_index_type_ == "IVF") {
if (build_index_type_ == BUILD_INDEX_TYPE_IVF ||
build_index_type_ == BUILD_INDEX_TYPE_IVFSQ8) {
using namespace zilliz::milvus::server;
ServerConfig &config = ServerConfig::GetInstance();
ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
nprobe_ = engine_config.GetInt32Value(CONFIG_NPROBE, 1000);
nlist_ = engine_config.GetInt32Value(CONFIG_NLIST, 16384);
} else if(build_index_type_ == "IDMap") {
;
} else if (build_index_type_ == BUILD_INDEX_TYPE_IDMAP) { ;
} else {
return Status::Error("Wrong index type: ", build_index_type_);
}
......
......@@ -12,23 +12,44 @@
#include <memory>
#include <string>
namespace zilliz {
namespace milvus {
namespace engine {
const static std::string BUILD_INDEX_TYPE_IDMAP = "IDMap";
const static std::string BUILD_INDEX_TYPE_IVF = "IVF";
const static std::string BUILD_INDEX_TYPE_IVFSQ8 = "IVFSQ8";
class IndexStatsHelper {
public:
using Ptr = std::shared_ptr<IndexStatsHelper>;
virtual std::string ToString(const std::string &prefix = "") const;
virtual void Reset() const;
virtual ~IndexStatsHelper() {}
};
class FaissIndexIVFStatsHelper : public IndexStatsHelper {
public:
std::string ToString(const std::string &prefix = "") const override;
private:
const std::string identifier_ = BUILD_INDEX_TYPE_IVF;
};
class FaissExecutionEngine : public ExecutionEngine {
public:
public:
FaissExecutionEngine(uint16_t dimension,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type);
const std::string &location,
const std::string &build_index_type,
const std::string &raw_index_type);
FaissExecutionEngine(std::shared_ptr<faiss::Index> index,
const std::string& location,
const std::string& build_index_type,
const std::string& raw_index_type);
const std::string &location,
const std::string &build_index_type,
const std::string &raw_index_type);
Status AddWithIds(long n, const float *xdata, const long *xids) override;
......@@ -42,9 +63,9 @@ public:
Status Serialize() override;
Status Load() override;
Status Load(bool to_cache) override;
Status Merge(const std::string& location) override;
Status Merge(const std::string &location) override;
Status Search(long n,
const float *data,
......@@ -52,13 +73,14 @@ public:
float *distances,
long *labels) const override;
ExecutionEnginePtr BuildIndex(const std::string&) override;
ExecutionEnginePtr BuildIndex(const std::string &) override;
Status Cache() override;
Status Init() override;
protected:
protected:
FaissIndexIVFStatsHelper ivf_stats_helper_;
std::shared_ptr<faiss::Index> pIndex_;
std::string location_;
......@@ -66,6 +88,7 @@ protected:
std::string raw_index_type_;
size_t nprobe_ = 0;
size_t nlist_ = 0;
};
......
......@@ -13,7 +13,9 @@ namespace zilliz {
namespace milvus {
namespace engine {
IDGenerator::~IDGenerator() {}
IDGenerator::~IDGenerator() = default;
constexpr size_t SimpleIDGenerator::MAX_IDS_PER_MICRO;
IDNumber SimpleIDGenerator::GetNextIDNumber() {
auto now = std::chrono::system_clock::now();
......
......@@ -10,28 +10,39 @@
#include <cstddef>
#include <vector>
namespace zilliz {
namespace milvus {
namespace engine {
class IDGenerator {
public:
virtual IDNumber GetNextIDNumber() = 0;
virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) = 0;
public:
virtual
IDNumber GetNextIDNumber() = 0;
virtual ~IDGenerator();
virtual void
GetNextIDNumbers(size_t n, IDNumbers &ids) = 0;
virtual
~IDGenerator() = 0;
}; // IDGenerator
class SimpleIDGenerator : public IDGenerator {
public:
virtual IDNumber GetNextIDNumber() override;
virtual void GetNextIDNumbers(size_t n, IDNumbers& ids) override;
public:
~SimpleIDGenerator() override = default;
IDNumber
GetNextIDNumber() override;
void
GetNextIDNumbers(size_t n, IDNumbers &ids) override;
private:
void
NextIDNumbers(size_t n, IDNumbers &ids);
private:
void NextIDNumbers(size_t n, IDNumbers& ids);
const size_t MAX_IDS_PER_MICRO = 1000;
static constexpr size_t MAX_IDS_PER_MICRO = 1000;
}; // SimpleIDGenerator
......
......@@ -83,11 +83,12 @@ Status MemVectors::Serialize(std::string &table_id) {
auto status = meta_->UpdateTableFile(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id_ << " of size " << (double) (active_engine_->Size()) / (double) meta::M
<< " M";
ENGINE_LOG_DEBUG << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id_ << " of size " << active_engine_->Size() << " bytes";
active_engine_->Cache();
if(options_.insert_cache_immediately_) {
active_engine_->Cache();
}
return status;
}
......@@ -125,9 +126,6 @@ Status MemManager::InsertVectors(const std::string &table_id_,
const float *vectors_,
IDNumbers &vector_ids_) {
LOG(DEBUG) << "MemManager::InsertVectors: mutable mem = " << GetCurrentMutableMem() <<
", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem();
std::unique_lock<std::mutex> lock(mutex_);
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
......
......@@ -18,6 +18,7 @@
#include <mutex>
namespace zilliz {
namespace milvus {
namespace engine {
......
......@@ -95,10 +95,12 @@ Status MemTableFile::Serialize() {
auto status = meta_->UpdateTableFile(table_file_schema_);
LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << table_file_schema_.file_id_ << " of size " << (double) size / (double) M << " M";
ENGINE_LOG_DEBUG << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << table_file_schema_.file_id_ << " of size " << size << " bytes";
execution_engine_->Cache();
if(options_.insert_cache_immediately_) {
execution_engine_->Cache();
}
return status;
}
......
......@@ -13,6 +13,8 @@ namespace milvus {
namespace engine {
namespace meta {
Meta::~Meta() = default;
DateT Meta::GetDate(const std::time_t& t, int day_delta) {
struct tm ltm;
localtime_r(&t, &ltm);
......
......@@ -20,52 +20,86 @@ namespace meta {
class Meta {
public:
public:
using Ptr = std::shared_ptr<Meta>;
virtual Status CreateTable(TableSchema& table_schema) = 0;
virtual Status DescribeTable(TableSchema& table_schema) = 0;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) = 0;
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) = 0;
virtual
~Meta() = 0;
virtual Status DeleteTable(const std::string& table_id) = 0;
virtual Status DeleteTableFiles(const std::string& table_id) = 0;
virtual Status
CreateTable(TableSchema &table_schema) = 0;
virtual Status CreateTableFile(TableFileSchema& file_schema) = 0;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) = 0;
virtual Status
DescribeTable(TableSchema &table_schema) = 0;
virtual Status GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) = 0;
virtual Status
HasTable(const std::string &table_id, bool &has_or_not) = 0;
virtual Status UpdateTableFile(TableFileSchema& file_schema) = 0;
virtual Status
AllTables(std::vector<TableSchema> &table_schema_array) = 0;
virtual Status UpdateTableFiles(TableFilesSchema& files) = 0;
virtual Status
DeleteTable(const std::string &table_id) = 0;
virtual Status FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema& files) = 0;
virtual Status
DeleteTableFiles(const std::string &table_id) = 0;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) = 0;
virtual Status
CreateTableFile(TableFileSchema &file_schema) = 0;
virtual Status Size(uint64_t& result) = 0;
virtual Status
DropPartitionsByDates(const std::string &table_id, const DatesT &dates) = 0;
virtual Status Archive() = 0;
virtual Status
GetTableFiles(const std::string &table_id, const std::vector<size_t> &ids, TableFilesSchema &table_files) = 0;
virtual Status FilesToIndex(TableFilesSchema&) = 0;
virtual Status
UpdateTableFilesToIndex(const std::string &table_id) = 0;
virtual Status CleanUp() = 0;
virtual Status CleanUpFilesWithTTL(uint16_t) = 0;
virtual Status
UpdateTableFile(TableFileSchema &file_schema) = 0;
virtual Status DropAll() = 0;
virtual Status
UpdateTableFiles(TableFilesSchema &files) = 0;
virtual Status Count(const std::string& table_id, uint64_t& result) = 0;
virtual Status
FilesToSearch(const std::string &table_id, const DatesT &partition, DatePartionedTableFilesSchema &files) = 0;
static DateT GetDate(const std::time_t& t, int day_delta = 0);
static DateT GetDate();
static DateT GetDateWithDelta(int day_delta);
virtual Status
FilesToMerge(const std::string &table_id, DatePartionedTableFilesSchema &files) = 0;
virtual Status
Size(uint64_t &result) = 0;
virtual Status
Archive() = 0;
virtual Status
FilesToIndex(TableFilesSchema &) = 0;
virtual Status
HasNonIndexFiles(const std::string &table_id, bool &has) = 0;
virtual Status
CleanUp() = 0;
virtual Status
CleanUpFilesWithTTL(uint16_t) = 0;
virtual Status
DropAll() = 0;
virtual Status
Count(const std::string &table_id, uint64_t &result) = 0;
static DateT
GetDate(const std::time_t &t, int day_delta = 0);
static DateT
GetDate();
static DateT
GetDateWithDelta(int day_delta);
}; // MetaData
......
......@@ -31,7 +31,6 @@ struct TableSchema {
int state_ = (int)NORMAL;
size_t files_cnt_ = 0;
uint16_t dimension_ = 0;
std::string location_;
long created_on_ = 0;
int engine_type_ = (int)EngineType::FAISS_IDMAP;
bool store_raw_data_ = false;
......
(This file's diff is collapsed.)
......@@ -12,78 +12,80 @@
#include "mysql++/mysql++.h"
#include <mutex>
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
// auto StoragePrototype(const std::string& path);
using namespace mysqlpp;
using namespace mysqlpp;
class MySQLMetaImpl : public Meta {
public:
MySQLMetaImpl(const DBMetaOptions &options_, const int &mode);
Status CreateTable(TableSchema &table_schema) override;
Status DescribeTable(TableSchema &group_info_) override;
Status HasTable(const std::string &table_id, bool &has_or_not) override;
Status AllTables(std::vector<TableSchema> &table_schema_array) override;
class MySQLMetaImpl : public Meta {
public:
MySQLMetaImpl(const DBMetaOptions& options_, const int& mode);
Status DeleteTable(const std::string &table_id) override;
Status DeleteTableFiles(const std::string &table_id) override;
virtual Status CreateTable(TableSchema& table_schema) override;
virtual Status DescribeTable(TableSchema& group_info_) override;
virtual Status HasTable(const std::string& table_id, bool& has_or_not) override;
virtual Status AllTables(std::vector<TableSchema>& table_schema_array) override;
Status CreateTableFile(TableFileSchema &file_schema) override;
Status DropPartitionsByDates(const std::string &table_id,
const DatesT &dates) override;
virtual Status DeleteTable(const std::string& table_id) override;
virtual Status DeleteTableFiles(const std::string& table_id) override;
Status GetTableFiles(const std::string &table_id,
const std::vector<size_t> &ids,
TableFilesSchema &table_files) override;
virtual Status CreateTableFile(TableFileSchema& file_schema) override;
virtual Status DropPartitionsByDates(const std::string& table_id,
const DatesT& dates) override;
Status HasNonIndexFiles(const std::string &table_id, bool &has) override;
virtual Status GetTableFiles(const std::string& table_id,
const std::vector<size_t>& ids,
TableFilesSchema& table_files) override;
Status UpdateTableFile(TableFileSchema &file_schema) override;
virtual Status UpdateTableFile(TableFileSchema& file_schema) override;
Status UpdateTableFilesToIndex(const std::string &table_id) override;
virtual Status UpdateTableFiles(TableFilesSchema& files) override;
Status UpdateTableFiles(TableFilesSchema &files) override;
virtual Status FilesToSearch(const std::string& table_id,
const DatesT& partition,
DatePartionedTableFilesSchema& files) override;
Status FilesToSearch(const std::string &table_id,
const DatesT &partition,
DatePartionedTableFilesSchema &files) override;
virtual Status FilesToMerge(const std::string& table_id,
DatePartionedTableFilesSchema& files) override;
Status FilesToMerge(const std::string &table_id,
DatePartionedTableFilesSchema &files) override;
virtual Status FilesToIndex(TableFilesSchema&) override;
Status FilesToIndex(TableFilesSchema &) override;
virtual Status Archive() override;
Status Archive() override;
virtual Status Size(uint64_t& result) override;
Status Size(uint64_t &result) override;
virtual Status CleanUp() override;
Status CleanUp() override;
virtual Status CleanUpFilesWithTTL(uint16_t seconds) override;
Status CleanUpFilesWithTTL(uint16_t seconds) override;
virtual Status DropAll() override;
Status DropAll() override;
virtual Status Count(const std::string& table_id, uint64_t& result) override;
Status Count(const std::string &table_id, uint64_t &result) override;
virtual ~MySQLMetaImpl();
virtual ~MySQLMetaImpl();
private:
Status NextFileId(std::string& file_id);
Status NextTableId(std::string& table_id);
Status DiscardFiles(long long to_discard_size);
std::string GetTablePath(const std::string& table_id);
std::string GetTableDatePartitionPath(const std::string& table_id, DateT& date);
void GetTableFilePath(TableFileSchema& group_file);
Status Initialize();
private:
Status NextFileId(std::string &file_id);
Status NextTableId(std::string &table_id);
Status DiscardFiles(long long to_discard_size);
Status Initialize();
const DBMetaOptions options_;
const int mode_;
const DBMetaOptions options_;
const int mode_;
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab = false;
std::shared_ptr<MySQLConnectionPool> mysql_connection_pool_;
bool safe_grab = false;
// std::mutex connectionMutex_;
}; // DBMetaImpl
}; // DBMetaImpl
} // namespace meta
} // namespace engine
......
......@@ -25,13 +25,10 @@ Status NewMemManager::InsertVectors(const std::string &table_id_,
const float *vectors_,
IDNumbers &vector_ids_) {
while (GetCurrentMem() > options_.maximum_memory) {
while (GetCurrentMem() > options_.insert_buffer_size) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
LOG(DEBUG) << "NewMemManager::InsertVectors: mutable mem = " << GetCurrentMutableMem() <<
", immutable mem = " << GetCurrentImmutableMem() << ", total mem = " << GetCurrentMem();
std::unique_lock<std::mutex> lock(mutex_);
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
......@@ -77,7 +74,6 @@ Status NewMemManager::Serialize(std::set<std::string> &table_ids) {
table_ids.insert(mem->GetTableId());
}
immu_mem_list_.clear();
return Status::OK();
}
......
......@@ -41,6 +41,10 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) {
}
for (auto& token : tokens) {
if(token.empty()) {
continue;
}
std::vector<std::string> kv;
boost::algorithm::split(kv, token, boost::is_any_of(":"));
if (kv.size() != 2) {
......
......@@ -8,6 +8,7 @@
#include <string>
#include <memory>
#include <map>
#include <vector>
namespace zilliz {
namespace milvus {
......@@ -21,7 +22,7 @@ static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
static const std::string ARCHIVE_CONF_DISK = "disk";
static const std::string ARCHIVE_CONF_DAYS = "days";
static const std::string ARCHIVE_CONF_DEFAULT = ARCHIVE_CONF_DISK + ":512";
static const std::string ARCHIVE_CONF_DEFAULT = "";
struct ArchiveConf {
using CriteriaT = std::map<std::string, int>;
......@@ -43,6 +44,7 @@ private:
struct DBMetaOptions {
std::string path;
std::vector<std::string> slave_paths;
std::string backend_uri;
ArchiveConf archive_conf = ArchiveConf("delete");
}; // DBMetaOptions
......@@ -61,7 +63,9 @@ struct Options {
size_t index_trigger_size = ONE_GB; //unit: byte
DBMetaOptions meta;
int mode = MODE::SINGLE;
float maximum_memory = 4 * ONE_GB;
size_t insert_buffer_size = 4 * ONE_GB;
bool insert_cache_immediately_ = false;
}; // Options
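// A minimal configuration sketch using the options declared above; the paths and
// values here are illustrative, not defaults:
//   engine::Options options;
//   options.insert_buffer_size = 2 * ONE_GB;                  // block inserts once buffered data exceeds 2 GB
//   options.insert_cache_immediately_ = true;                 // cache newly serialized files right away
//   options.meta.path = "/var/lib/milvus";                    // primary db path
//   options.meta.slave_paths = {"/mnt/disk1", "/mnt/disk2"};  // extra paths for file distribution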
......
......@@ -4,14 +4,58 @@
* Proprietary and confidential.
******************************************************************************/
#include "Utils.h"
#include "utils/CommonUtil.h"
#include "Log.h"
#include <mutex>
#include <chrono>
#include <boost/filesystem.hpp>
namespace zilliz {
namespace milvus {
namespace engine {
namespace utils {
namespace {
static const std::string TABLES_FOLDER = "/tables/";
static uint64_t index_file_counter = 0;
static std::mutex index_file_counter_mutex;
std::string ConstructParentFolder(const std::string& db_path, const meta::TableFileSchema& table_file) {
std::string table_path = db_path + TABLES_FOLDER + table_file.table_id_;
std::string partition_path = table_path + "/" + std::to_string(table_file.date_);
return partition_path;
}
std::string GetTableFileParentFolder(const DBMetaOptions& options, const meta::TableFileSchema& table_file) {
uint64_t path_count = options.slave_paths.size() + 1;
std::string target_path = options.path;
uint64_t index = 0;
if(meta::TableFileSchema::INDEX == table_file.file_type_) {
// index files are large and persisted permanently,
// so we need to distribute them evenly across the db paths,
// round robin according to a file counter
std::lock_guard<std::mutex> lock(index_file_counter_mutex);
index = index_file_counter % path_count;
index_file_counter++;
} else {
// other types of files may be merged or deleted later,
// so we round robin according to their file id
index = table_file.id_ % path_count;
}
if (index > 0) {
target_path = options.slave_paths[index - 1];
}
return ConstructParentFolder(target_path, table_file);
}
}
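// A small illustration of the round robin above, assuming one primary path and two
// slave paths (path_count == 3):
//   INDEX files follow index_file_counter: 0 -> options.path, 1 -> slave_paths[0],
//   2 -> slave_paths[1], 3 -> options.path, ...
//   other files follow their id: id % 3 == 0 -> options.path,
//   id % 3 == 1 -> slave_paths[0], id % 3 == 2 -> slave_paths[1]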
long GetMicroSecTimeStamp() {
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
......@@ -20,6 +64,82 @@ long GetMicroSecTimeStamp() {
return micros;
}
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string table_path = db_path + TABLES_FOLDER + table_id;
auto status = server::CommonUtil::CreateDirectory(table_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
for(auto& path : options.slave_paths) {
table_path = path + TABLES_FOLDER + table_id;
status = server::CommonUtil::CreateDirectory(table_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << table_path << " Error";
return Status::Error("Failed to create table path");
}
}
return Status::OK();
}
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id) {
std::string db_path = options.path;
std::string table_path = db_path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
for(auto& path : options.slave_paths) {
table_path = path + TABLES_FOLDER + table_id;
boost::filesystem::remove_all(table_path);
ENGINE_LOG_DEBUG << "Remove table folder: " << table_path;
}
return Status::OK();
}
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
std::string parent_path = GetTableFileParentFolder(options, table_file);
auto status = server::CommonUtil::CreateDirectory(parent_path);
if (status != 0) {
ENGINE_LOG_ERROR << "Create directory " << parent_path << " Error";
return Status::DBTransactionError("Failed to create partition directory");
}
table_file.location_ = parent_path + "/" + table_file.file_id_;
return Status::OK();
}
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
std::string parent_path = ConstructParentFolder(options.path, table_file);
std::string file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
table_file.location_ = file_path;
return Status::OK();
} else {
for(auto& path : options.slave_paths) {
parent_path = ConstructParentFolder(path, table_file);
file_path = parent_path + "/" + table_file.file_id_;
if(boost::filesystem::exists(file_path)) {
table_file.location_ = file_path;
return Status::OK();
}
}
}
return Status::Error("Table file doesn't exist: " + table_file.file_id_);
}
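// An illustrative lookup, assuming options.path = "/db" and a single slave path
// "/mnt/disk1", for a file with table_id "tbl", date <d> and file_id "f1":
//   1) check /db/tables/tbl/<d>/f1        -> use it if it exists
//   2) check /mnt/disk1/tables/tbl/<d>/f1 -> use it if it exists
//   otherwise return "Table file doesn't exist"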
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file) {
utils::GetTableFilePath(options, table_file);
boost::filesystem::remove(table_file.location_);
return Status::OK();
}
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -5,6 +5,10 @@
******************************************************************************/
#pragma once
#include "Options.h"
#include "MetaTypes.h"
#include <string>
namespace zilliz {
namespace milvus {
......@@ -13,6 +17,13 @@ namespace utils {
long GetMicroSecTimeStamp();
Status CreateTablePath(const DBMetaOptions& options, const std::string& table_id);
Status DeleteTablePath(const DBMetaOptions& options, const std::string& table_id);
Status CreateTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status GetTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
Status DeleteTableFilePath(const DBMetaOptions& options, meta::TableFileSchema& table_file);
} // namespace utils
} // namespace engine
} // namespace milvus
......
......@@ -20,6 +20,7 @@ class ReuseCacheIndexStrategy {
public:
bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
if(context == nullptr) {
ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
return false;
}
......@@ -32,7 +33,7 @@ public:
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
if(index_files.find(loader->file_->id_) != index_files.end()){
ENGINE_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
ENGINE_LOG_DEBUG << "Append SearchContext to exist IndexLoaderContext";
index_files.erase(loader->file_->id_);
loader->search_contexts_.push_back(context);
}
......@@ -40,7 +41,7 @@ public:
//index_files still contains some index files, create new loader
for(auto& pair : index_files) {
ENGINE_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
ENGINE_LOG_DEBUG << "Create new IndexLoaderContext for: " << pair.second->location_;
IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
new_loader->search_contexts_.push_back(context);
new_loader->file_ = pair.second;
......@@ -64,6 +65,7 @@ class DeleteTableStrategy {
public:
bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
if (context == nullptr) {
ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
return false;
}
......@@ -103,6 +105,7 @@ public:
bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
std::list<zilliz::milvus::engine::ScheduleTaskPtr> &task_list) {
if(context_ptr == nullptr) {
ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
return false;
}
......
......@@ -31,6 +31,7 @@ TaskScheduler& TaskScheduler::GetInstance() {
bool
TaskScheduler::Start() {
if(!stopped_) {
SERVER_LOG_INFO << "Task Scheduler isn't started";
return true;
}
......@@ -47,6 +48,7 @@ TaskScheduler::Start() {
bool
TaskScheduler::Stop() {
if(stopped_) {
SERVER_LOG_INFO << "Task Scheduler already stopped";
return true;
}
......@@ -80,7 +82,7 @@ TaskScheduler::TaskDispatchWorker() {
ScheduleTaskPtr task_ptr = task_dispatch_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop db task dispatch thread";
break;//exit
return true;
}
//execute task
......@@ -98,8 +100,8 @@ TaskScheduler::TaskWorker() {
while(true) {
ScheduleTaskPtr task_ptr = task_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop db task thread";
break;//exit
SERVER_LOG_INFO << "Stop db task worker thread";
return true;
}
//execute task
......
......@@ -31,7 +31,7 @@ SearchContext::AddIndexFile(TableFileSchemaPtr& index_file) {
return false;
}
SERVER_LOG_INFO << "SearchContext " << identity_ << " add index file: " << index_file->id_;
SERVER_LOG_DEBUG << "SearchContext " << identity_ << " add index file: " << index_file->id_;
map_index_files_[index_file->id_] = index_file;
return true;
......@@ -42,7 +42,7 @@ SearchContext::IndexSearchDone(size_t index_id) {
std::unique_lock <std::mutex> lock(mtx_);
map_index_files_.erase(index_id);
done_cond_.notify_all();
SERVER_LOG_INFO << "SearchContext " << identity_ << " finish index file: " << index_id;
SERVER_LOG_DEBUG << "SearchContext " << identity_ << " finish index file: " << index_id;
}
void
......
......@@ -37,9 +37,19 @@ public:
const ResultSet& GetResult() const { return result_; }
ResultSet& GetResult() { return result_; }
std::string Identity() const { return identity_; }
void IndexSearchDone(size_t index_id);
void WaitResult();
void AccumLoadCost(double span) { time_cost_load_ += span; }
void AccumSearchCost(double span) { time_cost_search_ += span; }
void AccumReduceCost(double span) { time_cost_reduce_ += span; }
double LoadCost() const { return time_cost_load_; }
double SearchCost() const { return time_cost_search_; }
double ReduceCost() const { return time_cost_reduce_; }
private:
uint64_t topk_ = 0;
uint64_t nq_ = 0;
......@@ -52,6 +62,10 @@ private:
std::condition_variable done_cond_;
std::string identity_; //for debug
double time_cost_load_ = 0.0; //time cost for load all index files, unit: us
double time_cost_search_ = 0.0; //time cost for entire search, unit: us
double time_cost_reduce_ = 0.0; //time cost for entire reduce, unit: us
};
using SearchContextPtr = std::shared_ptr<SearchContext>;
......
......@@ -41,20 +41,21 @@ IndexLoadTask::IndexLoadTask()
}
std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
ENGINE_LOG_INFO << "Loading index(" << file_->id_ << ") from location: " << file_->location_;
server::TimeRecorder rc("Load index");
server::TimeRecorder rc("");
//step 1: load index
ExecutionEnginePtr index_ptr = EngineFactory::Build(file_->dimension_,
file_->location_,
(EngineType)file_->engine_type_);
index_ptr->Load();
rc.Record("load index file to memory");
size_t file_size = index_ptr->PhysicalSize();
LOG(DEBUG) << "Index file type " << file_->file_type_ << " Of Size: "
<< file_size/(1024*1024) << " M";
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + std::to_string(file_->file_type_)
+ " size:" + std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
for(auto& context : search_contexts_) {
context->AccumLoadCost(span);
}
CollectFileMetrics(file_->file_type_, file_size);
......
......@@ -5,14 +5,60 @@
******************************************************************************/
#include "SearchTask.h"
#include "metrics/Metrics.h"
#include "utils/Log.h"
#include "db/Log.h"
#include "utils/TimeRecorder.h"
#include <thread>
namespace zilliz {
namespace milvus {
namespace engine {
namespace {
static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;
bool NeedParallelReduce(uint64_t nq, uint64_t topk) {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& db_config = config.GetConfig(server::CONFIG_DB);
bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, true);
if(!need_parallel) {
return false;
}
return nq*topk >= PARALLEL_REDUCE_THRESHOLD;
}
void ParallelReduce(std::function<void(size_t, size_t)>& reduce_function, size_t max_index) {
size_t reduce_batch = PARALLEL_REDUCE_BATCH;
auto thread_count = std::thread::hardware_concurrency() - 1; //leave one core free; not all cores do this work
if(thread_count > 0) {
reduce_batch = max_index/thread_count + 1;
}
ENGINE_LOG_DEBUG << "use " << thread_count <<
" thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
std::vector<std::shared_ptr<std::thread> > thread_array;
size_t from_index = 0;
while(from_index < max_index) {
size_t to_index = from_index + reduce_batch;
if(to_index > max_index) {
to_index = max_index;
}
auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
thread_array.push_back(reduce_thread);
from_index = to_index;
}
for(auto& thread_ptr : thread_array) {
thread_ptr->join();
}
}
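// A minimal usage sketch of ParallelReduce; the lambda body is illustrative. The
// parallel path is taken (when enabled in config) once nq * topk >= PARALLEL_REDUCE_THRESHOLD,
// e.g. nq = 100 with topk = 100.
//   std::vector<int> doubled(100, 0);
//   std::function<void(size_t, size_t)> work = [&](size_t from, size_t to) {
//       for (size_t i = from; i < to; i++) {
//           doubled[i] = static_cast<int>(i) * 2;   // each [from, to) chunk runs on its own thread
//       }
//   };
//   ParallelReduce(work, doubled.size());           // splits [0, 100) across threads and joins them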
void CollectDurationMetrics(int index_type, double total_time) {
switch(index_type) {
case meta::TableFileSchema::RAW: {
......@@ -30,11 +76,20 @@ void CollectDurationMetrics(int index_type, double total_time) {
}
}
std::string GetMetricType() {
server::ServerConfig &config = server::ServerConfig::GetInstance();
server::ConfigNode& engine_config = config.GetConfig(server::CONFIG_ENGINE);
return engine_config.GetValue(server::CONFIG_METRICTYPE, "L2");
}
}
SearchTask::SearchTask()
: IScheduleTask(ScheduleTaskType::kSearch) {
std::string metric_type = GetMetricType();
if(metric_type != "L2") {
metric_l2 = false;
}
}
std::shared_ptr<IScheduleTask> SearchTask::Execute() {
......@@ -42,10 +97,10 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
return nullptr;
}
SERVER_LOG_INFO << "Searching in index(" << index_id_<< ") with "
ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_<< " with "
<< search_contexts_.size() << " tasks";
server::TimeRecorder rc("DoSearch index(" + std::to_string(index_id_) + ")");
server::TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
auto start_time = METRICS_NOW_TIME;
......@@ -62,20 +117,25 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
index_engine_->Search(context->nq(), context->vectors(), inner_k, output_distence.data(),
output_ids.data());
rc.Record("do search");
double span = rc.RecordSection("do search for context:" + context->Identity());
context->AccumSearchCost(span);
//step 3: cluster result
SearchContext::ResultSet result_set;
auto spec_k = index_engine_->Count() < context->topk() ? index_engine_->Count() : context->topk();
SearchTask::ClusterResult(output_ids, output_distence, context->nq(), spec_k, result_set);
rc.Record("cluster result");
span = rc.RecordSection("cluster result for context:" + context->Identity());
context->AccumReduceCost(span);
//step 4: pick up topk result
SearchTask::TopkResult(result_set, inner_k, context->GetResult());
rc.Record("reduce topk");
SearchTask::TopkResult(result_set, inner_k, metric_l2, context->GetResult());
span = rc.RecordSection("reduce topk for context:" + context->Identity());
context->AccumReduceCost(span);
} catch (std::exception& ex) {
SERVER_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
context->IndexSearchDone(index_id_);//mark as done to avoid deadlock, even if the search failed
continue;
}
......@@ -88,7 +148,7 @@ std::shared_ptr<IScheduleTask> SearchTask::Execute() {
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
CollectDurationMetrics(index_type_, total_time);
rc.Elapse("totally cost");
rc.ElapseFromBegin("totally cost");
return nullptr;
}
......@@ -98,26 +158,35 @@ Status SearchTask::ClusterResult(const std::vector<long> &output_ids,
uint64_t nq,
uint64_t topk,
SearchContext::ResultSet &result_set) {
if(output_ids.size() != nq*topk || output_distence.size() != nq*topk) {
if(output_ids.size() < nq*topk || output_distence.size() < nq*topk) {
std::string msg = "Invalid id array size: " + std::to_string(output_ids.size()) +
" distance array size: " + std::to_string(output_distence.size());
SERVER_LOG_ERROR << msg;
ENGINE_LOG_ERROR << msg;
return Status::Error(msg);
}
result_set.clear();
result_set.reserve(nq);
for (auto i = 0; i < nq; i++) {
SearchContext::Id2DistanceMap id_distance;
id_distance.reserve(topk);
for (auto k = 0; k < topk; k++) {
uint64_t index = i * topk + k;
if(output_ids[index] < 0) {
continue;
result_set.resize(nq);
std::function<void(size_t, size_t)> reduce_worker = [&](size_t from_index, size_t to_index) {
for (auto i = from_index; i < to_index; i++) {
SearchContext::Id2DistanceMap id_distance;
id_distance.reserve(topk);
for (auto k = 0; k < topk; k++) {
uint64_t index = i * topk + k;
if(output_ids[index] < 0) {
continue;
}
id_distance.push_back(std::make_pair(output_ids[index], output_distence[index]));
}
id_distance.push_back(std::make_pair(output_ids[index], output_distence[index]));
result_set[i] = id_distance;
}
result_set.emplace_back(id_distance);
};
if(NeedParallelReduce(nq, topk)) {
ParallelReduce(reduce_worker, nq);
} else {
reduce_worker(0, nq);
}
return Status::OK();
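// Two details make the parallel path above safe: result_set is pre-sized with
// resize(nq), so each worker thread writes result_set[i] only for its own index
// range without synchronizing with the others, and the size check is relaxed
// from != to < because the output buffers may hold more than nq*topk entries.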
......@@ -125,10 +194,11 @@ Status SearchTask::ClusterResult(const std::vector<long> &output_ids,
Status SearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src,
SearchContext::Id2DistanceMap &distance_target,
uint64_t topk) {
uint64_t topk,
bool ascending) {
//Note: distance_src and distance_target are already sorted by score (ascending when ascending == true, descending otherwise)
if(distance_src.empty()) {
SERVER_LOG_WARNING << "Empty distance source array";
ENGINE_LOG_WARNING << "Empty distance source array";
return Status::OK();
}
......@@ -161,15 +231,27 @@ Status SearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src,
break;
}
//compare score, put smallest score to score_merged one by one
//compare score,
// if ascending = true, put smallest score to score_merged one by one
// else, put largest score to score_merged one by one
auto& src_pair = distance_src[src_index];
auto& target_pair = distance_target[target_index];
if(src_pair.second > target_pair.second) {
distance_merged.push_back(target_pair);
target_index++;
if(ascending){
if(src_pair.second > target_pair.second) {
distance_merged.push_back(target_pair);
target_index++;
} else {
distance_merged.push_back(src_pair);
src_index++;
}
} else {
distance_merged.push_back(src_pair);
src_index++;
if(src_pair.second < target_pair.second) {
distance_merged.push_back(target_pair);
target_index++;
} else {
distance_merged.push_back(src_pair);
src_index++;
}
}
//score_merged.size() already equals topk
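// Worked example for the merge above, with topk = 3:
//   ascending = true  (L2):  src = {(7, 0.2), (3, 0.5)}, target = {(1, 0.1), (9, 0.4)}
//                            -> merged = {(1, 0.1), (7, 0.2), (9, 0.4)}  (smallest score first)
//   ascending = false (e.g. inner product): both inputs arrive sorted in descending
//                            order and the larger score is taken first instead.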
......@@ -185,6 +267,7 @@ Status SearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src,
Status SearchTask::TopkResult(SearchContext::ResultSet &result_src,
uint64_t topk,
bool ascending,
SearchContext::ResultSet &result_target) {
if (result_target.empty()) {
result_target.swap(result_src);
......@@ -193,14 +276,22 @@ Status SearchTask::TopkResult(SearchContext::ResultSet &result_src,
if (result_src.size() != result_target.size()) {
std::string msg = "Invalid result set size";
SERVER_LOG_ERROR << msg;
ENGINE_LOG_ERROR << msg;
return Status::Error(msg);
}
for (size_t i = 0; i < result_src.size(); i++) {
SearchContext::Id2DistanceMap &score_src = result_src[i];
SearchContext::Id2DistanceMap &score_target = result_target[i];
SearchTask::MergeResult(score_src, score_target, topk);
std::function<void(size_t, size_t)> ReduceWorker = [&](size_t from_index, size_t to_index) {
for (size_t i = from_index; i < to_index; i++) {
SearchContext::Id2DistanceMap &score_src = result_src[i];
SearchContext::Id2DistanceMap &score_target = result_target[i];
SearchTask::MergeResult(score_src, score_target, topk, ascending);
}
};
if(NeedParallelReduce(result_src.size(), topk)) {
ParallelReduce(ReduceWorker, result_src.size());
} else {
ReduceWorker(0, result_src.size());
}
return Status::OK();
......
......@@ -27,10 +27,12 @@ public:
static Status MergeResult(SearchContext::Id2DistanceMap &distance_src,
SearchContext::Id2DistanceMap &distance_target,
uint64_t topk);
uint64_t topk,
bool ascending);
static Status TopkResult(SearchContext::ResultSet &result_src,
uint64_t topk,
bool ascending,
SearchContext::ResultSet &result_target);
public:
......@@ -38,6 +40,7 @@ public:
int index_type_ = 0; //for metrics
ExecutionEnginePtr index_engine_;
std::vector<SearchContextPtr> search_contexts_;
bool metric_l2 = true;
};
using SearchTaskPtr = std::shared_ptr<SearchTask>;
......
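With the extra parameters above, callers must state the sort direction explicitly. A minimal call with the new signatures (arguments are placeholders):

SearchTask::MergeResult(score_src, score_target, topk, /*ascending=*/true);
SearchTask::TopkResult(result_src, topk, /*ascending=*/true, result_target);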
......@@ -8,6 +8,7 @@
#include <iostream>
#include <time.h>
#include <chrono>
#include <unistd.h>
using namespace ::milvus;
......@@ -21,7 +22,8 @@ namespace {
static constexpr int64_t NQ = 10;
static constexpr int64_t TOP_K = 10;
static constexpr int64_t SEARCH_TARGET = 5000; //changing this value changes the search results
static constexpr int64_t ADD_VECTOR_LOOP = 5;
static constexpr int64_t ADD_VECTOR_LOOP = 10;
static constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
......@@ -34,26 +36,17 @@ namespace {
BLOCK_SPLITER
}
void PrintRecordIdArray(const std::vector<int64_t>& record_ids) {
BLOCK_SPLITER
std::cout << "Returned id array count: " << record_ids.size() << std::endl;
#if 0
for(auto id : record_ids) {
std::cout << std::to_string(id) << std::endl;
}
#endif
BLOCK_SPLITER
}
void PrintSearchResult(const std::vector<TopKQueryResult>& topk_query_result_array) {
void PrintSearchResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
std::cout << "Returned result count: " << topk_query_result_array.size() << std::endl;
int32_t index = 0;
for(auto& result : topk_query_result_array) {
auto search_id = search_record_array[index].first;
index++;
std::cout << "No." << std::to_string(index) << " vector top "
<< std::to_string(result.query_result_arrays.size())
std::cout << "No." << std::to_string(index) << " vector " << std::to_string(search_id)
<< " top " << std::to_string(result.query_result_arrays.size())
<< " search result:" << std::endl;
for(auto& item : result.query_result_arrays) {
std::cout << "\t" << std::to_string(item.id) << "\tdistance:" << std::to_string(item.distance);
......@@ -67,7 +60,7 @@ namespace {
std::string CurrentTime() {
time_t tt;
time( &tt );
tt = tt + 8*3600;
tt = tt + 8*SECONDS_EACH_HOUR;
tm* t= gmtime( &tt );
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
......@@ -77,10 +70,11 @@ namespace {
return str;
}
std::string CurrentTmDate() {
std::string CurrentTmDate(int64_t offset_day = 0) {
time_t tt;
time( &tt );
tt = tt + 8*3600;
tt = tt + 8*SECONDS_EACH_HOUR;
tt = tt + 24*SECONDS_EACH_HOUR*offset_day;
tm* t= gmtime( &tt );
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
......@@ -91,13 +85,13 @@ namespace {
std::string GetTableName() {
static std::string s_id(CurrentTime());
return s_id;
return "tbl_" + s_id;
}
TableSchema BuildTableSchema() {
TableSchema tb_schema;
tb_schema.table_name = TABLE_NAME;
tb_schema.index_type = IndexType::cpu_idmap;
tb_schema.index_type = IndexType::gpu_ivfflat;
tb_schema.dimension = TABLE_DIMENSION;
tb_schema.store_raw_vector = true;
......@@ -126,6 +120,66 @@ namespace {
std::cout << "Waiting " << seconds << " seconds ..." << std::endl;
sleep(seconds);
}
class TimeRecorder {
public:
TimeRecorder(const std::string& title)
: title_(title) {
start_ = std::chrono::system_clock::now();
}
~TimeRecorder() {
std::chrono::system_clock::time_point end = std::chrono::system_clock::now();
long span = (std::chrono::duration_cast<std::chrono::milliseconds> (end - start_)).count();
std::cout << title_ << " totally cost: " << span << " ms" << std::endl;
}
private:
std::string title_;
std::chrono::system_clock::time_point start_;
};
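This client-side TimeRecorder is a small RAII timer: it stamps the start time in the constructor and prints the elapsed milliseconds when the object goes out of scope, so a phase is timed simply by scoping it, as DoSearch() below does:

{
    TimeRecorder rc("Search without index");  // prints "Search without index totally cost: N ms" at block exit
    // ... issue the request ...
}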
void CheckResult(const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::vector<TopKQueryResult>& topk_query_result_array) {
BLOCK_SPLITER
int64_t index = 0;
for(auto& result : topk_query_result_array) {
auto result_id = result.query_result_arrays[0].id;
auto search_id = search_record_array[index++].first;
if(result_id != search_id) {
std::cout << "The top 1 result is wrong: " << result_id
<< " vs. " << search_id << std::endl;
} else {
std::cout << "No." << index-1 << " Check result successfully" << std::endl;
}
}
BLOCK_SPLITER
}
void DoSearch(std::shared_ptr<Connection> conn,
const std::vector<std::pair<int64_t, RowRecord>>& search_record_array,
const std::string& phase_name) {
std::vector<Range> query_range_array;
Range rg;
rg.start_value = CurrentTmDate();
rg.end_value = CurrentTmDate(1);
query_range_array.emplace_back(rg);
std::vector<RowRecord> record_array;
for(auto& pair : search_record_array) {
record_array.push_back(pair.second);
}
std::vector<TopKQueryResult> topk_query_result_array;
{
TimeRecorder rc(phase_name);
Status stat = conn->SearchVector(TABLE_NAME, record_array, query_range_array, TOP_K, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
}
PrintSearchResult(search_record_array, topk_query_result_array);
CheckResult(search_record_array, topk_query_result_array);
}
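// The Range built in DoSearch() spans exactly one day: start_value = CurrentTmDate()
// (today, shifted to UTC+8) and end_value = CurrentTmDate(1) (tomorrow), so vectors
// inserted by this test fall inside the date range being searched.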
}
void
......@@ -179,30 +233,40 @@ ClientTest::Test(const std::string& address, const std::string& port) {
PrintTableSchema(tb_schema);
}
for(int i = 0; i < ADD_VECTOR_LOOP; i++){//add vectors
//add vectors
std::vector<std::pair<int64_t, RowRecord>> search_record_array;
for (int i = 0; i < ADD_VECTOR_LOOP; i++) {
TimeRecorder recorder("Add vector No." + std::to_string(i));
std::vector<RowRecord> record_array;
BuildVectors(i*BATCH_ROW_COUNT, (i+1)*BATCH_ROW_COUNT, record_array);
int64_t begin_index = i * BATCH_ROW_COUNT;
BuildVectors(begin_index, begin_index + BATCH_ROW_COUNT, record_array);
std::vector<int64_t> record_ids;
Status stat = conn->AddVector(TABLE_NAME, record_array, record_ids);
std::cout << "AddVector function call status: " << stat.ToString() << std::endl;
PrintRecordIdArray(record_ids);
std::cout << "Returned id array count: " << record_ids.size() << std::endl;
if(i == 0) {
for(int64_t k = SEARCH_TARGET; k < SEARCH_TARGET + NQ; k++) {
search_record_array.push_back(
std::make_pair(record_ids[k], record_array[k]));
}
}
}
{//search vectors
{//search vectors without index
Sleep(2);
DoSearch(conn, search_record_array, "Search without index");
}
std::vector<RowRecord> record_array;
BuildVectors(SEARCH_TARGET, SEARCH_TARGET + NQ, record_array);
{//wait until the index build finishes
TimeRecorder recorder("Build index");
std::cout << "Wait until build all index done" << std::endl;
Status stat = conn->BuildIndex(TABLE_NAME);
std::cout << "BuildIndex function call status: " << stat.ToString() << std::endl;
}
std::vector<Range> query_range_array;
Range rg;
rg.start_value = CurrentTmDate();
rg.end_value = CurrentTmDate();
query_range_array.emplace_back(rg);
std::vector<TopKQueryResult> topk_query_result_array;
Status stat = conn->SearchVector(TABLE_NAME, record_array, query_range_array, TOP_K, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
PrintSearchResult(topk_query_result_array);
{//search vectors after the index build finishes
DoSearch(conn, search_record_array, "Search after build index finish");
}
{//delete table
......
This diff is collapsed.
......@@ -27,6 +27,8 @@ public:
virtual Status DeleteTable(const std::string &table_name) override;
virtual Status BuildIndex(const std::string &table_name) override;
virtual Status AddVector(const std::string &table_name,
const std::vector<RowRecord> &record_array,
std::vector<int64_t> &id_array) override;
......
......@@ -66,6 +66,11 @@ ConnectionImpl::DeleteTable(const std::string &table_name) {
return client_proxy_->DeleteTable(table_name);
}
Status
ConnectionImpl::BuildIndex(const std::string &table_name) {
return client_proxy_->BuildIndex(table_name);
}
Status
ConnectionImpl::AddVector(const std::string &table_name,
const std::vector<RowRecord> &record_array,
......
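Putting the new BuildIndex() entry point in context, the flow exercised by ClientTest::Test above looks like the sketch below, given a connected conn (a std::shared_ptr<Connection>, as passed to DoSearch()); all variable names other than TABLE_NAME and TOP_K are placeholders:

std::vector<RowRecord> records;               // query/insert vectors
std::vector<Range> ranges;                    // one-day date range, as in DoSearch()
std::vector<int64_t> ids;
std::vector<TopKQueryResult> results;

conn->AddVector(TABLE_NAME, records, ids);                        // insert a batch; ids come back from the server
conn->SearchVector(TABLE_NAME, records, ranges, TOP_K, results);  // search before any index is built
conn->BuildIndex(TABLE_NAME);                                     // per the test above, returns once index building is done
conn->SearchVector(TABLE_NAME, records, ranges, TOP_K, results);  // same query, now served by the index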
The diffs of the remaining changed files are collapsed.