提交 15d76b84 编写于 作者: Y yudong.cai

Merge remote-tracking branch 'main/branch-0.3.1' into branch-0.3.1


Former-commit-id: 5e4bbae09c91b6aebec06951921baa1ea4ac89b5
......@@ -16,3 +16,5 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-1 - Add CHANGELOG.md
- MS-161 - Add CI / CD Module to Milvus Project
- MS-202 - Add Milvus Jenkins project email notification
- MS-215 - Add Milvus cluster CI/CD groovy file
try {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
if (currentBuild.result == 'ABORTED') {
throw new hudson.AbortException("Dev Test Aborted !")
} else if (currentBuild.result == 'FAILURE') {
error("Dev Test Failure !")
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
}
} catch (exc) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
}
throw exc
}
try {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
if (currentBuild.result == 'ABORTED') {
throw new hudson.AbortException("Cluster Dev Test Aborted !")
} else if (currentBuild.result == 'FAILURE') {
error("Dev Test Failure !")
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
}
} catch (exc) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
def result = sh script: "helm status ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster", returnStatus: true
if (!result) {
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
}
throw exc
}
......@@ -3,17 +3,18 @@ try {
sh 'helm repo add milvus https://registry.zilliz.com/chartrepo/milvus'
sh 'helm repo update'
dir ("milvus-helm") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [[$class: 'SubmoduleOption',disableSubmodules: false,parentCredentials: true,recursiveSubmodules: true,reference: '',trackingSubmodules: false]], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus-helm.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus-helm.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
dir ("milvus/milvus-cluster") {
sh "helm install --set roServers.image.tag=${DOCKER_VERSION} --set woServers.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP -f ci/values.yaml --name ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster --namespace milvus-cluster --version 0.1.0 . "
sh "helm install --set roServers.image.tag=${DOCKER_VERSION} --set woServers.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP -f ci/values.yaml --name ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster --namespace milvus-cluster --version 0.3.1 . "
}
}
timeout(time: 2, unit: 'MINUTES') {
waitUntil {
def result = sh script: "nc -z -w 2 ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster-milvus-cluster-proxy.milvus-cluster.svc.cluster.local 19530", returnStatus: true
def result = sh script: "nc -z -w 3 ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster-milvus-cluster-proxy.milvus-cluster.svc.cluster.local 19530", returnStatus: true
return !result
}
}
} catch (exc) {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo 'Helm running failed!'
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster"
throw exc
......
container('milvus-testframework') {
timeout(time: 10, unit: 'MINUTES') {
gitlabCommitStatus(name: 'Dev Test') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements.txt'
sh "pytest . --alluredir=cluster_test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster-milvus-cluster-proxy.milvus-cluster.svc.cluster.local"
}
} catch (exc) {
updateGitlabCommitStatus name: 'Dev Test', state: 'failed'
currentBuild.result = 'FAILURE'
echo 'Milvus Test Failed !'
}
timeout(time: 10, unit: 'MINUTES') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements.txt'
sh "pytest . --alluredir=cluster_test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-cluster-milvus-cluster-proxy.milvus-cluster.svc.cluster.local"
}
} catch (exc) {
echo 'Milvus Cluster Test Failed !'
throw exc
}
}
......@@ -2,13 +2,19 @@ try {
sh 'helm init --client-only --skip-refresh --stable-repo-url https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts'
sh 'helm repo add milvus https://registry.zilliz.com/chartrepo/milvus'
sh 'helm repo update'
sh "helm install --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.JOB_NAME}-${env.BUILD_NUMBER} --version 0.3.0 milvus/milvus-gpu"
waitUntil {
def result = sh script: "nc -z -w 2 ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.kube-opt.svc.cluster.local 19530", returnStatus: true
return !result
dir ("milvus-helm") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:megasearch/milvus-helm.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
dir ("milvus/milvus-gpu") {
sh "helm install --set engine.image.tag=${DOCKER_VERSION} --set expose.type=clusterIP --name ${env.JOB_NAME}-${env.BUILD_NUMBER} -f ci/values.yaml --version 0.3.1 ."
}
}
timeout(time: 2, unit: 'MINUTES') {
waitUntil {
def result = sh script: "nc -z -w 3 ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.kube-opt.svc.cluster.local 19530", returnStatus: true
return !result
}
}
} catch (exc) {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo 'Helm running failed!'
sh "helm del --purge ${env.JOB_NAME}-${env.BUILD_NUMBER}"
throw exc
......
container('milvus-testframework') {
timeout(time: 10, unit: 'MINUTES') {
gitlabCommitStatus(name: 'Dev Test') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements.txt'
sh "pytest . --alluredir=test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.kube-opt.svc.cluster.local"
}
} catch (exc) {
updateGitlabCommitStatus name: 'Dev Test', state: 'failed'
currentBuild.result = 'FAILURE'
echo 'Milvus Test Failed !'
}
timeout(time: 10, unit: 'MINUTES') {
try {
dir ("${PROJECT_NAME}_test") {
checkout([$class: 'GitSCM', branches: [[name: "${SEMVER}"]], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: "${params.GIT_USER}", url: "git@192.168.1.105:Test/milvus_test.git", name: 'origin', refspec: "+refs/heads/${SEMVER}:refs/remotes/origin/${SEMVER}"]]])
sh 'python3 -m pip install -r requirements.txt'
sh "pytest . --alluredir=test_out --ip ${env.JOB_NAME}-${env.BUILD_NUMBER}-milvus-gpu-engine.kube-opt.svc.cluster.local"
}
} catch (exc) {
echo 'Milvus Test Failed !'
throw exc
}
}
container('milvus-build-env') {
timeout(time: 20, unit: 'MINUTES') {
timeout(time: 30, unit: 'MINUTES') {
gitlabCommitStatus(name: 'Build Engine') {
dir ("milvus_engine") {
try {
......@@ -17,4 +17,3 @@ container('milvus-build-env') {
}
}
}
container('milvus-build-env') {
timeout(time: 20, unit: 'MINUTES') {
timeout(time: 30, unit: 'MINUTES') {
gitlabCommitStatus(name: 'Build Engine') {
dir ("milvus_engine") {
try {
......@@ -17,4 +17,3 @@ container('milvus-build-env') {
}
}
}
......@@ -12,7 +12,10 @@ container('publish-docker') {
def customImage = docker.build("${PROJECT_NAME}/engine:${DOCKER_VERSION}")
customImage.push()
}
echo "Docker Pull Command: docker pull registry.zilliz.com/${PROJECT_NAME}/engine:${DOCKER_VERSION}"
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
updateGitlabCommitStatus name: 'Publish Engine Docker', state: 'success'
echo "Docker Pull Command: docker pull registry.zilliz.com/${PROJECT_NAME}/engine:${DOCKER_VERSION}"
}
} catch (exc) {
updateGitlabCommitStatus name: 'Publish Engine Docker', state: 'canceled'
throw exc
......
container('milvus-testframework') {
timeout(time: 5, unit: 'MINUTES') {
dir ("${PROJECT_NAME}_test") {
gitlabCommitStatus(name: 'Upload Dev Test Out') {
if (fileExists('cluster_test_out')) {
try {
def fileTransfer = load "${env.WORKSPACE}/ci/function/file_transfer.groovy"
fileTransfer.FileTransfer("cluster_test_out/", "${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}", 'nas storage')
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
echo "Milvus Dev Test Out Viewer \"ftp://192.168.1.126/data/${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}\""
}
} catch (hudson.AbortException ae) {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'canceled'
currentBuild.result = 'ABORTED'
} catch (exc) {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'failed'
currentBuild.result = 'FAILURE'
}
} else {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'failed'
echo "Milvus Dev Test Out directory don't exists!"
}
timeout(time: 5, unit: 'MINUTES') {
dir ("${PROJECT_NAME}_test") {
if (fileExists('cluster_test_out')) {
def fileTransfer = load "${env.WORKSPACE}/ci/function/file_transfer.groovy"
fileTransfer.FileTransfer("cluster_test_out/", "${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}", 'nas storage')
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
echo "Milvus Dev Test Out Viewer \"ftp://192.168.1.126/data/${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}\""
}
} else {
error("Milvus Dev Test Out directory don't exists!")
}
}
}
container('milvus-testframework') {
timeout(time: 5, unit: 'MINUTES') {
dir ("${PROJECT_NAME}_test") {
gitlabCommitStatus(name: 'Upload Dev Test Out') {
if (fileExists('test_out')) {
try {
def fileTransfer = load "${env.WORKSPACE}/ci/function/file_transfer.groovy"
fileTransfer.FileTransfer("test_out/", "${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}", 'nas storage')
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
echo "Milvus Dev Test Out Viewer \"ftp://192.168.1.126/data/${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}\""
}
} catch (hudson.AbortException ae) {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'canceled'
currentBuild.result = 'ABORTED'
} catch (exc) {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'failed'
currentBuild.result = 'FAILURE'
}
} else {
updateGitlabCommitStatus name: 'Upload Dev Test Out', state: 'failed'
echo "Milvus Dev Test Out directory don't exists!"
}
timeout(time: 5, unit: 'MINUTES') {
dir ("${PROJECT_NAME}_test") {
if (fileExists('test_out')) {
def fileTransfer = load "${env.WORKSPACE}/ci/function/file_transfer.groovy"
fileTransfer.FileTransfer("test_out/", "${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}", 'nas storage')
if (currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
echo "Milvus Dev Test Out Viewer \"ftp://192.168.1.126/data/${PROJECT_NAME}/test/${JOB_NAME}-${BUILD_ID}\""
}
} else {
error("Milvus Dev Test Out directory don't exists!")
}
}
}
......@@ -35,7 +35,7 @@ pipeline {
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-build-env'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.10'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.11'
ttyEnabled true
command 'cat'
}
......@@ -131,59 +131,53 @@ spec:
stage("Deploy to Development") {
parallel {
stage("Single") {
stage("Single Node") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Deploy') {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
}
}
}
}
stage("Dev Test") {
agent {
kubernetes {
label 'test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
}
}
stages {
stage('Test') {
steps {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
......@@ -192,140 +186,133 @@ spec:
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Cleanup') {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Single Node CI/CD success !"
}
}
aborted {
script {
echo "Milvus Single Node CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Single Node CI/CD failure !"
}
}
}
}
stage("Cluster") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Deploy') {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
}
}
}
}
stage("Dev Test") {
agent {
kubernetes {
label 'test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
}
}
stages {
stage('Test') {
steps {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_cluster_dev_test_out.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_cluster_test_out.groovy"
}
}
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Cleanup') {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Cluster CI/CD success !"
}
}
aborted {
script {
echo "Milvus Cluster CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Cluster CI/CD failure !"
}
}
}
}
}
......@@ -337,16 +324,18 @@ spec:
post {
always {
script {
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
body: '$DEFAULT_CONTENT',
recipientProviders: [
[$class: 'DevelopersRecipientProvider'],
[$class: 'RequesterRecipientProvider']
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
if (env.gitlabAfter != null) {
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
body: '$DEFAULT_CONTENT',
recipientProviders: [
[$class: 'DevelopersRecipientProvider'],
[$class: 'RequesterRecipientProvider']
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
}
}
}
}
......
......@@ -35,7 +35,7 @@ pipeline {
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-build-env'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.10'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.11'
ttyEnabled true
command 'cat'
}
......@@ -131,59 +131,53 @@ spec:
stage("Deploy to Development") {
parallel {
stage("Single") {
stage("Single Node") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Deploy') {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
}
}
}
}
stage("Dev Test") {
agent {
kubernetes {
label 'test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
}
}
stages {
stage('Test') {
steps {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
......@@ -192,140 +186,133 @@ spec:
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Cleanup') {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Single Node CI/CD success !"
}
}
aborted {
script {
echo "Milvus Single Node CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Single Node CI/CD failure !"
}
}
}
}
stage("Cluster") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Deploy') {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
}
}
}
}
stage("Dev Test") {
agent {
kubernetes {
label 'test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
}
}
stages {
stage('Test') {
steps {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_cluster_dev_test_out.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_cluster_test_out.groovy"
}
}
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Cleanup') {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Cluster CI/CD success !"
}
}
aborted {
script {
echo "Milvus Cluster CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Cluster CI/CD failure !"
}
}
}
}
}
......@@ -337,16 +324,18 @@ spec:
post {
always {
script {
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
body: '$DEFAULT_CONTENT',
recipientProviders: [
[$class: 'DevelopersRecipientProvider'],
[$class: 'RequesterRecipientProvider']
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
if (env.gitlabAfter != null) {
if (!currentBuild.resultIsBetterOrEqualTo('SUCCESS')) {
// Send an email only if the build status has changed from green/unstable to red
emailext subject: '$DEFAULT_SUBJECT',
body: '$DEFAULT_CONTENT',
recipientProviders: [
[$class: 'DevelopersRecipientProvider'],
[$class: 'RequesterRecipientProvider']
],
replyTo: '$DEFAULT_REPLYTO',
to: '$DEFAULT_RECIPIENTS'
}
}
}
}
......
......@@ -35,7 +35,7 @@ pipeline {
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-build-env'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.10'
image 'registry.zilliz.com/milvus/milvus-build-env:v0.11'
ttyEnabled true
command 'cat'
}
......@@ -131,59 +131,53 @@ spec:
stage("Deploy to Development") {
parallel {
stage("Single") {
stage("Single Node") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Deploy') {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/deploy2dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
}
}
}
}
stage("Dev Test") {
agent {
kubernetes {
label 'test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
}
}
stages {
stage('Test') {
steps {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_test_out.groovy"
......@@ -192,140 +186,133 @@ spec:
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Cleanup') {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Single Node CI/CD success !"
}
}
aborted {
script {
echo "Milvus Single Node CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Single Node CI/CD failure !"
}
}
}
}
stage("Cluster") {
agent {
kubernetes {
label 'dev-test'
defaultContainer 'jnlp'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: milvus
componet: test
spec:
containers:
- name: milvus-testframework
image: registry.zilliz.com/milvus/milvus-test:v0.2
command:
- cat
tty: true
volumeMounts:
- name: kubeconf
mountPath: /root/.kube/
readOnly: true
volumes:
- name: kubeconf
secret:
secretName: test-cluster-config
"""
}
}
stages {
stage("Deploy to Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Deploy') {
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Deloy to Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_deploy2dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'canceled'
echo "Milvus Deloy to Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Deloy to Dev', state: 'failed'
echo "Milvus Deloy to Dev failure !"
}
}
}
}
stage("Dev Test") {
agent {
kubernetes {
label 'test'
defaultContainer 'jnlp'
containerTemplate {
name 'milvus-testframework'
image 'registry.zilliz.com/milvus/milvus-test:v0.1'
ttyEnabled true
command 'cat'
}
}
}
stages {
stage('Test') {
steps {
steps {
gitlabCommitStatus(name: 'Deloy Test') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_dev_test.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_cluster_dev_test_out.groovy"
load "${env.WORKSPACE}/ci/jenkinsfile/upload_dev_cluster_test_out.groovy"
}
}
}
}
}
stage ("Cleanup Dev") {
agent {
kubernetes {
label 'jenkins-slave'
defaultContainer 'jnlp'
}
}
stages {
stage('Cleanup') {
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
steps {
gitlabCommitStatus(name: 'Cleanup Dev') {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
}
post {
aborted {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'canceled'
echo "Milvus Cleanup Dev aborted !"
}
}
failure {
script {
updateGitlabCommitStatus name: 'Cleanup Dev', state: 'failed'
echo "Milvus Cleanup Dev failure !"
}
}
}
post {
always {
container('milvus-testframework') {
script {
load "${env.WORKSPACE}/ci/jenkinsfile/cluster_cleanup_dev.groovy"
}
}
}
success {
script {
echo "Milvus Cluster CI/CD success !"
}
}
aborted {
script {
echo "Milvus Cluster CI/CD aborted !"
}
}
failure {
script {
echo "Milvus Cluster CI/CD failure !"
}
}
}
}
}
......
......@@ -9,8 +9,17 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-148 - Disable cleanup if mode is read only
- MS-149 - Fixed searching only one index file issue in distributed mode
- MS-153 - fix c_str error when connecting to MySQL
- MS-157 - fix changelog
- MS-153 - Fix c_str error when connecting to MySQL
- MS-157 - Fix changelog
- MS-190 - Use env variable to switch mem manager and fix cmake
- MS-217 - Fix SQ8 row count bug
- MS-224 - Return AlreadyExist status in MySQLMetaImpl::CreateTable if table already exists
- MS-232 - Add MySQLMetaImpl::UpdateTableFilesToIndex and set maximum_memory to default if config value = 0
- MS-233 - Remove mem manager log
- MS-230 - Change parameter name: Maximum_memory to insert_buffer_size
- MS-234 - Some case cause background merge thread stop
- MS-235 - Some test cases random fail
- MS-236 - Add MySQLMetaImpl::HasNonIndexFiles
## Improvement
- MS-156 - Add unittest for merge result functions
......@@ -19,8 +28,12 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-206 - Support SQ8 index type
- MS-208 - Add buildinde interface for C++ SDK
- MS-212 - Support Inner product metric type
- MS-241 - Build Faiss with MKL if using Intel CPU; else build with OpenBlas
- MS-242 - Clean up cmake and change MAKE_BUILD_ARGS to be user defined variable
- MS-245 - Improve search result transfer performance
## New Feature
- MS-180 - Add new mem manager
- MS-195 - Add nlist and use_blas_threshold conf
## Task
......@@ -76,6 +89,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-144 - Add nprobe config
- MS-147 - Enable IVF
- MS-130 - Add prometheus_test
## Task
- MS-74 - Change README.md in cpp
- MS-88 - Add support for arm architecture
......
### Compilation
#### Step 1: install necessery tools
Install MySQL
centos7 :
yum install gfortran qt4 flex bison mysql-devel
yum install gfortran qt4 flex bison mysql-devel mysql
ubuntu16.04 :
sudo apt-get install gfortran qt4-qmake flex bison libmysqlclient-dev
sudo apt-get install gfortran qt4-qmake flex bison libmysqlclient-dev mysql-client
cd scripts && sudo ./requirements.sh
If `libmysqlclient_r.so` does not exist after installing MySQL Development Files, you need to create a symbolic link:
......
......@@ -57,8 +57,6 @@ define_option(MILVUS_VERBOSE_THIRDPARTY_BUILD
define_option(MILVUS_WITH_ARROW "Build with ARROW" OFF)
define_option(MILVUS_BOOST_USE_SHARED "Rely on boost shared libraries where relevant" OFF)
define_option(MILVUS_BOOST_VENDORED "Use vendored Boost instead of existing Boost. \
Note that this requires linking Boost statically" ON)
......
此差异已折叠。
......@@ -14,8 +14,10 @@ db_config:
db_backend_url: sqlite://:@:/
index_building_threshold: 1024 # index building trigger threshold, default: 1024, unit: MB
archive_disk_threshold: 512 # triger archive action if storage size exceed this value, unit: GB
archive_days_threshold: 30 # files older than x days will be archived, unit: day
archive_disk_threshold: 0 # triger archive action if storage size exceed this value, 0 means no limit, unit: GB
archive_days_threshold: 0 # files older than x days will be archived, 0 means no limit, unit: day
insert_buffer_size: 4 # maximum insert buffer size allowed, default: 4, unit: GB, should be at least 1 GB.
# the sum of insert_buffer_size and cpu_cache_capacity should be less than total memory
metric_config:
is_startup: off # if monitoring start: on, off
......@@ -37,4 +39,4 @@ engine_config:
nprobe: 10
nlist: 16384
use_blas_threshold: 20
metric_type: L2 #L2 or Inner Product
\ No newline at end of file
metric_type: L2 # compare vectors by euclidean distance(L2) or inner product(IP), optional: L2 or IP
......@@ -13,6 +13,27 @@ DIR_LCOV_OUTPUT="lcov_out"
DIR_GCNO="cmake_build"
DIR_UNITTEST="milvus/bin"
MYSQL_USER_NAME=root
MYSQL_PASSWORD=Fantast1c
MYSQL_HOST='192.168.1.194'
MYSQL_PORT='3306'
MYSQL_DB_NAME=milvus_`date +%s%N`
function mysql_exc()
{
cmd=$1
mysql -h${MYSQL_HOST} -u${MYSQL_USER_NAME} -p${MYSQL_PASSWORD} -e "${cmd}"
if [ $? -ne 0 ]; then
echo "mysql $cmd run failed"
fi
}
mysql_exc "CREATE DATABASE IF NOT EXISTS ${MYSQL_DB_NAME};"
mysql_exc "GRANT ALL PRIVILEGES ON ${MYSQL_DB_NAME}.* TO '${MYSQL_USER_NAME}'@'%';"
mysql_exc "FLUSH PRIVILEGES;"
mysql_exc "USE ${MYSQL_DB_NAME};"
MYSQL_USER_NAME=root
MYSQL_PASSWORD=Fantast1c
......
#!/usr/bin/env bash
wget -P /tmp https://apt.repos.intel.com/intel-gpg-keys/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB
apt-key add /tmp/GPG-PUB-KEY-INTEL-SW-PRODUCTS-2019.PUB
sh -c 'echo deb https://apt.repos.intel.com/mkl all main > /etc/apt/sources.list.d/intel-mkl.list'
apt -y update && apt-get -y install intel-mkl-gnu-2019.4-243 intel-mkl-core-2019.4-243
#sh -c 'echo export LD_LIBRARY_PATH=/opt/intel/compilers_and_libraries_2019.4.243/linux/mkl/lib/intel64:\$LD_LIBRARY_PATH > /etc/profile.d/mkl.sh'
#source /etc/profile
......@@ -48,7 +48,6 @@ set(engine_files
${db_files}
${db_scheduler_files}
${wrapper_files}
# metrics/Metrics.cpp
${metrics_files}
)
......@@ -71,8 +70,6 @@ set(third_party_libs
yaml-cpp
libgpufaiss.a
faiss
lapack
openblas
prometheus-cpp-push
prometheus-cpp-pull
prometheus-cpp-core
......@@ -88,9 +85,19 @@ set(third_party_libs
profiler
${CUDA_TOOLKIT_ROOT_DIR}/lib64/stubs/libnvidia-ml.so
)
if (MEGASEARCH_WITH_ARROW STREQUAL "ON")
set(third_party_libs ${third_party_libs} arrow)
endif()
endif()
if(${BUILD_FAISS_WITH_MKL} STREQUAL "true")
set(third_party_libs ${third_party_libs}
${MKL_LIBS}
${MKL_LIBS})
else()
set(third_party_libs ${third_party_libs}
lapack
openblas)
endif()
if (GPU_VERSION STREQUAL "ON")
link_directories("${CUDA_TOOLKIT_ROOT_DIR}/lib64")
......@@ -188,6 +195,6 @@ install(FILES
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3
${CMAKE_BINARY_DIR}/mysqlpp_ep-prefix/src/mysqlpp_ep/lib/${CMAKE_SHARED_LIBRARY_PREFIX}mysqlpp${CMAKE_SHARED_LIBRARY_SUFFIX}.3.2.4
DESTINATION lib) #need to copy libmysqlpp.so
DESTINATION lib)
add_subdirectory(sdk)
......@@ -20,6 +20,11 @@ public:
: index_(index)
{}
DataObj(const engine::Index_ptr& index, int64_t size)
: index_(index),
size_(size)
{}
engine::Index_ptr data() { return index_; }
const engine::Index_ptr& data() const { return index_; }
......@@ -28,11 +33,16 @@ public:
return 0;
}
if(size_ > 0) {
return size_;
}
return index_->ntotal*(index_->dim*4);
}
private:
engine::Index_ptr index_ = nullptr;
int64_t size_ = 0;
};
using DataObjPtr = std::shared_ptr<DataObj>;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
namespace zilliz {
namespace milvus {
namespace engine {
const size_t K = 1024UL;
const size_t M = K * K;
const size_t G = K * M;
const size_t T = K * G;
const size_t MAX_TABLE_FILE_MEM = 128 * M;
const int VECTOR_TYPE_SIZE = sizeof(float);
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -87,9 +87,9 @@ DBImpl::DBImpl(const Options& options)
compact_thread_pool_(1, 1),
index_thread_pool_(1, 1) {
meta_ptr_ = DBMetaImplFactory::Build(options.meta, options.mode);
mem_mgr_ = std::make_shared<MemManager>(meta_ptr_, options_);
// mem_mgr_ = (MemManagerPtr)(new MemManager(meta_ptr_, options_));
mem_mgr_ = MemManagerFactory::Build(meta_ptr_, options_);
if (options.mode != Options::MODE::READ_ONLY) {
ENGINE_LOG_INFO << "StartTimerTasks";
StartTimerTasks();
}
}
......@@ -170,7 +170,10 @@ Status DBImpl::Query(const std::string& table_id, uint64_t k, uint64_t nq,
}
}
return QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
status = QueryAsync(table_id, file_id_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
return status;
}
Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>& file_ids,
......@@ -195,7 +198,10 @@ Status DBImpl::Query(const std::string& table_id, const std::vector<std::string>
return Status::Error("Invalid file id");
}
return QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info before query
status = QueryAsync(table_id, files_array, k, nq, vectors, dates, results);
cache::CpuCacheMgr::GetInstance()->PrintInfo(); //print cache info after query
return status;
}
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
......@@ -230,7 +236,6 @@ void DBImpl::BackgroundTimerTask() {
Status status;
server::SystemInfo::GetInstance().Init();
while (true) {
if (!bg_error_.ok()) break;
if (shutting_down_.load(std::memory_order_acquire)){
for(auto& iter : compact_thread_results_) {
iter.wait();
......@@ -385,15 +390,11 @@ Status DBImpl::BackgroundMergeFiles(const std::string& table_id) {
}
void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
// static int b_count = 0;
// b_count++;
// std::cout << "BackgroundCompaction: " << b_count << std::endl;
Status status;
for (auto& table_id : table_ids) {
status = BackgroundMergeFiles(table_id);
if (!status.ok()) {
bg_error_ = status;
ENGINE_LOG_ERROR << "Merge files for table " << table_id << " failed: " << status.ToString();
return;
}
}
......@@ -403,7 +404,6 @@ void DBImpl::BackgroundCompaction(std::set<std::string> table_ids) {
int ttl = 1;
if (options_.mode == Options::MODE::CLUSTER) {
ttl = meta::D_SEC;
// ENGINE_LOG_DEBUG << "Server mode is cluster. Clean up files with ttl = " << std::to_string(ttl) << "seconds.";
}
meta_ptr_->CleanUpFilesWithTTL(ttl);
}
......@@ -487,7 +487,7 @@ Status DBImpl::BuildIndex(const meta::TableFileSchema& file) {
//step 6: update meta
table_file.file_type_ = meta::TableFileSchema::INDEX;
table_file.size_ = index->PhysicalSize();
table_file.size_ = index->Size();
auto to_remove = file;
to_remove.file_type_ = meta::TableFileSchema::TO_DELETE;
......@@ -536,10 +536,9 @@ void DBImpl::BackgroundBuildIndex() {
meta_ptr_->FilesToIndex(to_index_files);
Status status;
for (auto& file : to_index_files) {
/* ENGINE_LOG_DEBUG << "Buiding index for " << file.location; */
status = BuildIndex(file);
if (!status.ok()) {
bg_error_ = status;
ENGINE_LOG_ERROR << "Building index for " << file.id_ << " failed: " << status.ToString();
return;
}
......@@ -547,7 +546,6 @@ void DBImpl::BackgroundBuildIndex() {
break;
}
}
/* ENGINE_LOG_DEBUG << "All Buiding index Done"; */
}
Status DBImpl::DropAll() {
......
......@@ -9,6 +9,7 @@
#include "MemManager.h"
#include "Types.h"
#include "utils/ThreadPool.h"
#include "MemManagerAbstract.h"
#include <mutex>
#include <condition_variable>
......@@ -33,7 +34,6 @@ class Meta;
class DBImpl : public DB {
public:
using MetaPtr = meta::Meta::Ptr;
using MemManagerPtr = typename MemManager::Ptr;
explicit DBImpl(const Options &options);
......@@ -118,16 +118,14 @@ class DBImpl : public DB {
BuildIndex(const meta::TableFileSchema &);
private:
const Options options_;
Status bg_error_;
std::atomic<bool> shutting_down_;
std::thread bg_timer_thread_;
MetaPtr meta_ptr_;
MemManagerPtr mem_mgr_;
MemManagerAbstractPtr mem_mgr_;
server::ThreadPool compact_thread_pool_;
std::list<std::future<void>> compact_thread_results_;
......
......@@ -291,6 +291,8 @@ Status DBMetaImpl::HasNonIndexFiles(const std::string& table_id, bool& has) {
try {
auto selected = ConnectorPtr->select(columns(&TableFileSchema::id_),
where((c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW
or
c(&TableFileSchema::file_type_) == (int) TableFileSchema::NEW
or
c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX)
and c(&TableFileSchema::table_id_) == table_id
......@@ -674,16 +676,22 @@ Status DBMetaImpl::Archive() {
Status DBMetaImpl::Size(uint64_t &result) {
result = 0;
try {
auto selected = ConnectorPtr->select(columns(sum(&TableFileSchema::size_)),
where(
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
auto files = ConnectorPtr->select(columns(&TableFileSchema::size_,
&TableFileSchema::file_type_,
&TableFileSchema::engine_type_),
where(
c(&TableFileSchema::file_type_) != (int) TableFileSchema::TO_DELETE
));
for (auto &sub_query : selected) {
if (!std::get<0>(sub_query)) {
continue;
for (auto &file : files) {
auto file_size = std::get<0>(file);
auto file_type = std::get<1>(file);
auto engine_type = std::get<2>(file);
if(file_type == (int)TableFileSchema::INDEX && engine_type == (int)EngineType::FAISS_IVFSQ8) {
result += (uint64_t)file_size/4;//hardcode for sq8
} else {
result += (uint64_t)file_size;
}
result += (uint64_t) (*std::get<0>(sub_query));
}
} catch (std::exception &e) {
return HandleException("Encounter exception when calculte db size", e);
......
......@@ -19,6 +19,7 @@ enum class EngineType {
FAISS_IDMAP = 1,
FAISS_IVFFLAT,
FAISS_IVFSQ8,
MAX_VALUE = FAISS_IVFSQ8,
};
class ExecutionEngine {
......
......@@ -3,10 +3,14 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <stdlib.h>
#include "Factories.h"
#include "DBImpl.h"
#include "MemManager.h"
#include "NewMemManager.h"
#include "Exception.h"
#include <stdlib.h>
#include <time.h>
#include <sstream>
#include <iostream>
......@@ -14,7 +18,9 @@
#include <assert.h>
#include <easylogging++.h>
#include <regex>
#include "Exception.h"
#include <cstdlib>
#include <string>
#include <algorithm>
namespace zilliz {
namespace milvus {
......@@ -72,17 +78,14 @@ std::shared_ptr<meta::Meta> DBMetaImplFactory::Build(const DBMetaOptions& metaOp
if (dialect.find("mysql") != std::string::npos) {
ENGINE_LOG_INFO << "Using MySQL";
return std::make_shared<meta::MySQLMetaImpl>(meta::MySQLMetaImpl(metaOptions, mode));
}
else if (dialect.find("sqlite") != std::string::npos) {
} else if (dialect.find("sqlite") != std::string::npos) {
ENGINE_LOG_INFO << "Using SQLite";
return std::make_shared<meta::DBMetaImpl>(meta::DBMetaImpl(metaOptions));
}
else {
} else {
ENGINE_LOG_ERROR << "Invalid dialect in URI: dialect = " << dialect;
throw InvalidArgumentException("URI dialect is not mysql / sqlite");
}
}
else {
} else {
ENGINE_LOG_ERROR << "Wrong URI format: URI = " << uri;
throw InvalidArgumentException("Wrong URI format ");
}
......@@ -98,6 +101,21 @@ DB* DBFactory::Build(const Options& options) {
return new DBImpl(options);
}
MemManagerAbstractPtr MemManagerFactory::Build(const std::shared_ptr<meta::Meta>& meta,
const Options& options) {
if (const char* env = getenv("MILVUS_USE_OLD_MEM_MANAGER")) {
std::string env_str = env;
std::transform(env_str.begin(), env_str.end(), env_str.begin(), ::toupper);
if (env_str == "ON") {
return std::make_shared<MemManager>(meta, options);
}
else {
return std::make_shared<NewMemManager>(meta, options);
}
}
return std::make_shared<NewMemManager>(meta, options);
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
......@@ -10,16 +10,18 @@
#include "MySQLMetaImpl.h"
#include "Options.h"
#include "ExecutionEngine.h"
#include "MemManagerAbstract.h"
#include <string>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
struct DBMetaOptionsFactory {
static DBMetaOptions Build(const std::string& path = "");
static DBMetaOptions Build(const std::string &path = "");
};
struct OptionsFactory {
......@@ -28,12 +30,16 @@ struct OptionsFactory {
struct DBMetaImplFactory {
static std::shared_ptr<meta::DBMetaImpl> Build();
static std::shared_ptr<meta::Meta> Build(const DBMetaOptions& metaOptions, const int& mode);
static std::shared_ptr<meta::Meta> Build(const DBMetaOptions &metaOptions, const int &mode);
};
struct DBFactory {
static std::shared_ptr<DB> Build();
static DB* Build(const Options&);
static DB *Build(const Options &);
};
struct MemManagerFactory {
static MemManagerAbstractPtr Build(const std::shared_ptr<meta::Meta> &meta, const Options &options);
};
} // namespace engine
......
......@@ -110,7 +110,7 @@ Status FaissExecutionEngine::Merge(const std::string& location) {
if (location == location_) {
return Status::Error("Cannot Merge Self");
}
ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
ENGINE_LOG_DEBUG << "Merge raw file: " << location << " to: " << location_;
auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
if (!to_merge) {
......@@ -165,8 +165,9 @@ Status FaissExecutionEngine::Search(long n,
}
Status FaissExecutionEngine::Cache() {
zilliz::milvus::cache::CpuCacheMgr::GetInstance(
)->InsertItem(location_, std::make_shared<Index>(pIndex_));
auto index = std::make_shared<Index>(pIndex_);
cache::DataObjPtr data_obj = std::make_shared<cache::DataObj>(index, PhysicalSize());
zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, data_obj);
return Status::OK();
}
......
......@@ -8,28 +8,30 @@
#include "MetaConsts.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include "Log.h"
#include <iostream>
#include <sstream>
#include <thread>
#include <easylogging++.h>
namespace zilliz {
namespace milvus {
namespace engine {
MemVectors::MemVectors(const std::shared_ptr<meta::Meta>& meta_ptr,
const meta::TableFileSchema& schema, const Options& options)
: meta_(meta_ptr),
options_(options),
schema_(schema),
id_generator_(new SimpleIDGenerator()),
active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType)schema_.engine_type_)) {
MemVectors::MemVectors(const std::shared_ptr<meta::Meta> &meta_ptr,
const meta::TableFileSchema &schema, const Options &options)
: meta_(meta_ptr),
options_(options),
schema_(schema),
id_generator_(new SimpleIDGenerator()),
active_engine_(EngineFactory::Build(schema_.dimension_, schema_.location_, (EngineType) schema_.engine_type_)) {
}
Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_) {
if(active_engine_ == nullptr) {
Status MemVectors::Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_) {
if (active_engine_ == nullptr) {
return Status::Error("index engine is null");
}
......@@ -38,13 +40,15 @@ Status MemVectors::Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_)
Status status = active_engine_->AddWithIds(n_, vectors_, vector_ids_.data());
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_), static_cast<int>(schema_.dimension_), total_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(schema_.dimension_),
total_time);
return status;
}
size_t MemVectors::RowCount() const {
if(active_engine_ == nullptr) {
if (active_engine_ == nullptr) {
return 0;
}
......@@ -52,15 +56,15 @@ size_t MemVectors::RowCount() const {
}
size_t MemVectors::Size() const {
if(active_engine_ == nullptr) {
if (active_engine_ == nullptr) {
return 0;
}
return active_engine_->Size();
}
Status MemVectors::Serialize(std::string& table_id) {
if(active_engine_ == nullptr) {
Status MemVectors::Serialize(std::string &table_id) {
if (active_engine_ == nullptr) {
return Status::Error("index engine is null");
}
......@@ -72,15 +76,16 @@ Status MemVectors::Serialize(std::string& table_id) {
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
schema_.size_ = size;
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size/total_time);
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet(size / total_time);
schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = meta_->UpdateTableFile(schema_);
LOG(DEBUG) << "New " << ((schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << schema_.file_id_ << " of size " << (double)(active_engine_->Size()) / (double)meta::M << " M";
<< " file " << schema_.file_id_ << " of size " << (double) (active_engine_->Size()) / (double) meta::M
<< " M";
active_engine_->Cache();
......@@ -98,7 +103,7 @@ MemVectors::~MemVectors() {
* MemManager
*/
MemManager::MemVectorsPtr MemManager::GetMemByTable(
const std::string& table_id) {
const std::string &table_id) {
auto memIt = mem_id_map_.find(table_id);
if (memIt != mem_id_map_.end()) {
return memIt->second;
......@@ -115,26 +120,28 @@ MemManager::MemVectorsPtr MemManager::GetMemByTable(
return mem_id_map_[table_id];
}
Status MemManager::InsertVectors(const std::string& table_id_,
size_t n_,
const float* vectors_,
IDNumbers& vector_ids_) {
Status MemManager::InsertVectors(const std::string &table_id_,
size_t n_,
const float *vectors_,
IDNumbers &vector_ids_) {
std::unique_lock<std::mutex> lock(mutex_);
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}
Status MemManager::InsertVectorsNoLock(const std::string& table_id,
size_t n,
const float* vectors,
IDNumbers& vector_ids) {
Status MemManager::InsertVectorsNoLock(const std::string &table_id,
size_t n,
const float *vectors,
IDNumbers &vector_ids) {
MemVectorsPtr mem = GetMemByTable(table_id);
if (mem == nullptr) {
return Status::NotFound("Group " + table_id + " not found!");
}
//makesure each file size less than index_trigger_size
if(mem->Size() > options_.index_trigger_size) {
if (mem->Size() > options_.index_trigger_size) {
std::unique_lock<std::mutex> lock(serialization_mtx_);
immu_mem_list_.push_back(mem);
mem_id_map_.erase(table_id);
......@@ -147,8 +154,8 @@ Status MemManager::InsertVectorsNoLock(const std::string& table_id,
Status MemManager::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
MemIdMap temp_map;
for (auto& kv: mem_id_map_) {
if(kv.second->RowCount() == 0) {
for (auto &kv: mem_id_map_) {
if (kv.second->RowCount() == 0) {
temp_map.insert(kv);
continue;//empty vector, no need to serialize
}
......@@ -159,12 +166,12 @@ Status MemManager::ToImmutable() {
return Status::OK();
}
Status MemManager::Serialize(std::set<std::string>& table_ids) {
Status MemManager::Serialize(std::set<std::string> &table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
std::string table_id;
table_ids.clear();
for (auto& mem : immu_mem_list_) {
for (auto &mem : immu_mem_list_) {
mem->Serialize(table_id);
table_ids.insert(table_id);
}
......@@ -172,7 +179,7 @@ Status MemManager::Serialize(std::set<std::string>& table_ids) {
return Status::OK();
}
Status MemManager::EraseMemVector(const std::string& table_id) {
Status MemManager::EraseMemVector(const std::string &table_id) {
{//erase MemVector from rapid-insert cache
std::unique_lock<std::mutex> lock(mutex_);
mem_id_map_.erase(table_id);
......@@ -181,8 +188,8 @@ Status MemManager::EraseMemVector(const std::string& table_id) {
{//erase MemVector from serialize cache
std::unique_lock<std::mutex> lock(serialization_mtx_);
MemList temp_list;
for (auto& mem : immu_mem_list_) {
if(mem->TableId() != table_id) {
for (auto &mem : immu_mem_list_) {
if (mem->TableId() != table_id) {
temp_list.push_back(mem);
}
}
......@@ -192,6 +199,26 @@ Status MemManager::EraseMemVector(const std::string& table_id) {
return Status::OK();
}
size_t MemManager::GetCurrentMutableMem() {
size_t totalMem = 0;
for (auto &kv : mem_id_map_) {
auto memVector = kv.second;
totalMem += memVector->Size();
}
return totalMem;
}
size_t MemManager::GetCurrentImmutableMem() {
size_t totalMem = 0;
for (auto &memVector : immu_mem_list_) {
totalMem += memVector->Size();
}
return totalMem;
}
size_t MemManager::GetCurrentMem() {
return GetCurrentMutableMem() + GetCurrentImmutableMem();
}
} // namespace engine
} // namespace milvus
......
......@@ -9,6 +9,7 @@
#include "IDGenerator.h"
#include "Status.h"
#include "Meta.h"
#include "MemManagerAbstract.h"
#include <map>
#include <string>
......@@ -17,72 +18,79 @@
#include <mutex>
#include <set>
namespace zilliz {
namespace milvus {
namespace engine {
namespace meta {
class Meta;
class Meta;
}
class MemVectors {
public:
public:
using MetaPtr = meta::Meta::Ptr;
using Ptr = std::shared_ptr<MemVectors>;
explicit MemVectors(const std::shared_ptr<meta::Meta>&,
const meta::TableFileSchema&, const Options&);
explicit MemVectors(const std::shared_ptr<meta::Meta> &,
const meta::TableFileSchema &, const Options &);
Status Add(size_t n_, const float* vectors_, IDNumbers& vector_ids_);
Status Add(size_t n_, const float *vectors_, IDNumbers &vector_ids_);
size_t RowCount() const;
size_t Size() const;
Status Serialize(std::string& table_id);
Status Serialize(std::string &table_id);
~MemVectors();
const std::string& Location() const { return schema_.location_; }
const std::string &Location() const { return schema_.location_; }
std::string TableId() const { return schema_.table_id_; }
private:
private:
MemVectors() = delete;
MemVectors(const MemVectors&) = delete;
MemVectors& operator=(const MemVectors&) = delete;
MemVectors(const MemVectors &) = delete;
MemVectors &operator=(const MemVectors &) = delete;
MetaPtr meta_;
Options options_;
meta::TableFileSchema schema_;
IDGenerator* id_generator_;
IDGenerator *id_generator_;
ExecutionEnginePtr active_engine_;
}; // MemVectors
class MemManager {
public:
class MemManager : public MemManagerAbstract {
public:
using MetaPtr = meta::Meta::Ptr;
using MemVectorsPtr = typename MemVectors::Ptr;
using Ptr = std::shared_ptr<MemManager>;
MemManager(const std::shared_ptr<meta::Meta>& meta, const Options& options)
MemManager(const std::shared_ptr<meta::Meta> &meta, const Options &options)
: meta_(meta), options_(options) {}
MemVectorsPtr GetMemByTable(const std::string& table_id);
Status InsertVectors(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids) override;
Status Serialize(std::set<std::string> &table_ids) override;
Status EraseMemVector(const std::string &table_id) override;
size_t GetCurrentMutableMem() override;
Status InsertVectors(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
size_t GetCurrentImmutableMem() override;
Status Serialize(std::set<std::string>& table_ids);
size_t GetCurrentMem() override;
Status EraseMemVector(const std::string& table_id);
private:
MemVectorsPtr GetMemByTable(const std::string &table_id);
private:
Status InsertVectorsNoLock(const std::string& table_id,
size_t n, const float* vectors, IDNumbers& vector_ids);
Status InsertVectorsNoLock(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids);
Status ToImmutable();
using MemIdMap = std::map<std::string, MemVectorsPtr>;
......
#pragma once

#include "Status.h"
#include "IDGenerator.h"

#include <string>
#include <set>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
class MemManagerAbstract {
public:
virtual Status InsertVectors(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids) = 0;
virtual Status Serialize(std::set<std::string> &table_ids) = 0;
virtual Status EraseMemVector(const std::string &table_id) = 0;
virtual size_t GetCurrentMutableMem() = 0;
virtual size_t GetCurrentImmutableMem() = 0;
virtual size_t GetCurrentMem() = 0;
}; // MemManagerAbstract
using MemManagerAbstractPtr = std::shared_ptr<MemManagerAbstract>;
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
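MemManagerAbstract turns the memory manager into a swappable strategy: DB code can hold a MemManagerAbstractPtr and work against either the legacy MemManager above or the MemTable-based NewMemManager further down. A minimal standalone sketch of the same pattern, with simplified signatures — the IMemManager/MakeMemManager names and the byte counts are illustrative inventions, not part of this commit:

#include <cstddef>
#include <iostream>
#include <memory>

// Simplified stand-in for MemManagerAbstract: only the size queries are mirrored.
struct IMemManager {
    virtual ~IMemManager() = default;
    virtual size_t GetCurrentMutableMem() = 0;
    virtual size_t GetCurrentImmutableMem() = 0;
    virtual size_t GetCurrentMem() = 0;
};

struct OldImpl : IMemManager {
    size_t GetCurrentMutableMem() override { return 100; }   // invented numbers
    size_t GetCurrentImmutableMem() override { return 50; }
    size_t GetCurrentMem() override { return GetCurrentMutableMem() + GetCurrentImmutableMem(); }
};

struct NewImpl : IMemManager {
    size_t GetCurrentMutableMem() override { return 200; }
    size_t GetCurrentImmutableMem() override { return 0; }
    size_t GetCurrentMem() override { return GetCurrentMutableMem() + GetCurrentImmutableMem(); }
};

// Hypothetical factory: the engine would pick MemManager or NewMemManager the same way.
std::shared_ptr<IMemManager> MakeMemManager(bool use_new) {
    if (use_new) {
        return std::make_shared<NewImpl>();
    }
    return std::make_shared<OldImpl>();
}

int main() {
    auto mgr = MakeMemManager(true);
    std::cout << "current mem: " << mgr->GetCurrentMem() << " bytes" << std::endl;
    return 0;
}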
#include "MemTable.h"
#include "Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
MemTable::MemTable(const std::string &table_id,
const std::shared_ptr<meta::Meta> &meta,
const Options &options) :
table_id_(table_id),
meta_(meta),
options_(options) {
}
Status MemTable::Add(VectorSource::Ptr &source) {
while (!source->AllAdded()) {
MemTableFile::Ptr current_mem_table_file;
if (!mem_table_file_list_.empty()) {
current_mem_table_file = mem_table_file_list_.back();
}
Status status;
if (mem_table_file_list_.empty() || current_mem_table_file->IsFull()) {
MemTableFile::Ptr new_mem_table_file = std::make_shared<MemTableFile>(table_id_, meta_, options_);
status = new_mem_table_file->Add(source);
if (status.ok()) {
mem_table_file_list_.emplace_back(new_mem_table_file);
}
} else {
status = current_mem_table_file->Add(source);
}
if (!status.ok()) {
std::string err_msg = "MemTable::Add failed: " + status.ToString();
ENGINE_LOG_ERROR << err_msg;
return Status::Error(err_msg);
}
}
return Status::OK();
}
void MemTable::GetCurrentMemTableFile(MemTableFile::Ptr &mem_table_file) {
mem_table_file = mem_table_file_list_.back();
}
size_t MemTable::GetTableFileCount() {
return mem_table_file_list_.size();
}
Status MemTable::Serialize() {
for (auto mem_table_file = mem_table_file_list_.begin(); mem_table_file != mem_table_file_list_.end();) {
auto status = (*mem_table_file)->Serialize();
if (!status.ok()) {
std::string err_msg = "MemTable::Serialize failed: " + status.ToString();
ENGINE_LOG_ERROR << err_msg;
return Status::Error(err_msg);
}
std::lock_guard<std::mutex> lock(mutex_);
mem_table_file = mem_table_file_list_.erase(mem_table_file);
}
return Status::OK();
}
bool MemTable::Empty() {
return mem_table_file_list_.empty();
}
const std::string &MemTable::GetTableId() const {
return table_id_;
}
size_t MemTable::GetCurrentMem() {
std::lock_guard<std::mutex> lock(mutex_);
size_t total_mem = 0;
for (auto &mem_table_file : mem_table_file_list_) {
total_mem += mem_table_file->GetCurrentMem();
}
return total_mem;
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
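MemTable::Add above drains a VectorSource into a chain of MemTableFiles, opening a new file whenever the current one is full. A standalone simulation of that fill loop — the capacity and vector counts are invented; the real capacity comes from MAX_TABLE_FILE_MEM inside MemTableFile:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

int main() {
    const size_t capacity_per_file = 5;   // invented stand-in for a MemTableFile's vector capacity
    size_t remaining = 12;                // vectors still held by the "VectorSource"
    std::vector<size_t> files;            // vectors stored per MemTableFile

    while (remaining > 0) {               // mirrors: while (!source->AllAdded())
        if (files.empty() || files.back() == capacity_per_file) {
            files.push_back(0);           // current file full (or none yet): open a new one
        }
        size_t batch = std::min(capacity_per_file - files.back(), remaining);
        files.back() += batch;            // MemTableFile::Add consumes part of the source
        remaining -= batch;
    }

    for (size_t i = 0; i < files.size(); ++i) {
        std::cout << "file " << i << ": " << files[i] << " vectors" << std::endl;
    }
    return 0;                             // prints 5, 5, 2
}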
#pragma once
#include "Status.h"
#include "MemTableFile.h"
#include "VectorSource.h"
#include <mutex>
namespace zilliz {
namespace milvus {
namespace engine {
class MemTable {
public:
using Ptr = std::shared_ptr<MemTable>;
using MemTableFileList = std::vector<MemTableFile::Ptr>;
using MetaPtr = meta::Meta::Ptr;
MemTable(const std::string &table_id, const std::shared_ptr<meta::Meta> &meta, const Options &options);
Status Add(VectorSource::Ptr &source);
void GetCurrentMemTableFile(MemTableFile::Ptr &mem_table_file);
size_t GetTableFileCount();
Status Serialize();
bool Empty();
const std::string &GetTableId() const;
size_t GetCurrentMem();
private:
const std::string table_id_;
MemTableFileList mem_table_file_list_;
MetaPtr meta_;
Options options_;
std::mutex mutex_;
}; //MemTable
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
#include "MemTableFile.h"
#include "Constants.h"
#include "Log.h"
#include "EngineFactory.h"
#include "metrics/Metrics.h"
#include <cmath>
namespace zilliz {
namespace milvus {
namespace engine {
MemTableFile::MemTableFile(const std::string &table_id,
const std::shared_ptr<meta::Meta> &meta,
const Options &options) :
table_id_(table_id),
meta_(meta),
options_(options) {
current_mem_ = 0;
auto status = CreateTableFile();
if (status.ok()) {
execution_engine_ = EngineFactory::Build(table_file_schema_.dimension_,
table_file_schema_.location_,
(EngineType) table_file_schema_.engine_type_);
}
}
Status MemTableFile::CreateTableFile() {
meta::TableFileSchema table_file_schema;
table_file_schema.table_id_ = table_id_;
auto status = meta_->CreateTableFile(table_file_schema);
if (status.ok()) {
table_file_schema_ = table_file_schema;
} else {
std::string err_msg = "MemTableFile::CreateTableFile failed: " + status.ToString();
ENGINE_LOG_ERROR << err_msg;
}
return status;
}
Status MemTableFile::Add(const VectorSource::Ptr &source) {
if (table_file_schema_.dimension_ <= 0) {
std::string err_msg = "MemTableFile::Add: table_file_schema dimension = " +
std::to_string(table_file_schema_.dimension_) + ", table_id = " + table_file_schema_.table_id_;
ENGINE_LOG_ERROR << err_msg;
return Status::Error(err_msg);
}
size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
size_t mem_left = GetMemLeft();
if (mem_left >= single_vector_mem_size) {
size_t num_vectors_to_add = mem_left / single_vector_mem_size; // integer division floors; the old std::ceil() call was a no-op on the already-truncated quotient
size_t num_vectors_added;
auto status = source->Add(execution_engine_, table_file_schema_, num_vectors_to_add, num_vectors_added);
if (status.ok()) {
current_mem_ += (num_vectors_added * single_vector_mem_size);
}
return status;
}
return Status::OK();
}
size_t MemTableFile::GetCurrentMem() {
return current_mem_;
}
size_t MemTableFile::GetMemLeft() {
return (MAX_TABLE_FILE_MEM - current_mem_);
}
bool MemTableFile::IsFull() {
size_t single_vector_mem_size = table_file_schema_.dimension_ * VECTOR_TYPE_SIZE;
return (GetMemLeft() < single_vector_mem_size);
}
Status MemTableFile::Serialize() {
auto start_time = METRICS_NOW_TIME;
auto size = GetCurrentMem();
execution_engine_->Serialize();
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
table_file_schema_.size_ = size;
server::Metrics::GetInstance().DiskStoreIOSpeedGaugeSet((double) size / total_time);
table_file_schema_.file_type_ = (size >= options_.index_trigger_size) ?
meta::TableFileSchema::TO_INDEX : meta::TableFileSchema::RAW;
auto status = meta_->UpdateTableFile(table_file_schema_);
LOG(DEBUG) << "New " << ((table_file_schema_.file_type_ == meta::TableFileSchema::RAW) ? "raw" : "to_index")
<< " file " << table_file_schema_.file_id_ << " of size " << (double) size / (double) M << " M";
execution_engine_->Cache();
return status;
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
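MemTableFile's bookkeeping is plain arithmetic: a vector of dimension d occupies d * VECTOR_TYPE_SIZE bytes, a file refuses growth once less than one vector of space remains (IsFull), and Add caps a batch at mem_left / single_vector_mem_size vectors. Worked numbers below; the 4-byte element size and the 128 MB file limit are assumptions for illustration, since the real constants live in Constants.h:

#include <cstddef>
#include <iostream>

int main() {
    const size_t VECTOR_TYPE_SIZE = sizeof(float);            // assumed: 4 bytes per element
    const size_t MAX_TABLE_FILE_MEM = 128ULL * 1024 * 1024;   // assumed: 128 MB per table file
    const size_t dimension = 512;

    size_t single_vector_mem_size = dimension * VECTOR_TYPE_SIZE;          // 2048 bytes
    size_t vectors_per_file = MAX_TABLE_FILE_MEM / single_vector_mem_size; // 65536 vectors

    std::cout << "one vector:       " << single_vector_mem_size << " bytes" << std::endl;
    std::cout << "vectors per file: " << vectors_per_file << std::endl;
    return 0;
}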
#pragma once
#include "Status.h"
#include "Meta.h"
#include "VectorSource.h"
#include "ExecutionEngine.h"
namespace zilliz {
namespace milvus {
namespace engine {
class MemTableFile {
public:
using Ptr = std::shared_ptr<MemTableFile>;
using MetaPtr = meta::Meta::Ptr;
MemTableFile(const std::string &table_id, const std::shared_ptr<meta::Meta> &meta, const Options &options);
Status Add(const VectorSource::Ptr &source);
size_t GetCurrentMem();
size_t GetMemLeft();
bool IsFull();
Status Serialize();
private:
Status CreateTableFile();
const std::string table_id_;
meta::TableFileSchema table_file_schema_;
MetaPtr meta_;
Options options_;
size_t current_mem_;
ExecutionEnginePtr execution_engine_;
}; //MemTableFile
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
(This diff is collapsed.)
#include "NewMemManager.h"
#include "VectorSource.h"
#include "Log.h"
#include "Constants.h"
#include <thread>
namespace zilliz {
namespace milvus {
namespace engine {
NewMemManager::MemTablePtr NewMemManager::GetMemByTable(const std::string &table_id) {
auto memIt = mem_id_map_.find(table_id);
if (memIt != mem_id_map_.end()) {
return memIt->second;
}
mem_id_map_[table_id] = std::make_shared<MemTable>(table_id, meta_, options_);
return mem_id_map_[table_id];
}
Status NewMemManager::InsertVectors(const std::string &table_id_,
size_t n_,
const float *vectors_,
IDNumbers &vector_ids_) {
while (GetCurrentMem() > options_.insert_buffer_size) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::unique_lock<std::mutex> lock(mutex_);
return InsertVectorsNoLock(table_id_, n_, vectors_, vector_ids_);
}
Status NewMemManager::InsertVectorsNoLock(const std::string &table_id,
size_t n,
const float *vectors,
IDNumbers &vector_ids) {
MemTablePtr mem = GetMemByTable(table_id);
VectorSource::Ptr source = std::make_shared<VectorSource>(n, vectors);
auto status = mem->Add(source);
if (status.ok()) {
vector_ids = source->GetVectorIds();
}
return status;
}
Status NewMemManager::ToImmutable() {
std::unique_lock<std::mutex> lock(mutex_);
MemIdMap temp_map;
for (auto &kv: mem_id_map_) {
if (kv.second->Empty()) {
// empty table, no need to serialize
temp_map.insert(kv);
} else {
immu_mem_list_.push_back(kv.second);
}
}
mem_id_map_.swap(temp_map);
return Status::OK();
}
Status NewMemManager::Serialize(std::set<std::string> &table_ids) {
ToImmutable();
std::unique_lock<std::mutex> lock(serialization_mtx_);
table_ids.clear();
for (auto &mem : immu_mem_list_) {
mem->Serialize();
table_ids.insert(mem->GetTableId());
}
immu_mem_list_.clear();
return Status::OK();
}
Status NewMemManager::EraseMemVector(const std::string &table_id) {
{ // erase MemVector from rapid-insert cache
std::unique_lock<std::mutex> lock(mutex_);
mem_id_map_.erase(table_id);
}
{ // erase MemVector from serialize cache
std::unique_lock<std::mutex> lock(serialization_mtx_);
MemList temp_list;
for (auto &mem : immu_mem_list_) {
if (mem->GetTableId() != table_id) {
temp_list.push_back(mem);
}
}
immu_mem_list_.swap(temp_list);
}
return Status::OK();
}
size_t NewMemManager::GetCurrentMutableMem() {
size_t total_mem = 0;
for (auto &kv : mem_id_map_) {
auto memTable = kv.second;
total_mem += memTable->GetCurrentMem();
}
return total_mem;
}
size_t NewMemManager::GetCurrentImmutableMem() {
size_t total_mem = 0;
for (auto &mem_table : immu_mem_list_) {
total_mem += mem_table->GetCurrentMem();
}
return total_mem;
}
size_t NewMemManager::GetCurrentMem() {
return GetCurrentMutableMem() + GetCurrentImmutableMem();
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
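NewMemManager::InsertVectors applies back-pressure: a writer sleeps in 1 ms steps while mutable plus immutable memory exceeds options_.insert_buffer_size, so ingestion can never run ahead of serialization by more than the buffer. A standalone sketch of that throttle — the threshold and byte counts are invented:

#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>

std::atomic<size_t> current_mem{0};       // stands in for GetCurrentMem()
std::atomic<bool> done{false};
const size_t insert_buffer_size = 1000;   // invented threshold, bytes

void Insert(size_t bytes) {
    // mirrors the wait loop at the top of NewMemManager::InsertVectors
    while (current_mem.load() > insert_buffer_size) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    current_mem += bytes;
}

int main() {
    std::thread serializer([] {           // stands in for the Serialize() path
        while (!done.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            current_mem -= current_mem.load() / 2;   // "flush" half of the buffer
        }
    });
    for (int i = 0; i < 50; ++i) {
        Insert(100);                      // blocks whenever the buffer is over the limit
    }
    done = true;
    serializer.join();
    std::cout << "buffered at exit: " << current_mem.load() << " bytes" << std::endl;
    return 0;
}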
#pragma once
#include "Meta.h"
#include "MemTable.h"
#include "Status.h"
#include "MemManagerAbstract.h"
#include <map>
#include <string>
#include <ctime>
#include <memory>
#include <mutex>
namespace zilliz {
namespace milvus {
namespace engine {
class NewMemManager : public MemManagerAbstract {
public:
using MetaPtr = meta::Meta::Ptr;
using Ptr = std::shared_ptr<NewMemManager>;
using MemTablePtr = typename MemTable::Ptr;
NewMemManager(const std::shared_ptr<meta::Meta> &meta, const Options &options)
: meta_(meta), options_(options) {}
Status InsertVectors(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids) override;
Status Serialize(std::set<std::string> &table_ids) override;
Status EraseMemVector(const std::string &table_id) override;
size_t GetCurrentMutableMem() override;
size_t GetCurrentImmutableMem() override;
size_t GetCurrentMem() override;
private:
MemTablePtr GetMemByTable(const std::string &table_id);
Status InsertVectorsNoLock(const std::string &table_id,
size_t n, const float *vectors, IDNumbers &vector_ids);
Status ToImmutable();
using MemIdMap = std::map<std::string, MemTablePtr>;
using MemList = std::vector<MemTablePtr>;
MemIdMap mem_id_map_;
MemList immu_mem_list_;
MetaPtr meta_;
Options options_;
std::mutex mutex_;
std::mutex serialization_mtx_;
}; // NewMemManager
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
......@@ -41,6 +41,10 @@ void ArchiveConf::ParseCritirias(const std::string& criterias) {
}
for (auto& token : tokens) {
if(token.empty()) {
continue;
}
std::vector<std::string> kv;
boost::algorithm::split(kv, token, boost::is_any_of(":"));
if (kv.size() != 2) {
......
......@@ -22,7 +22,7 @@ static constexpr uint64_t ONE_GB = ONE_KB*ONE_MB;
static const std::string ARCHIVE_CONF_DISK = "disk";
static const std::string ARCHIVE_CONF_DAYS = "days";
static const std::string ARCHIVE_CONF_DEFAULT = ARCHIVE_CONF_DISK + ":512";
static const std::string ARCHIVE_CONF_DEFAULT = "";
struct ArchiveConf {
using CriteriaT = std::map<std::string, int>;
......@@ -63,6 +63,7 @@ struct Options {
size_t index_trigger_size = ONE_GB; //unit: byte
DBMetaOptions meta;
int mode = MODE::SINGLE;
size_t insert_buffer_size = 4 * ONE_GB;
}; // Options
......
#include "VectorSource.h"
#include "ExecutionEngine.h"
#include "EngineFactory.h"
#include "Log.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace engine {
VectorSource::VectorSource(const size_t &n,
const float *vectors) :
n_(n),
vectors_(vectors),
id_generator_(new SimpleIDGenerator()) {
current_num_vectors_added = 0;
}
Status VectorSource::Add(const ExecutionEnginePtr &execution_engine,
const meta::TableFileSchema &table_file_schema,
const size_t &num_vectors_to_add,
size_t &num_vectors_added) {
auto start_time = METRICS_NOW_TIME;
num_vectors_added = current_num_vectors_added + num_vectors_to_add <= n_ ?
num_vectors_to_add : n_ - current_num_vectors_added;
IDNumbers vector_ids_to_add;
id_generator_->GetNextIDNumbers(num_vectors_added, vector_ids_to_add);
Status status = execution_engine->AddWithIds(num_vectors_added,
vectors_ + current_num_vectors_added * table_file_schema.dimension_,
vector_ids_to_add.data());
if (status.ok()) {
current_num_vectors_added += num_vectors_added;
vector_ids_.insert(vector_ids_.end(),
std::make_move_iterator(vector_ids_to_add.begin()),
std::make_move_iterator(vector_ids_to_add.end()));
} else {
ENGINE_LOG_ERROR << "VectorSource::Add failed: " + status.ToString();
}
auto end_time = METRICS_NOW_TIME;
auto total_time = METRICS_MICROSECONDS(start_time, end_time);
server::Metrics::GetInstance().AddVectorsPerSecondGaugeSet(static_cast<int>(n_),
static_cast<int>(table_file_schema.dimension_),
total_time);
return status;
}
size_t VectorSource::GetNumVectorsAdded() {
return current_num_vectors_added;
}
bool VectorSource::AllAdded() {
return (current_num_vectors_added == n_);
}
IDNumbers VectorSource::GetVectorIds() {
return vector_ids_;
}
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
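VectorSource::Add clamps each request to whatever the source still holds, which is how one source can be spread across several table files: num_vectors_added is effectively min(num_vectors_to_add, n_ - current_num_vectors_added). A tiny standalone check of that clamp, with invented counts:

#include <algorithm>
#include <cassert>
#include <cstddef>

int main() {
    size_t n = 10;                         // total vectors in the source
    size_t current_num_vectors_added = 8;  // already consumed by earlier Add() calls
    size_t num_vectors_to_add = 5;         // the caller's request

    // same value as: current + to_add <= n ? to_add : n - current
    size_t num_vectors_added =
        std::min(num_vectors_to_add, n - current_num_vectors_added);

    assert(num_vectors_added == 2);        // only two vectors remain to hand out
    return 0;
}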
#pragma once
#include "Status.h"
#include "Meta.h"
#include "IDGenerator.h"
#include "ExecutionEngine.h"
namespace zilliz {
namespace milvus {
namespace engine {
class VectorSource {
public:
using Ptr = std::shared_ptr<VectorSource>;
VectorSource(const size_t &n, const float *vectors);
Status Add(const ExecutionEnginePtr &execution_engine,
const meta::TableFileSchema &table_file_schema,
const size_t &num_vectors_to_add,
size_t &num_vectors_added);
size_t GetNumVectorsAdded();
bool AllAdded();
IDNumbers GetVectorIds();
private:
const size_t n_;
const float *vectors_;
IDNumbers vector_ids_;
size_t current_num_vectors_added;
IDGenerator *id_generator_;
}; //VectorSource
} // namespace engine
} // namespace milvus
} // namespace zilliz
\ No newline at end of file
......@@ -107,7 +107,7 @@ Status SearchTask::ClusterResult(const std::vector<long> &output_ids,
uint64_t nq,
uint64_t topk,
SearchContext::ResultSet &result_set) {
if(output_ids.size() != nq*topk || output_distence.size() != nq*topk) {
if(output_ids.size() < nq*topk || output_distence.size() < nq*topk) {
std::string msg = "Invalid id array size: " + std::to_string(output_ids.size()) +
" distance array size: " + std::to_string(output_distence.size());
SERVER_LOG_ERROR << msg;
......
......@@ -23,6 +23,7 @@ namespace {
static constexpr int64_t TOP_K = 10;
static constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
static constexpr int64_t ADD_VECTOR_LOOP = 10;
static constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
......@@ -59,7 +60,7 @@ namespace {
std::string CurrentTime() {
time_t tt;
time( &tt );
tt = tt + 8*3600;
tt = tt + 8*SECONDS_EACH_HOUR;
tm* t= gmtime( &tt );
std::string str = std::to_string(t->tm_year + 1900) + "_" + std::to_string(t->tm_mon + 1)
......@@ -69,10 +70,11 @@ namespace {
return str;
}
std::string CurrentTmDate() {
std::string CurrentTmDate(int64_t offset_day = 0) {
time_t tt;
time( &tt );
tt = tt + 8*3600;
tt = tt + 8*SECONDS_EACH_HOUR;
tt = tt + 24*SECONDS_EACH_HOUR*offset_day;
tm* t= gmtime( &tt );
std::string str = std::to_string(t->tm_year + 1900) + "-" + std::to_string(t->tm_mon + 1)
......@@ -160,7 +162,7 @@ namespace {
std::vector<Range> query_range_array;
Range rg;
rg.start_value = CurrentTmDate();
rg.end_value = CurrentTmDate();
rg.end_value = CurrentTmDate(1);
query_range_array.emplace_back(rg);
std::vector<RowRecord> record_array;
......
......@@ -209,17 +209,25 @@ ClientProxy::SearchVector(const std::string &table_name,
}
//step 3: search vectors
std::vector<thrift::TopKQueryResult> result_array;
ClientPtr()->interface()->SearchVector(result_array, table_name, thrift_records, thrift_ranges, topk);
std::vector<thrift::TopKQueryBinResult> result_array;
ClientPtr()->interface()->SearchVector2(result_array, table_name, thrift_records, thrift_ranges, topk);
//step 4: convert result array
for(auto& thrift_topk_result : result_array) {
TopKQueryResult result;
for(auto& thrift_query_result : thrift_topk_result.query_result_arrays) {
size_t id_count = thrift_topk_result.id_array.size() / sizeof(int64_t);
size_t dist_count = thrift_topk_result.distance_array.size() / sizeof(double);
if(id_count != dist_count) {
return Status(StatusCode::UnknownError, "illegal result");
}
int64_t* id_ptr = (int64_t*)thrift_topk_result.id_array.data();
double* dist_ptr = (double*)thrift_topk_result.distance_array.data();
for(size_t i = 0; i < id_count; i++) {
QueryResult query_result;
query_result.id = thrift_query_result.id;
query_result.distance = thrift_query_result.distance;
query_result.id = id_ptr[i];
query_result.distance = dist_ptr[i];
result.query_result_arrays.emplace_back(query_result);
}
......
......@@ -28,6 +28,14 @@ DBWrapper::DBWrapper() {
if(index_size > 0) {//ensure larger than zero, unit is MB
opt.index_trigger_size = (size_t)index_size * engine::ONE_MB;
}
int64_t insert_buffer_size = config.GetInt64Value(CONFIG_DB_INSERT_BUFFER_SIZE, 4);
if (insert_buffer_size >= 1) {
opt.insert_buffer_size = insert_buffer_size * engine::ONE_GB;
} else {
std::cout << "ERROR: insert_buffer_size should be at least 1 GB" << std::endl;
kill(0, SIGUSR1);
}
ConfigNode& serverConfig = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER);
std::string mode = serverConfig.GetValue(CONFIG_CLUSTER_MODE, "single");
......
......@@ -8,6 +8,7 @@
#include "ServerConfig.h"
#include "ThreadPoolServer.h"
#include "DBWrapper.h"
#include "utils/Log.h"
#include "milvus_types.h"
#include "milvus_constants.h"
......@@ -75,7 +76,7 @@ MilvusServer::StartService() {
return;
}
stdcxx::shared_ptr<ThreadManager> threadManager(ThreadManager::newSimpleThreadManager());
stdcxx::shared_ptr<ThreadManager> threadManager(ThreadManager::newSimpleThreadManager(16));
stdcxx::shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
threadManager->threadFactory(threadFactory);
threadManager->start();
......
......@@ -60,11 +60,22 @@ RequestHandler::SearchVector(std::vector<thrift::TopKQueryResult> &_return,
const std::vector<thrift::Range> &query_range_array,
const int64_t topk) {
// SERVER_LOG_DEBUG << "Entering RequestHandler::SearchVector";
BaseTaskPtr task_ptr = SearchVectorTask::Create(table_name, std::vector<std::string>(), query_record_array,
BaseTaskPtr task_ptr = SearchVectorTask1::Create(table_name, std::vector<std::string>(), query_record_array,
query_range_array, topk, _return);
RequestScheduler::ExecTask(task_ptr);
}
void
RequestHandler::SearchVector2(std::vector<thrift::TopKQueryBinResult> & _return,
const std::string& table_name,
const std::vector<thrift::RowRecord> & query_record_array,
const std::vector<thrift::Range> & query_range_array,
const int64_t topk) {
BaseTaskPtr task_ptr = SearchVectorTask2::Create(table_name, std::vector<std::string>(), query_record_array,
query_range_array, topk, _return);
RequestScheduler::ExecTask(task_ptr);
}
void
RequestHandler::SearchVectorInFiles(std::vector<::milvus::thrift::TopKQueryResult> &_return,
const std::string& table_name,
......@@ -73,7 +84,7 @@ RequestHandler::SearchVectorInFiles(std::vector<::milvus::thrift::TopKQueryResul
const std::vector<::milvus::thrift::Range> &query_range_array,
const int64_t topk) {
// SERVER_LOG_DEBUG << "Entering RequestHandler::SearchVectorInFiles. file_id_array size = " << std::to_string(file_id_array.size());
BaseTaskPtr task_ptr = SearchVectorTask::Create(table_name, file_id_array, query_record_array,
BaseTaskPtr task_ptr = SearchVectorTask1::Create(table_name, file_id_array, query_record_array,
query_range_array, topk, _return);
RequestScheduler::ExecTask(task_ptr);
}
......
......@@ -106,6 +106,29 @@ public:
const std::vector<::milvus::thrift::Range> & query_range_array,
const int64_t topk);
/**
* @brief Query vector
*
* This method is used to query vectors in a table.
*
* @param table_name, name of the table to query.
* @param query_record_array, the vectors to be queried.
* @param query_range_array, optional ranges for conditional search. If not specified, search the whole table.
* @param topk, number of most similar vectors to return.
*
* @return query binary result array.
*/
void SearchVector2(std::vector<::milvus::thrift::TopKQueryBinResult> & _return,
const std::string& table_name,
const std::vector<::milvus::thrift::RowRecord> & query_record_array,
const std::vector<::milvus::thrift::Range> & query_range_array,
const int64_t topk);
/**
* @brief Internal use query interface
*
......
......@@ -8,6 +8,7 @@
#include "utils/CommonUtil.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "utils/ValidationUtil.h"
#include "DBWrapper.h"
#include "version.h"
......@@ -114,7 +115,13 @@ namespace {
}
long days = (tt_end > tt_start) ? (tt_end - tt_start)/DAY_SECONDS : (tt_start - tt_end)/DAY_SECONDS;
for(long i = 0; i <= days; i++) {
if(days == 0) {
error_code = SERVER_INVALID_TIME_RANGE;
error_msg = "Invalid time range: " + range.start_value + " to " + range.end_value;
return ;
}
for(long i = 0; i < days; i++) {
time_t tt_day = tt_start + DAY_SECONDS*i;
tm tm_day;
CommonUtil::ConvertTime(tt_day, tm_day);
......@@ -154,16 +161,20 @@ ServerError CreateTableTask::OnExecute() {
try {
//step 1: check arguments
if(schema_.table_name.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(schema_.table_name);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + schema_.table_name);
}
if(schema_.dimension <= 0) {
return SetError(SERVER_INVALID_TABLE_DIMENSION, "Invalid table dimension: " + std::to_string(schema_.dimension));
res = ValidateTableDimension(schema_.dimension);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table dimension: " + std::to_string(schema_.dimension));
}
engine::EngineType engine_type = EngineType(schema_.index_type);
if(engine_type == engine::EngineType::INVALID) {
return SetError(SERVER_INVALID_INDEX_TYPE, "Invalid index type: " + std::to_string(schema_.index_type));
res = ValidateTableIndexType(schema_.index_type);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid index type: " + std::to_string(schema_.index_type));
}
//step 2: construct table schema
......@@ -205,8 +216,10 @@ ServerError DescribeTableTask::OnExecute() {
try {
//step 1: check arguments
if(table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(table_name_);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
//step 2: get table info
......@@ -246,12 +259,20 @@ ServerError BuildIndexTask::OnExecute() {
TimeRecorder rc("BuildIndexTask");
//step 1: check arguments
if(table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(table_name_);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
bool has_table = false;
engine::Status stat = DBWrapper::DB()->HasTable(table_name_, has_table);
if(!has_table) {
return SetError(SERVER_TABLE_NOT_EXIST, "Table " + table_name_ + " does not exist");
}
//step 2: check table existence
engine::Status stat = DBWrapper::DB()->BuildIndex(table_name_);
stat = DBWrapper::DB()->BuildIndex(table_name_);
if(!stat.ok()) {
return SetError(SERVER_BUILD_INDEX_ERROR, "Engine failed: " + stat.ToString());
}
......@@ -281,8 +302,10 @@ ServerError HasTableTask::OnExecute() {
TimeRecorder rc("HasTableTask");
//step 1: check arguments
if(table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(table_name_);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
//step 2: check table existence
......@@ -315,8 +338,10 @@ ServerError DeleteTableTask::OnExecute() {
TimeRecorder rc("DeleteTableTask");
//step 1: check arguments
if (table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(table_name_);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
//step 2: check table existence
......@@ -341,7 +366,7 @@ ServerError DeleteTableTask::OnExecute() {
}
rc.Record("deleta table");
rc.Elapse("totally cost");
rc.Elapse("total cost");
} catch (std::exception& ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
}
......@@ -397,8 +422,10 @@ ServerError AddVectorTask::OnExecute() {
TimeRecorder rc("AddVectorTask");
//step 1: check arguments
if (table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(table_name_);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
if(record_array_.empty()) {
......@@ -455,7 +482,7 @@ ServerError AddVectorTask::OnExecute() {
#endif
rc.Record("do insert");
rc.Elapse("totally cost");
rc.Elapse("total cost");
} catch (std::exception& ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
......@@ -465,39 +492,29 @@ ServerError AddVectorTask::OnExecute() {
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
SearchVectorTask::SearchVectorTask(const std::string &table_name,
const std::vector<std::string>& file_id_array,
const std::vector<thrift::RowRecord> &query_record_array,
const std::vector<thrift::Range> &query_range_array,
const int64_t top_k,
std::vector<thrift::TopKQueryResult> &result_array)
SearchVectorTaskBase::SearchVectorTaskBase(const std::string &table_name,
const std::vector<std::string>& file_id_array,
const std::vector<thrift::RowRecord> &query_record_array,
const std::vector<thrift::Range> &query_range_array,
const int64_t top_k)
: BaseTask(DQL_TASK_GROUP),
table_name_(table_name),
file_id_array_(file_id_array),
record_array_(query_record_array),
range_array_(query_range_array),
top_k_(top_k),
result_array_(result_array) {
top_k_(top_k) {
}
BaseTaskPtr SearchVectorTask::Create(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<thrift::RowRecord> & query_record_array,
const std::vector<thrift::Range> & query_range_array,
const int64_t top_k,
std::vector<thrift::TopKQueryResult>& result_array) {
return std::shared_ptr<BaseTask>(new SearchVectorTask(table_name, file_id_array,
query_record_array, query_range_array, top_k, result_array));
}
ServerError SearchVectorTask::OnExecute() {
ServerError SearchVectorTaskBase::OnExecute() {
try {
TimeRecorder rc("SearchVectorTask");
//step 1: check arguments
if (table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(table_name_);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
if(top_k_ <= 0) {
......@@ -574,28 +591,14 @@ ServerError SearchVectorTask::OnExecute() {
rc.Record("do search");
//step 5: construct result array
for(uint64_t i = 0; i < record_count; i++) {
auto& result = results[i];
const auto& record = record_array_[i];
thrift::TopKQueryResult thrift_topk_result;
for(auto& pair : result) {
thrift::QueryResult thrift_result;
thrift_result.__set_id(pair.first);
thrift_result.__set_distance(pair.second);
thrift_topk_result.query_result_arrays.emplace_back(thrift_result);
}
result_array_.emplace_back(thrift_topk_result);
}
ConstructResult(results);
#ifdef MILVUS_ENABLE_PROFILING
ProfilerStop();
#endif
rc.Record("construct result");
rc.Elapse("totally cost");
rc.Elapse("total cost");
} catch (std::exception& ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
......@@ -604,6 +607,100 @@ ServerError SearchVectorTask::OnExecute() {
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
SearchVectorTask1::SearchVectorTask1(const std::string &table_name,
const std::vector<std::string>& file_id_array,
const std::vector<thrift::RowRecord> &query_record_array,
const std::vector<thrift::Range> &query_range_array,
const int64_t top_k,
std::vector<thrift::TopKQueryResult> &result_array)
: SearchVectorTaskBase(table_name, file_id_array, query_record_array, query_range_array, top_k),
result_array_(result_array) {
}
BaseTaskPtr SearchVectorTask1::Create(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<thrift::RowRecord> & query_record_array,
const std::vector<thrift::Range> & query_range_array,
const int64_t top_k,
std::vector<thrift::TopKQueryResult>& result_array) {
return std::shared_ptr<BaseTask>(new SearchVectorTask1(table_name, file_id_array,
query_record_array, query_range_array, top_k, result_array));
}
ServerError SearchVectorTask1::ConstructResult(engine::QueryResults& results) {
for(uint64_t i = 0; i < results.size(); i++) {
auto& result = results[i];
const auto& record = record_array_[i];
thrift::TopKQueryResult thrift_topk_result;
for(auto& pair : result) {
thrift::QueryResult thrift_result;
thrift_result.__set_id(pair.first);
thrift_result.__set_distance(pair.second);
thrift_topk_result.query_result_arrays.emplace_back(thrift_result);
}
result_array_.emplace_back(thrift_topk_result);
}
return SERVER_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
SearchVectorTask2::SearchVectorTask2(const std::string &table_name,
const std::vector<std::string>& file_id_array,
const std::vector<thrift::RowRecord> &query_record_array,
const std::vector<thrift::Range> &query_range_array,
const int64_t top_k,
std::vector<thrift::TopKQueryBinResult> &result_array)
: SearchVectorTaskBase(table_name, file_id_array, query_record_array, query_range_array, top_k),
result_array_(result_array) {
}
BaseTaskPtr SearchVectorTask2::Create(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<thrift::RowRecord> & query_record_array,
const std::vector<thrift::Range> & query_range_array,
const int64_t top_k,
std::vector<thrift::TopKQueryBinResult>& result_array) {
return std::shared_ptr<BaseTask>(new SearchVectorTask2(table_name, file_id_array,
query_record_array, query_range_array, top_k, result_array));
}
ServerError SearchVectorTask2::ConstructResult(engine::QueryResults& results) {
for(size_t i = 0; i < results.size(); i++) {
auto& result = results[i];
thrift::TopKQueryBinResult thrift_topk_result;
if(result.empty()) {
result_array_.emplace_back(thrift_topk_result);
continue;
}
std::string str_ids, str_distances;
str_ids.resize(sizeof(engine::IDNumber)*result.size());
str_distances.resize(sizeof(double)*result.size());
engine::IDNumber* ids_ptr = (engine::IDNumber*)str_ids.data();
double* distance_ptr = (double*)str_distances.data();
for(size_t k = 0; k < result.size(); k++) { // iterate this query's top-k list, not the outer results array
auto& pair = result[k];
ids_ptr[k] = pair.first;
distance_ptr[k] = pair.second;
}
thrift_topk_result.__set_id_array(str_ids);
thrift_topk_result.__set_distance_array(str_distances);
result_array_.emplace_back(thrift_topk_result);
}
return SERVER_SUCCESS;
}
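SearchVector2 packs each top-k list into two byte strings: id_array holds top_k int64 IDs and distance_array holds top_k doubles, which is exactly what ClientProxy::SearchVector unpacks on the client side. A standalone round-trip of that layout — memcpy is used instead of the pointer casts to keep the sketch well-defined:

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

int main() {
    std::vector<int64_t> ids = {42, 7, 19};              // invented query results
    std::vector<double> distances = {0.12, 0.35, 0.78};

    // pack, as SearchVectorTask2::ConstructResult does
    std::string id_array(sizeof(int64_t) * ids.size(), '\0');
    std::string distance_array(sizeof(double) * distances.size(), '\0');
    std::memcpy(&id_array[0], ids.data(), id_array.size());
    std::memcpy(&distance_array[0], distances.data(), distance_array.size());

    // unpack, as ClientProxy::SearchVector does
    size_t count = id_array.size() / sizeof(int64_t);
    for (size_t i = 0; i < count; ++i) {
        int64_t id;
        double distance;
        std::memcpy(&id, id_array.data() + i * sizeof(int64_t), sizeof(id));
        std::memcpy(&distance, distance_array.data() + i * sizeof(double), sizeof(distance));
        std::cout << "id=" << id << " distance=" << distance << std::endl;
    }
    return 0;
}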
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
GetTableRowCountTask::GetTableRowCountTask(const std::string& table_name, int64_t& row_count)
: BaseTask(DDL_DML_TASK_GROUP),
......@@ -621,8 +718,10 @@ ServerError GetTableRowCountTask::OnExecute() {
TimeRecorder rc("GetTableRowCountTask");
//step 1: check arguments
if (table_name_.empty()) {
return SetError(SERVER_INVALID_TABLE_NAME, "Empty table name");
ServerError res = SERVER_SUCCESS;
res = ValidateTableName(table_name_);
if(res != SERVER_SUCCESS) {
return SetError(res, "Invalid table name: " + table_name_);
}
//step 2: get row count
......@@ -634,7 +733,7 @@ ServerError GetTableRowCountTask::OnExecute() {
row_count_ = (int64_t) row_count;
rc.Elapse("totally cost");
rc.Elapse("total cost");
} catch (std::exception& ex) {
return SetError(SERVER_UNEXPECTED_ERROR, ex.what());
......
......@@ -129,7 +129,28 @@ private:
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchVectorTask : public BaseTask {
class SearchVectorTaskBase : public BaseTask {
protected:
SearchVectorTaskBase(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<::milvus::thrift::RowRecord> & query_record_array,
const std::vector<::milvus::thrift::Range> & query_range_array,
const int64_t top_k);
ServerError OnExecute() override;
virtual ServerError ConstructResult(engine::QueryResults& results) = 0;
protected:
std::string table_name_;
std::vector<std::string> file_id_array_;
int64_t top_k_;
const std::vector<::milvus::thrift::RowRecord>& record_array_;
const std::vector<::milvus::thrift::Range>& range_array_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchVectorTask1 : public SearchVectorTaskBase {
public:
static BaseTaskPtr Create(const std::string& table_name,
const std::vector<std::string>& file_id_array,
......@@ -139,24 +160,43 @@ public:
std::vector<::milvus::thrift::TopKQueryResult>& result_array);
protected:
SearchVectorTask(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<::milvus::thrift::RowRecord> & query_record_array,
const std::vector<::milvus::thrift::Range> & query_range_array,
const int64_t top_k,
SearchVectorTask1(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<::milvus::thrift::RowRecord> & query_record_array,
const std::vector<::milvus::thrift::Range> & query_range_array,
const int64_t top_k,
std::vector<::milvus::thrift::TopKQueryResult>& result_array);
ServerError OnExecute() override;
ServerError ConstructResult(engine::QueryResults& results) override;
private:
std::string table_name_;
std::vector<std::string> file_id_array_;
int64_t top_k_;
const std::vector<::milvus::thrift::RowRecord>& record_array_;
const std::vector<::milvus::thrift::Range>& range_array_;
std::vector<::milvus::thrift::TopKQueryResult>& result_array_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class SearchVectorTask2 : public SearchVectorTaskBase {
public:
static BaseTaskPtr Create(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<::milvus::thrift::RowRecord> & query_record_array,
const std::vector<::milvus::thrift::Range> & query_range_array,
const int64_t top_k,
std::vector<::milvus::thrift::TopKQueryBinResult>& result_array);
protected:
SearchVectorTask2(const std::string& table_name,
const std::vector<std::string>& file_id_array,
const std::vector<::milvus::thrift::RowRecord> & query_record_array,
const std::vector<::milvus::thrift::Range> & query_range_array,
const int64_t top_k,
std::vector<::milvus::thrift::TopKQueryBinResult>& result_array);
ServerError ConstructResult(engine::QueryResults& results) override;
private:
std::vector<::milvus::thrift::TopKQueryBinResult>& result_array_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class GetTableRowCountTask : public BaseTask {
public:
......
......@@ -27,6 +27,7 @@ static const std::string CONFIG_DB_SLAVE_PATH = "db_slave_path";
static const std::string CONFIG_DB_INDEX_TRIGGER_SIZE = "index_building_threshold";
static const std::string CONFIG_DB_ARCHIVE_DISK = "archive_disk_threshold";
static const std::string CONFIG_DB_ARCHIVE_DAYS = "archive_days_threshold";
static const std::string CONFIG_DB_INSERT_BUFFER_SIZE = "insert_buffer_size";
static const std::string CONFIG_LOG = "log_config";
......
......@@ -104,6 +104,25 @@ class MilvusServiceIf {
*/
virtual void SearchVector(std::vector<TopKQueryResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk) = 0;
/**
* @brief Query vector
*
* This method is used to query vectors in a table.
*
* @param table_name, name of the table to query.
* @param query_record_array, the vectors to be queried.
* @param query_range_array, optional ranges for conditional search. If not specified, search the whole table.
* @param topk, number of most similar vectors to return.
*
* @return query binary result array.
*/
virtual void SearchVector2(std::vector<TopKQueryBinResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk) = 0;
/**
* @brief Internal use query interface
*
......@@ -218,6 +237,9 @@ class MilvusServiceNull : virtual public MilvusServiceIf {
void SearchVector(std::vector<TopKQueryResult> & /* _return */, const std::string& /* table_name */, const std::vector<RowRecord> & /* query_record_array */, const std::vector<Range> & /* query_range_array */, const int64_t /* topk */) {
return;
}
void SearchVector2(std::vector<TopKQueryBinResult> & /* _return */, const std::string& /* table_name */, const std::vector<RowRecord> & /* query_record_array */, const std::vector<Range> & /* query_range_array */, const int64_t /* topk */) {
return;
}
void SearchVectorInFiles(std::vector<TopKQueryResult> & /* _return */, const std::string& /* table_name */, const std::vector<std::string> & /* file_id_array */, const std::vector<RowRecord> & /* query_record_array */, const std::vector<Range> & /* query_range_array */, const int64_t /* topk */) {
return;
}
......@@ -912,6 +934,139 @@ class MilvusService_SearchVector_presult {
};
typedef struct _MilvusService_SearchVector2_args__isset {
_MilvusService_SearchVector2_args__isset() : table_name(false), query_record_array(false), query_range_array(false), topk(false) {}
bool table_name :1;
bool query_record_array :1;
bool query_range_array :1;
bool topk :1;
} _MilvusService_SearchVector2_args__isset;
class MilvusService_SearchVector2_args {
public:
MilvusService_SearchVector2_args(const MilvusService_SearchVector2_args&);
MilvusService_SearchVector2_args& operator=(const MilvusService_SearchVector2_args&);
MilvusService_SearchVector2_args() : table_name(), topk(0) {
}
virtual ~MilvusService_SearchVector2_args() throw();
std::string table_name;
std::vector<RowRecord> query_record_array;
std::vector<Range> query_range_array;
int64_t topk;
_MilvusService_SearchVector2_args__isset __isset;
void __set_table_name(const std::string& val);
void __set_query_record_array(const std::vector<RowRecord> & val);
void __set_query_range_array(const std::vector<Range> & val);
void __set_topk(const int64_t val);
bool operator == (const MilvusService_SearchVector2_args & rhs) const
{
if (!(table_name == rhs.table_name))
return false;
if (!(query_record_array == rhs.query_record_array))
return false;
if (!(query_range_array == rhs.query_range_array))
return false;
if (!(topk == rhs.topk))
return false;
return true;
}
bool operator != (const MilvusService_SearchVector2_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const MilvusService_SearchVector2_args & ) const;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
class MilvusService_SearchVector2_pargs {
public:
virtual ~MilvusService_SearchVector2_pargs() throw();
const std::string* table_name;
const std::vector<RowRecord> * query_record_array;
const std::vector<Range> * query_range_array;
const int64_t* topk;
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
typedef struct _MilvusService_SearchVector2_result__isset {
_MilvusService_SearchVector2_result__isset() : success(false), e(false) {}
bool success :1;
bool e :1;
} _MilvusService_SearchVector2_result__isset;
class MilvusService_SearchVector2_result {
public:
MilvusService_SearchVector2_result(const MilvusService_SearchVector2_result&);
MilvusService_SearchVector2_result& operator=(const MilvusService_SearchVector2_result&);
MilvusService_SearchVector2_result() {
}
virtual ~MilvusService_SearchVector2_result() throw();
std::vector<TopKQueryBinResult> success;
Exception e;
_MilvusService_SearchVector2_result__isset __isset;
void __set_success(const std::vector<TopKQueryBinResult> & val);
void __set_e(const Exception& val);
bool operator == (const MilvusService_SearchVector2_result & rhs) const
{
if (!(success == rhs.success))
return false;
if (!(e == rhs.e))
return false;
return true;
}
bool operator != (const MilvusService_SearchVector2_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const MilvusService_SearchVector2_result & ) const;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
};
typedef struct _MilvusService_SearchVector2_presult__isset {
_MilvusService_SearchVector2_presult__isset() : success(false), e(false) {}
bool success :1;
bool e :1;
} _MilvusService_SearchVector2_presult__isset;
class MilvusService_SearchVector2_presult {
public:
virtual ~MilvusService_SearchVector2_presult() throw();
std::vector<TopKQueryBinResult> * success;
Exception e;
_MilvusService_SearchVector2_presult__isset __isset;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
};
typedef struct _MilvusService_SearchVectorInFiles_args__isset {
_MilvusService_SearchVectorInFiles_args__isset() : table_name(false), file_id_array(false), query_record_array(false), query_range_array(false), topk(false) {}
bool table_name :1;
......@@ -1531,6 +1686,9 @@ class MilvusServiceClient : virtual public MilvusServiceIf {
void SearchVector(std::vector<TopKQueryResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void send_SearchVector(const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void recv_SearchVector(std::vector<TopKQueryResult> & _return);
void SearchVector2(std::vector<TopKQueryBinResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void send_SearchVector2(const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void recv_SearchVector2(std::vector<TopKQueryBinResult> & _return);
void SearchVectorInFiles(std::vector<TopKQueryResult> & _return, const std::string& table_name, const std::vector<std::string> & file_id_array, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void send_SearchVectorInFiles(const std::string& table_name, const std::vector<std::string> & file_id_array, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void recv_SearchVectorInFiles(std::vector<TopKQueryResult> & _return);
......@@ -1567,6 +1725,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor {
void process_BuildIndex(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_AddVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_SearchVector(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_SearchVector2(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_SearchVectorInFiles(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_DescribeTable(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
void process_GetTableRowCount(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
......@@ -1581,6 +1740,7 @@ class MilvusServiceProcessor : public ::apache::thrift::TDispatchProcessor {
processMap_["BuildIndex"] = &MilvusServiceProcessor::process_BuildIndex;
processMap_["AddVector"] = &MilvusServiceProcessor::process_AddVector;
processMap_["SearchVector"] = &MilvusServiceProcessor::process_SearchVector;
processMap_["SearchVector2"] = &MilvusServiceProcessor::process_SearchVector2;
processMap_["SearchVectorInFiles"] = &MilvusServiceProcessor::process_SearchVectorInFiles;
processMap_["DescribeTable"] = &MilvusServiceProcessor::process_DescribeTable;
processMap_["GetTableRowCount"] = &MilvusServiceProcessor::process_GetTableRowCount;
......@@ -1670,6 +1830,16 @@ class MilvusServiceMultiface : virtual public MilvusServiceIf {
return;
}
void SearchVector2(std::vector<TopKQueryBinResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk) {
size_t sz = ifaces_.size();
size_t i = 0;
for (; i < (sz - 1); ++i) {
ifaces_[i]->SearchVector2(_return, table_name, query_record_array, query_range_array, topk);
}
ifaces_[i]->SearchVector2(_return, table_name, query_record_array, query_range_array, topk);
return;
}
void SearchVectorInFiles(std::vector<TopKQueryResult> & _return, const std::string& table_name, const std::vector<std::string> & file_id_array, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk) {
size_t sz = ifaces_.size();
size_t i = 0;
......@@ -1767,6 +1937,9 @@ class MilvusServiceConcurrentClient : virtual public MilvusServiceIf {
void SearchVector(std::vector<TopKQueryResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
int32_t send_SearchVector(const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void recv_SearchVector(std::vector<TopKQueryResult> & _return, const int32_t seqid);
void SearchVector2(std::vector<TopKQueryBinResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
int32_t send_SearchVector2(const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void recv_SearchVector2(std::vector<TopKQueryBinResult> & _return, const int32_t seqid);
void SearchVectorInFiles(std::vector<TopKQueryResult> & _return, const std::string& table_name, const std::vector<std::string> & file_id_array, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
int32_t send_SearchVectorInFiles(const std::string& table_name, const std::vector<std::string> & file_id_array, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk);
void recv_SearchVectorInFiles(std::vector<TopKQueryResult> & _return, const int32_t seqid);
......
......@@ -120,6 +120,28 @@ class MilvusServiceHandler : virtual public MilvusServiceIf {
printf("SearchVector\n");
}
/**
* @brief Query vector
*
* This method is used to query vectors in a table.
*
* @param table_name, name of the table to query.
* @param query_record_array, the vectors to be queried.
* @param query_range_array, optional ranges for conditional search. If not specified, search the whole table.
* @param topk, number of most similar vectors to return.
*
* @return query binary result array.
*/
void SearchVector2(std::vector<TopKQueryBinResult> & _return, const std::string& table_name, const std::vector<RowRecord> & query_record_array, const std::vector<Range> & query_range_array, const int64_t topk) {
// Your implementation goes here
printf("SearchVector2\n");
}
/**
* @brief Internal use query interface
*
......
......@@ -781,4 +781,119 @@ void TopKQueryResult::printTo(std::ostream& out) const {
out << ")";
}
TopKQueryBinResult::~TopKQueryBinResult() throw() {
}
void TopKQueryBinResult::__set_id_array(const std::string& val) {
this->id_array = val;
}
void TopKQueryBinResult::__set_distance_array(const std::string& val) {
this->distance_array = val;
}
std::ostream& operator<<(std::ostream& out, const TopKQueryBinResult& obj)
{
obj.printTo(out);
return out;
}
uint32_t TopKQueryBinResult::read(::apache::thrift::protocol::TProtocol* iprot) {
::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
uint32_t xfer = 0;
std::string fname;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
bool isset_id_array = false;
bool isset_distance_array = false;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->id_array);
isset_id_array = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->distance_array);
isset_distance_array = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
if (!isset_id_array)
throw TProtocolException(TProtocolException::INVALID_DATA);
if (!isset_distance_array)
throw TProtocolException(TProtocolException::INVALID_DATA);
return xfer;
}
uint32_t TopKQueryBinResult::write(::apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("TopKQueryBinResult");
xfer += oprot->writeFieldBegin("id_array", ::apache::thrift::protocol::T_STRING, 1);
xfer += oprot->writeBinary(this->id_array);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("distance_array", ::apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeBinary(this->distance_array);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(TopKQueryBinResult &a, TopKQueryBinResult &b) {
using ::std::swap;
swap(a.id_array, b.id_array);
swap(a.distance_array, b.distance_array);
}
TopKQueryBinResult::TopKQueryBinResult(const TopKQueryBinResult& other19) {
id_array = other19.id_array;
distance_array = other19.distance_array;
}
TopKQueryBinResult& TopKQueryBinResult::operator=(const TopKQueryBinResult& other20) {
id_array = other20.id_array;
distance_array = other20.distance_array;
return *this;
}
void TopKQueryBinResult::printTo(std::ostream& out) const {
using ::apache::thrift::to_string;
out << "TopKQueryBinResult(";
out << "id_array=" << to_string(id_array);
out << ", " << "distance_array=" << to_string(distance_array);
out << ")";
}
}} // namespace
......@@ -63,6 +63,8 @@ class QueryResult;
class TopKQueryResult;
class TopKQueryBinResult;
typedef struct _Exception__isset {
_Exception__isset() : code(false), reason(false) {}
bool code :1;
......@@ -346,6 +348,47 @@ void swap(TopKQueryResult &a, TopKQueryResult &b);
std::ostream& operator<<(std::ostream& out, const TopKQueryResult& obj);
class TopKQueryBinResult : public virtual ::apache::thrift::TBase {
public:
TopKQueryBinResult(const TopKQueryBinResult&);
TopKQueryBinResult& operator=(const TopKQueryBinResult&);
TopKQueryBinResult() : id_array(), distance_array() {
}
virtual ~TopKQueryBinResult() throw();
std::string id_array;
std::string distance_array;
void __set_id_array(const std::string& val);
void __set_distance_array(const std::string& val);
bool operator == (const TopKQueryBinResult & rhs) const
{
if (!(id_array == rhs.id_array))
return false;
if (!(distance_array == rhs.distance_array))
return false;
return true;
}
bool operator != (const TopKQueryBinResult &rhs) const {
return !(*this == rhs);
}
bool operator < (const TopKQueryBinResult & ) const;
uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
virtual void printTo(std::ostream& out) const;
};
void swap(TopKQueryBinResult &a, TopKQueryBinResult &b);
std::ostream& operator<<(std::ostream& out, const TopKQueryBinResult& obj);
}} // namespace
#endif
......@@ -84,6 +84,14 @@ struct TopKQueryResult {
1: list<QueryResult> query_result_arrays; ///< TopK query result
}
/**
* @brief TopK query binary result
*/
struct TopKQueryBinResult {
1: required binary id_array; ///< id array, integer array
2: required binary distance_array; ///< distance array, double array
}
service MilvusService {
/**
* @brief Create table method
......@@ -158,6 +166,23 @@ service MilvusService {
4: list<Range> query_range_array,
5: i64 topk) throws(1: Exception e);
/**
* @brief Query vector
*
* This method is used to query vectors in a table.
*
* @param table_name, name of the table to query.
* @param query_record_array, the vectors to be queried.
* @param query_range_array, optional ranges for conditional search. If not specified, search the whole table.
* @param topk, number of most similar vectors to return.
*
* @return query binary result array.
*/
list<TopKQueryBinResult> SearchVector2(2: string table_name,
3: list<RowRecord> query_record_array,
4: list<Range> query_range_array,
5: i64 topk) throws(1: Exception e);
/**
* @brief Internal use query interface
*
......
......@@ -55,7 +55,6 @@ constexpr ServerError SERVER_LICENSE_VALIDATION_FAIL = ToGlobalServerErrorCode(5
constexpr ServerError DB_META_TRANSACTION_FAILED = ToGlobalServerErrorCode(1000);
class ServerException : public std::exception {
public:
ServerException(ServerError error_code,
......
#include <src/db/ExecutionEngine.h>
#include "ValidationUtil.h"
#include "Log.h"
namespace zilliz {
namespace milvus {
namespace server {
constexpr size_t table_name_size_limit = 255;
constexpr int64_t table_dimension_limit = 16384;
ServerError
ValidateTableName(const std::string &table_name) {
// Table name shouldn't be empty.
if (table_name.empty()) {
SERVER_LOG_ERROR << "Empty table name";
return SERVER_INVALID_TABLE_NAME;
}
// Table name length shouldn't exceed the 255-character limit.
if (table_name.size() > table_name_size_limit) {
SERVER_LOG_ERROR << "Table name length exceeds the limit";
return SERVER_INVALID_TABLE_NAME;
}
// The first character of a table name should be an underscore or a letter.
char first_char = table_name[0];
// Cast to unsigned char before the std::isalpha call to avoid undefined
// behavior for negative char values.
if (first_char != '_' && std::isalpha(static_cast<unsigned char>(first_char)) == 0) {
SERVER_LOG_ERROR << "Table name's first character isn't an underscore or a letter: " << first_char;
return SERVER_INVALID_TABLE_NAME;
}
int64_t table_name_size = table_name.size();
for (int64_t i = 1; i < table_name_size; ++i) {
char name_char = table_name[i];
if (name_char != '_' && std::isalnum(static_cast<unsigned char>(name_char)) == 0) {
SERVER_LOG_ERROR << "Table name character isn't an underscore or alphanumeric: " << name_char;
return SERVER_INVALID_TABLE_NAME;
}
}
return SERVER_SUCCESS;
}
ServerError
ValidateTableDimension(int64_t dimension) {
if (dimension <= 0 || dimension > table_dimension_limit) {
SERVER_LOG_ERROR << "Table dimension excceed the limitation: " << table_dimension_limit;
return SERVER_INVALID_VECTOR_DIMENSION;
} else {
return SERVER_SUCCESS;
}
}
ServerError
ValidateTableIndexType(int32_t index_type) {
int engine_type = (int)engine::EngineType(index_type);
if(engine_type <= 0 || engine_type > (int)engine::EngineType::MAX_VALUE) {
return SERVER_INVALID_INDEX_TYPE;
}
SERVER_LOG_DEBUG << "Index type: " << index_type;
return SERVER_SUCCESS;
}
}
}
}
\ No newline at end of file
#pragma once
#include "Error.h"
namespace zilliz {
namespace milvus {
namespace server {
ServerError
ValidateTableName(const std::string& table_name);
ServerError
ValidateTableDimension(int64_t dimension);
ServerError
ValidateTableIndexType(int32_t index_type);
}
}
}
\ No newline at end of file
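Taken together, a request handler would typically chain these checks before touching the database. A minimal sketch (the wrapper function is hypothetical; only the three validators above come from the source):

// Hypothetical helper chaining the validators declared above.
ServerError
ValidateCreateTableRequest(const std::string& table_name, int64_t dimension, int32_t index_type) {
    ServerError err = ValidateTableName(table_name);
    if (err != SERVER_SUCCESS) {
        return err;
    }
    err = ValidateTableDimension(dimension);
    if (err != SERVER_SUCCESS) {
        return err;
    }
    return ValidateTableIndexType(index_type);
}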
......@@ -28,6 +28,15 @@ IndexType resolveIndexType(const string &index_type) {
return IndexType::Invalid_Option;
}
int CalcBucketCount(int nb, size_t nlist) {
int bucket_count = int(nb / 1000000.0 * nlist); // nlist buckets per million vectors
if(bucket_count == 0) {
bucket_count = 1; // avoid a faiss crash on very small datasets
}
return bucket_count;
}
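To make the heuristic concrete, the function allocates roughly nlist IVF buckets per million vectors and clamps the result to at least one. A few illustrative values (my own arithmetic, not from the source):

// nb = 2,000,000, nlist = 16384  ->  int(2.0 * 16384) = 32768 buckets
// nb =   500,000, nlist = 16384  ->  int(0.5 * 16384) = 8192 buckets
// nb =        50, nlist = 16384  ->  int(0.8192)      = 0, clamped to 1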
// nb at least 100
string Operand::get_index_type(const int &nb) {
if (!index_str.empty()) { return index_str; }
......@@ -45,7 +54,7 @@ string Operand::get_index_type(const int &nb) {
size_t nlist = engine_config.GetInt32Value(CONFIG_NLIST, 16384);
index_str += (ncent != 0 ? index_type + std::to_string(ncent) :
index_type + std::to_string(int(nb / 1000000.0 * nlist)));
index_type + std::to_string(CalcBucketCount(nb, nlist)));
// std::cout<<"nlist = "<<nlist<<std::endl;
if (!postproc.empty()) { index_str += ("," + postproc); }
break;
......@@ -58,7 +67,7 @@ string Operand::get_index_type(const int &nb) {
size_t nlist = engine_config.GetInt32Value(CONFIG_NLIST, 16384);
index_str += (ncent != 0 ? "IVF" + std::to_string(ncent) :
"IVF" + std::to_string(int(nb / 1000000.0 * nlist)));
"IVF" + std::to_string(CalcBacketCount(nb, nlist)));
index_str += ",SQ8";
// std::cout<<"nlist = "<<nlist<<std::endl;
break;
......
......@@ -3,6 +3,7 @@ BOOST_VERSION=1.70.0
BZIP2_VERSION=1.0.6
EASYLOGGINGPP_VERSION=v9.96.7
FAISS_VERSION=7b07685
MKL_VERSION=2019.4.243
GTEST_VERSION=1.8.1
JSONCONS_VERSION=0.126.0
LAPACK_VERSION=v3.8.0
......
......@@ -12,7 +12,6 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/config config_files)
set(unittest_srcs
${CMAKE_CURRENT_SOURCE_DIR}/main.cpp)
#${EASYLOGGINGPP_INCLUDE_DIR}/easylogging++.cc)
set(require_files
${MILVUS_ENGINE_SRC}/server/ServerConfig.cpp
......@@ -29,7 +28,6 @@ set(unittest_libs
easyloggingpp
pthread
metrics
openblas
gfortran
prometheus-cpp-pull
prometheus-cpp-push
......@@ -44,4 +42,5 @@ add_subdirectory(db)
add_subdirectory(faiss_wrapper)
#add_subdirectory(license)
add_subdirectory(metrics)
add_subdirectory(storage)
\ No newline at end of file
add_subdirectory(storage)
add_subdirectory(utils)
\ No newline at end of file
......@@ -24,7 +24,6 @@ link_directories("/usr/local/cuda/lib64")
include_directories(/usr/include/mysql)
set(db_test_src
#${unittest_srcs}
${config_files}
${cache_srcs}
${db_srcs}
......@@ -40,13 +39,18 @@ set(db_libs
faiss
cudart
cublas
sqlite3
boost_system
boost_filesystem
sqlite
boost_system_static
boost_filesystem_static
lz4
mysqlpp
)
if(${BUILD_FAISS_WITH_MKL} STREQUAL "true")
set(db_libs ${db_libs} ${MKL_LIBS} ${MKL_LIBS})
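# Note: repeating ${MKL_LIBS} twice on the link line is a common workaround
# for circular symbol dependencies among MKL's static libraries; that the
# duplication here is deliberate is an assumption.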
endif()
target_link_libraries(db_test ${db_libs} ${unittest_libs})
install(TARGETS db_test DESTINATION bin)
......@@ -3,17 +3,20 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <boost/filesystem.hpp>
#include "utils.h"
#include "db/DB.h"
#include "db/DBImpl.h"
#include "db/MetaConsts.h"
#include "db/Factories.h"
#include <gtest/gtest.h>
#include <easylogging++.h>
#include <boost/filesystem.hpp>
#include <thread>
#include <random>
using namespace zilliz::milvus;
namespace {
......@@ -52,15 +55,13 @@ TEST_F(DBTest, CONFIG_TEST) {
engine::ArchiveConf conf("delete");
ASSERT_EQ(conf.GetType(), "delete");
auto criterias = conf.GetCriterias();
ASSERT_TRUE(criterias.size() == 1);
ASSERT_TRUE(criterias["disk"] == 512);
ASSERT_TRUE(criterias.size() == 0);
}
{
engine::ArchiveConf conf("swap");
ASSERT_EQ(conf.GetType(), "swap");
auto criterias = conf.GetCriterias();
ASSERT_TRUE(criterias.size() == 1);
ASSERT_TRUE(criterias["disk"] == 512);
ASSERT_TRUE(criterias.size() == 0);
}
{
ASSERT_ANY_THROW(engine::ArchiveConf conf1("swap", "disk:"));
......@@ -206,11 +207,21 @@ TEST_F(DBTest, SEARCH_TEST) {
ASSERT_STATS(stat);
}
sleep(2); // wait until build index finish
db_->BuildIndex(TABLE_NAME); // build the index synchronously instead of sleeping
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results);
ASSERT_STATS(stat);
{
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, k, nq, xq.data(), results);
ASSERT_STATS(stat);
}
{ // search by specifying index files
engine::meta::DatesT dates;
std::vector<std::string> file_ids = {"1", "2", "3", "4"};
engine::QueryResults results;
stat = db_->Query(TABLE_NAME, file_ids, k, nq, xq.data(), dates, results);
ASSERT_STATS(stat);
}
// TODO(linxj): add groundTruth assert
};
......
This diff is collapsed.
......@@ -14,6 +14,7 @@
#include "db/Options.h"
#include "db/DBMetaImpl.h"
#include "db/EngineFactory.h"
#include "db/Utils.h"
#include <vector>
......@@ -134,4 +135,32 @@ TEST(DBMiscTest, META_TEST) {
int delta = 10;
engine::meta::DateT dt = impl.GetDate(tt, delta);
ASSERT_GT(dt, 0);
}
TEST(DBMiscTest, UTILS_TEST) {
engine::DBMetaOptions options;
options.path = "/tmp/milvus_test/main";
options.slave_paths.push_back("/tmp/milvus_test/slave_1");
options.slave_paths.push_back("/tmp/milvus_test/slave_2");
const std::string TABLE_NAME = "test_tbl";
auto status = engine::utils::CreateTablePath(options, TABLE_NAME);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(boost::filesystem::exists(options.path));
for(auto& path : options.slave_paths) {
ASSERT_TRUE(boost::filesystem::exists(path));
}
engine::meta::TableFileSchema file;
file.id_ = 50;
file.table_id_ = TABLE_NAME;
file.file_type_ = 3;
file.date_ = 155000;
status = engine::utils::GetTableFilePath(options, file);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(file.location_.empty());
status = engine::utils::DeleteTablePath(options, TABLE_NAME);
ASSERT_TRUE(status.ok());
}
\ No newline at end of file
......@@ -3,17 +3,19 @@
// Unauthorized copying of this file, via any medium is strictly prohibited.
// Proprietary and confidential.
////////////////////////////////////////////////////////////////////////////////
#include <gtest/gtest.h>
#include <thread>
#include <easylogging++.h>
#include <boost/filesystem.hpp>
#include "utils.h"
#include "db/DB.h"
#include "db/DBImpl.h"
#include "db/MetaConsts.h"
#include "db/Factories.h"
#include <gtest/gtest.h>
#include <easylogging++.h>
#include <boost/filesystem.hpp>
#include <thread>
#include <random>
using namespace zilliz::milvus;
namespace {
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -25,7 +25,7 @@ set(s3_client_libs
stdc++
aws-cpp-sdk-s3
aws-cpp-sdk-core
boost_filesystem
boost_filesystem_static
)
target_link_libraries(s3_test
${s3_client_libs}
......
This diff is collapsed.
This diff is collapsed.