提交 a38f539b 编写于 作者: F FluorineDog 提交者: yefu.chen

Move c headers to segcore folder

Signed-off-by: NFluorineDog <guilin.gou@zilliz.com>
上级 df1dcf81
...@@ -203,7 +203,7 @@ set( GPU_ENABLE "false" ) ...@@ -203,7 +203,7 @@ set( GPU_ENABLE "false" )
install( install(
DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/segcore/ DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/segcore/
DESTINATION include DESTINATION include/segcore/
FILES_MATCHING PATTERN "*_c.h" FILES_MATCHING PATTERN "*_c.h"
) )
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
extern "C" { extern "C" {
#endif #endif
#include "collection_c.h" #include "segcore/collection_c.h"
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
......
...@@ -14,8 +14,8 @@ extern "C" { ...@@ -14,8 +14,8 @@ extern "C" {
#endif #endif
#include <stdbool.h> #include <stdbool.h>
#include "collection_c.h" #include "segcore/collection_c.h"
#include "plan_c.h" #include "segcore/plan_c.h"
#include <stdint.h> #include <stdint.h>
typedef void* CSegmentBase; typedef void* CSegmentBase;
......
...@@ -6,8 +6,8 @@ package reader ...@@ -6,8 +6,8 @@ package reader
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include "collection_c.h" #include "segcore/collection_c.h"
#include "segment_c.h" #include "segcore/segment_c.h"
*/ */
import "C" import "C"
......
...@@ -6,13 +6,14 @@ package reader ...@@ -6,13 +6,14 @@ package reader
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include "collection_c.h" #include "segcore/collection_c.h"
#include "segment_c.h" #include "segcore/segment_c.h"
*/ */
import "C" import "C"
import ( import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"log"
"strconv" "strconv"
"sync" "sync"
...@@ -37,13 +38,18 @@ type collectionReplica interface { ...@@ -37,13 +38,18 @@ type collectionReplica interface {
removeCollection(collectionID UniqueID) error removeCollection(collectionID UniqueID) error
getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByID(collectionID UniqueID) (*Collection, error)
getCollectionByName(collectionName string) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error)
hasCollection(collectionID UniqueID) bool
// partition // partition
// Partition tags in different collections are not unique, // Partition tags in different collections are not unique,
// so partition api should specify the target collection. // so partition api should specify the target collection.
getPartitionNum(collectionID UniqueID) (int, error)
addPartition(collectionID UniqueID, partitionTag string) error addPartition(collectionID UniqueID, partitionTag string) error
removePartition(collectionID UniqueID, partitionTag string) error removePartition(collectionID UniqueID, partitionTag string) error
addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error
getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error)
hasPartition(collectionID UniqueID, partitionTag string) bool
// segment // segment
getSegmentNum() int getSegmentNum() int
...@@ -142,7 +148,31 @@ func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName stri ...@@ -142,7 +148,31 @@ func (colReplica *collectionReplicaImpl) getCollectionByName(collectionName stri
return nil, errors.New("Cannot found collection: " + collectionName) return nil, errors.New("Cannot found collection: " + collectionName)
} }
func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bool {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, col := range colReplica.collections {
if col.ID() == collectionID {
return true
}
}
return false
}
//----------------------------------------------------------------------------------------------------- partition //----------------------------------------------------------------------------------------------------- partition
func (colReplica *collectionReplicaImpl) getPartitionNum(collectionID UniqueID) (int, error) {
collection, err := colReplica.getCollectionByID(collectionID)
if err != nil {
return -1, err
}
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
return len(collection.partitions), nil
}
func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error { func (colReplica *collectionReplicaImpl) addPartition(collectionID UniqueID, partitionTag string) error {
collection, err := colReplica.getCollectionByID(collectionID) collection, err := colReplica.getCollectionByID(collectionID)
if err != nil { if err != nil {
...@@ -182,6 +212,61 @@ func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID, ...@@ -182,6 +212,61 @@ func (colReplica *collectionReplicaImpl) removePartition(collectionID UniqueID,
return nil return nil
} }
func (colReplica *collectionReplicaImpl) addPartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
if !colReplica.hasCollection(colMeta.ID) {
err := errors.New("Cannot find collection, id = " + strconv.FormatInt(colMeta.ID, 10))
return err
}
pToAdd := make([]string, 0)
for _, partitionTag := range colMeta.PartitionTags {
if !colReplica.hasPartition(colMeta.ID, partitionTag) {
pToAdd = append(pToAdd, partitionTag)
}
}
for _, tag := range pToAdd {
err := colReplica.addPartition(colMeta.ID, tag)
if err != nil {
log.Println(err)
}
}
return nil
}
func (colReplica *collectionReplicaImpl) removePartitionsByCollectionMeta(colMeta *etcdpb.CollectionMeta) error {
col, err := colReplica.getCollectionByID(colMeta.ID)
if err != nil {
return err
}
colReplica.mu.Lock()
pToDel := make([]string, 0)
for _, partition := range col.partitions {
hasPartition := false
for _, tag := range colMeta.PartitionTags {
if partition.partitionTag == tag {
hasPartition = true
}
}
if !hasPartition {
pToDel = append(pToDel, partition.partitionTag)
}
}
colReplica.mu.Unlock()
for _, tag := range pToDel {
err := colReplica.removePartition(col.ID(), tag)
if err != nil {
log.Println(err)
}
}
return nil
}
func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) { func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID, partitionTag string) (*Partition, error) {
collection, err := colReplica.getCollectionByID(collectionID) collection, err := colReplica.getCollectionByID(collectionID)
if err != nil { if err != nil {
...@@ -200,6 +285,25 @@ func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID ...@@ -200,6 +285,25 @@ func (colReplica *collectionReplicaImpl) getPartitionByTag(collectionID UniqueID
return nil, errors.New("cannot find partition, tag = " + partitionTag) return nil, errors.New("cannot find partition, tag = " + partitionTag)
} }
func (colReplica *collectionReplicaImpl) hasPartition(collectionID UniqueID, partitionTag string) bool {
collection, err := colReplica.getCollectionByID(collectionID)
if err != nil {
log.Println(err)
return false
}
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
for _, p := range *collection.Partitions() {
if p.Tag() == partitionTag {
return true
}
}
return false
}
//----------------------------------------------------------------------------------------------------- segment //----------------------------------------------------------------------------------------------------- segment
func (colReplica *collectionReplicaImpl) getSegmentNum() int { func (colReplica *collectionReplicaImpl) getSegmentNum() int {
colReplica.mu.RLock() colReplica.mu.RLock()
...@@ -209,6 +313,9 @@ func (colReplica *collectionReplicaImpl) getSegmentNum() int { ...@@ -209,6 +313,9 @@ func (colReplica *collectionReplicaImpl) getSegmentNum() int {
} }
func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats { func (colReplica *collectionReplicaImpl) getSegmentStatistics() *internalpb.QueryNodeSegStats {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
var statisticData = make([]*internalpb.SegmentStats, 0) var statisticData = make([]*internalpb.SegmentStats, 0)
for segmentID, segment := range colReplica.segments { for segmentID, segment := range colReplica.segments {
...@@ -306,6 +413,9 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool { ...@@ -306,6 +413,9 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
//----------------------------------------------------------------------------------------------------- //-----------------------------------------------------------------------------------------------------
func (colReplica *collectionReplicaImpl) freeAll() { func (colReplica *collectionReplicaImpl) freeAll() {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
for _, seg := range colReplica.segments { for _, seg := range colReplica.segments {
deleteSegment(seg) deleteSegment(seg)
} }
......
...@@ -13,7 +13,60 @@ import ( ...@@ -13,7 +13,60 @@ import (
) )
//----------------------------------------------------------------------------------------------------- collection //----------------------------------------------------------------------------------------------------- collection
func TestColSegContainer_addCollection(t *testing.T) { func TestCollectionReplica_getCollectionNum(t *testing.T) {
ctx := context.Background()
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
(*node.replica).freeAll()
}
func TestCollectionReplica_addCollection(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -66,9 +119,11 @@ func TestColSegContainer_addCollection(t *testing.T) { ...@@ -66,9 +119,11 @@ func TestColSegContainer_addCollection(t *testing.T) {
assert.Equal(t, collection.meta.Schema.Name, collectionName) assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1) assert.Equal(t, (*node.replica).getCollectionNum(), 1)
(*node.replica).freeAll()
} }
func TestColSegContainer_removeCollection(t *testing.T) { func TestCollectionReplica_removeCollection(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -127,9 +182,11 @@ func TestColSegContainer_removeCollection(t *testing.T) { ...@@ -127,9 +182,11 @@ func TestColSegContainer_removeCollection(t *testing.T) {
err = (*node.replica).removeCollection(collectionID) err = (*node.replica).removeCollection(collectionID)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, (*node.replica).getCollectionNum(), 0) assert.Equal(t, (*node.replica).getCollectionNum(), 0)
(*node.replica).freeAll()
} }
func TestColSegContainer_getCollectionByID(t *testing.T) { func TestCollectionReplica_getCollectionByID(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -189,9 +246,11 @@ func TestColSegContainer_getCollectionByID(t *testing.T) { ...@@ -189,9 +246,11 @@ func TestColSegContainer_getCollectionByID(t *testing.T) {
assert.NotNil(t, targetCollection) assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
assert.Equal(t, targetCollection.meta.ID, UniqueID(0)) assert.Equal(t, targetCollection.meta.ID, UniqueID(0))
(*node.replica).freeAll()
} }
func TestColSegContainer_getCollectionByName(t *testing.T) { func TestCollectionReplica_getCollectionByName(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -251,10 +310,68 @@ func TestColSegContainer_getCollectionByName(t *testing.T) { ...@@ -251,10 +310,68 @@ func TestColSegContainer_getCollectionByName(t *testing.T) {
assert.NotNil(t, targetCollection) assert.NotNil(t, targetCollection)
assert.Equal(t, targetCollection.meta.Schema.Name, "collection0") assert.Equal(t, targetCollection.meta.Schema.Name, "collection0")
assert.Equal(t, targetCollection.meta.ID, UniqueID(0)) assert.Equal(t, targetCollection.meta.ID, UniqueID(0))
(*node.replica).freeAll()
}
func TestCollectionReplica_hasCollection(t *testing.T) {
ctx := context.Background()
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: collectionName,
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: UniqueID(0),
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
hasCollection := (*node.replica).hasCollection(UniqueID(0))
assert.Equal(t, hasCollection, true)
hasCollection = (*node.replica).hasCollection(UniqueID(1))
assert.Equal(t, hasCollection, false)
(*node.replica).freeAll()
} }
//----------------------------------------------------------------------------------------------------- partition //----------------------------------------------------------------------------------------------------- partition
func TestColSegContainer_addPartition(t *testing.T) { func TestCollectionReplica_getPartitionNum(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -317,9 +434,82 @@ func TestColSegContainer_addPartition(t *testing.T) { ...@@ -317,9 +434,82 @@ func TestColSegContainer_addPartition(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default") assert.Equal(t, partition.partitionTag, "default")
} }
partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 1)
(*node.replica).freeAll()
} }
func TestColSegContainer_removePartition(t *testing.T) { func TestCollectionReplica_addPartition(t *testing.T) {
ctx := context.Background()
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
for _, tag := range collectionMeta.PartitionTags {
err := (*node.replica).addPartition(collectionID, tag)
assert.NoError(t, err)
partition, err := (*node.replica).getPartitionByTag(collectionID, tag)
assert.NoError(t, err)
assert.Equal(t, partition.partitionTag, "default")
}
(*node.replica).freeAll()
}
func TestCollectionReplica_removePartition(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -385,9 +575,157 @@ func TestColSegContainer_removePartition(t *testing.T) { ...@@ -385,9 +575,157 @@ func TestColSegContainer_removePartition(t *testing.T) {
err = (*node.replica).removePartition(collectionID, partitionTag) err = (*node.replica).removePartition(collectionID, partitionTag)
assert.NoError(t, err) assert.NoError(t, err)
} }
(*node.replica).freeAll()
}
func TestCollectionReplica_addPartitionsByCollectionMeta(t *testing.T) {
ctx := context.Background()
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"p0"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
collectionMeta.PartitionTags = []string{"p0", "p1", "p2"}
err = (*node.replica).addPartitionsByCollectionMeta(&collectionMeta)
assert.NoError(t, err)
partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
(*node.replica).freeAll()
}
func TestCollectionReplica_removePartitionsByCollectionMeta(t *testing.T) {
ctx := context.Background()
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"p0", "p1", "p2"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
collectionMeta.PartitionTags = []string{"p0"}
err = (*node.replica).addPartitionsByCollectionMeta(&collectionMeta)
assert.NoError(t, err)
partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 1)
hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, false)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, false)
(*node.replica).freeAll()
} }
func TestColSegContainer_getPartitionByTag(t *testing.T) { func TestCollectionReplica_getPartitionByTag(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -451,10 +789,78 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) { ...@@ -451,10 +789,78 @@ func TestColSegContainer_getPartitionByTag(t *testing.T) {
assert.Equal(t, partition.partitionTag, "default") assert.Equal(t, partition.partitionTag, "default")
assert.NotNil(t, partition) assert.NotNil(t, partition)
} }
(*node.replica).freeAll()
}
func TestCollectionReplica_hasPartition(t *testing.T) {
ctx := context.Background()
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, collectionID)
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
hasPartition := (*node.replica).hasPartition(UniqueID(0), "default")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "default1")
assert.Equal(t, hasPartition, false)
(*node.replica).freeAll()
} }
//----------------------------------------------------------------------------------------------------- segment //----------------------------------------------------------------------------------------------------- segment
func TestColSegContainer_addSegment(t *testing.T) { func TestCollectionReplica_addSegment(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -521,9 +927,11 @@ func TestColSegContainer_addSegment(t *testing.T) { ...@@ -521,9 +927,11 @@ func TestColSegContainer_addSegment(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i)) assert.Equal(t, targetSeg.segmentID, UniqueID(i))
} }
(*node.replica).freeAll()
} }
func TestColSegContainer_removeSegment(t *testing.T) { func TestCollectionReplica_removeSegment(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -592,9 +1000,11 @@ func TestColSegContainer_removeSegment(t *testing.T) { ...@@ -592,9 +1000,11 @@ func TestColSegContainer_removeSegment(t *testing.T) {
err = (*node.replica).removeSegment(UniqueID(i)) err = (*node.replica).removeSegment(UniqueID(i))
assert.NoError(t, err) assert.NoError(t, err)
} }
(*node.replica).freeAll()
} }
func TestColSegContainer_getSegmentByID(t *testing.T) { func TestCollectionReplica_getSegmentByID(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -661,9 +1071,11 @@ func TestColSegContainer_getSegmentByID(t *testing.T) { ...@@ -661,9 +1071,11 @@ func TestColSegContainer_getSegmentByID(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i)) assert.Equal(t, targetSeg.segmentID, UniqueID(i))
} }
(*node.replica).freeAll()
} }
func TestColSegContainer_hasSegment(t *testing.T) { func TestCollectionReplica_hasSegment(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0) node := NewQueryNode(ctx, 0)
...@@ -734,4 +1146,81 @@ func TestColSegContainer_hasSegment(t *testing.T) { ...@@ -734,4 +1146,81 @@ func TestColSegContainer_hasSegment(t *testing.T) {
hasSeg = (*node.replica).hasSegment(UniqueID(i + 100)) hasSeg = (*node.replica).hasSegment(UniqueID(i + 100))
assert.Equal(t, hasSeg, false) assert.Equal(t, hasSeg, false)
} }
(*node.replica).freeAll()
}
func TestCollectionReplica_freeAll(t *testing.T) {
ctx := context.Background()
node := NewQueryNode(ctx, 0)
collectionName := "collection0"
collectionID := UniqueID(0)
fieldVec := schemapb.FieldSchema{
Name: "vec",
DataType: schemapb.DataType_VECTOR_FLOAT,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "16",
},
},
}
fieldInt := schemapb.FieldSchema{
Name: "age",
DataType: schemapb.DataType_INT32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "1",
},
},
}
schema := schemapb.CollectionSchema{
Name: "collection0",
Fields: []*schemapb.FieldSchema{
&fieldVec, &fieldInt,
},
}
collectionMeta := etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schema,
CreateTime: Timestamp(0),
SegmentIDs: []UniqueID{0},
PartitionTags: []string{"default"},
}
collectionMetaBlob := proto.MarshalTextString(&collectionMeta)
assert.NotEqual(t, "", collectionMetaBlob)
var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob)
assert.NoError(t, err)
collection, err := (*node.replica).getCollectionByName(collectionName)
assert.NoError(t, err)
assert.Equal(t, collection.meta.Schema.Name, collectionName)
assert.Equal(t, collection.meta.ID, UniqueID(0))
assert.Equal(t, (*node.replica).getCollectionNum(), 1)
err = (*node.replica).addPartition(collectionID, collectionMeta.PartitionTags[0])
assert.NoError(t, err)
const segmentNum = 3
for i := 0; i < segmentNum; i++ {
err := (*node.replica).addSegment(UniqueID(i), collectionMeta.PartitionTags[0], collectionID)
assert.NoError(t, err)
targetSeg, err := (*node.replica).getSegmentByID(UniqueID(i))
assert.NoError(t, err)
assert.Equal(t, targetSeg.segmentID, UniqueID(i))
hasSeg := (*node.replica).hasSegment(UniqueID(i))
assert.Equal(t, hasSeg, true)
hasSeg = (*node.replica).hasSegment(UniqueID(i + 100))
assert.Equal(t, hasSeg, false)
}
(*node.replica).freeAll()
} }
...@@ -6,8 +6,8 @@ package reader ...@@ -6,8 +6,8 @@ package reader
#cgo LDFLAGS: -L../core/output/lib -lmilvus_segcore -Wl,-rpath=../core/output/lib #cgo LDFLAGS: -L../core/output/lib -lmilvus_segcore -Wl,-rpath=../core/output/lib
#include "collection_c.h" #include "segcore/collection_c.h"
#include "segment_c.h" #include "segcore/segment_c.h"
*/ */
import "C" import "C"
......
...@@ -214,6 +214,18 @@ func (mService *metaService) processSegmentModify(id string, value string) { ...@@ -214,6 +214,18 @@ func (mService *metaService) processSegmentModify(id string, value string) {
func (mService *metaService) processCollectionModify(id string, value string) { func (mService *metaService) processCollectionModify(id string, value string) {
println("Modify Collection: ", id) println("Modify Collection: ", id)
col := mService.collectionUnmarshal(value)
if col != nil {
err := (*mService.replica).addPartitionsByCollectionMeta(col)
if err != nil {
log.Println(err)
}
err = (*mService.replica).removePartitionsByCollectionMeta(col)
if err != nil {
log.Println(err)
}
}
} }
func (mService *metaService) processModify(key string, msg string) { func (mService *metaService) processModify(key string, msg string) {
......
...@@ -452,7 +452,9 @@ func TestMetaService_processCollectionModify(t *testing.T) { ...@@ -452,7 +452,9 @@ func TestMetaService_processCollectionModify(t *testing.T) {
> >
> >
segmentIDs: 0 segmentIDs: 0
partition_tags: "default" partition_tags: "p0"
partition_tags: "p1"
partition_tags: "p2"
` `
(*node.metaService).processCollectionCreate(id, value) (*node.metaService).processCollectionCreate(id, value)
...@@ -463,7 +465,19 @@ func TestMetaService_processCollectionModify(t *testing.T) { ...@@ -463,7 +465,19 @@ func TestMetaService_processCollectionModify(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
// TODO: use different index for testing processCollectionModify partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, false)
newValue := `schema: < newValue := `schema: <
name: "test" name: "test"
fields: < fields: <
...@@ -484,13 +498,28 @@ func TestMetaService_processCollectionModify(t *testing.T) { ...@@ -484,13 +498,28 @@ func TestMetaService_processCollectionModify(t *testing.T) {
> >
> >
segmentIDs: 0 segmentIDs: 0
partition_tags: "default" partition_tags: "p1"
partition_tags: "p2"
partition_tags: "p3"
` `
(*node.metaService).processCollectionModify(id, newValue) (*node.metaService).processCollectionModify(id, newValue)
collection, err = (*node.replica).getCollectionByName("test") collection, err = (*node.replica).getCollectionByName("test")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
partitionNum, err = (*node.replica).getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, false)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, true)
} }
func TestMetaService_processModify(t *testing.T) { func TestMetaService_processModify(t *testing.T) {
...@@ -523,7 +552,9 @@ func TestMetaService_processModify(t *testing.T) { ...@@ -523,7 +552,9 @@ func TestMetaService_processModify(t *testing.T) {
> >
> >
segmentIDs: 0 segmentIDs: 0
partition_tags: "default" partition_tags: "p0"
partition_tags: "p1"
partition_tags: "p2"
` `
(*node.metaService).processCreate(key1, msg1) (*node.metaService).processCreate(key1, msg1)
...@@ -534,8 +565,21 @@ func TestMetaService_processModify(t *testing.T) { ...@@ -534,8 +565,21 @@ func TestMetaService_processModify(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
partitionNum, err := (*node.replica).getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition := (*node.replica).hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, false)
key2 := "by-dev/segment/0" key2 := "by-dev/segment/0"
msg2 := `partition_tag: "default" msg2 := `partition_tag: "p1"
channel_start: 0 channel_start: 0
channel_end: 128 channel_end: 128
close_time: 18446744073709551615 close_time: 18446744073709551615
...@@ -568,7 +612,9 @@ func TestMetaService_processModify(t *testing.T) { ...@@ -568,7 +612,9 @@ func TestMetaService_processModify(t *testing.T) {
> >
> >
segmentIDs: 0 segmentIDs: 0
partition_tags: "default" partition_tags: "p1"
partition_tags: "p2"
partition_tags: "p3"
` `
(*node.metaService).processModify(key1, msg3) (*node.metaService).processModify(key1, msg3)
...@@ -576,13 +622,25 @@ func TestMetaService_processModify(t *testing.T) { ...@@ -576,13 +622,25 @@ func TestMetaService_processModify(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, collection.ID(), UniqueID(0)) assert.Equal(t, collection.ID(), UniqueID(0))
msg4 := `partition_tag: "default" partitionNum, err = (*node.replica).getPartitionNum(UniqueID(0))
assert.NoError(t, err)
assert.Equal(t, partitionNum, 3)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p0")
assert.Equal(t, hasPartition, false)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p1")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p2")
assert.Equal(t, hasPartition, true)
hasPartition = (*node.replica).hasPartition(UniqueID(0), "p3")
assert.Equal(t, hasPartition, true)
msg4 := `partition_tag: "p1"
channel_start: 0 channel_start: 0
channel_end: 128 channel_end: 128
close_time: 18446744073709551615 close_time: 18446744073709551615
` `
// TODO: modify segment for testing processCollectionModify
(*node.metaService).processModify(key2, msg4) (*node.metaService).processModify(key2, msg4)
seg, err := (*node.replica).getSegmentByID(UniqueID(0)) seg, err := (*node.replica).getSegmentByID(UniqueID(0))
assert.NoError(t, err) assert.NoError(t, err)
......
...@@ -6,8 +6,8 @@ package reader ...@@ -6,8 +6,8 @@ package reader
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include "collection_c.h" #include "segcore/collection_c.h"
#include "segment_c.h" #include "segcore/segment_c.h"
*/ */
import "C" import "C"
......
...@@ -3,9 +3,9 @@ package reader ...@@ -3,9 +3,9 @@ package reader
/* /*
#cgo CFLAGS: -I${SRCDIR}/../core/output/include #cgo CFLAGS: -I${SRCDIR}/../core/output/include
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include "collection_c.h" #include "segcore/collection_c.h"
#include "segment_c.h" #include "segcore/segment_c.h"
#include "plan_c.h" #include "segcore/plan_c.h"
*/ */
import "C" import "C"
import ( import (
......
...@@ -6,8 +6,8 @@ package reader ...@@ -6,8 +6,8 @@ package reader
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include "collection_c.h" #include "segcore/collection_c.h"
#include "segment_c.h" #include "segcore/segment_c.h"
*/ */
import "C" import "C"
......
...@@ -6,10 +6,10 @@ package reader ...@@ -6,10 +6,10 @@ package reader
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib #cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
#include "collection_c.h" #include "segcore/collection_c.h"
#include "segment_c.h" #include "segcore/segment_c.h"
#include "plan_c.h" #include "segcore/plan_c.h"
#include "reduce_c.h" #include "segcore/reduce_c.h"
*/ */
import "C" import "C"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册