Commit e7ebfcb0 authored by sunby, committed by yefu.chen

Save index meta to meta table

Signed-off-by: sunby <bingyi.sun@zilliz.com>
Parent: e52b130b
@@ -5,6 +5,8 @@ import (
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/errors"
)
@@ -77,7 +79,21 @@ func (scheduler *FlushScheduler) describe() error {
return err
}
}
//TODO: Save data to meta table
// Save data to meta table
segMeta, err := scheduler.metaTable.GetSegmentByID(singleSegmentID)
if err != nil {
return err
}
segMeta.BinlogFilePaths = make([]*etcdpb.FieldBinlogFiles, 0)
for k, v := range mapData {
segMeta.BinlogFilePaths = append(segMeta.BinlogFilePaths, &etcdpb.FieldBinlogFiles{
FieldID: k,
BinlogFiles: v,
})
}
if err = scheduler.metaTable.UpdateSegment(segMeta); err != nil {
return err
}
log.Printf("flush segment %d finished", singleSegmentID)
break
}
......
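Note: the hunk above leans on `metaTable.UpdateSegment` to persist the refreshed `SegmentMeta`. A minimal sketch of what that persistence step typically looks like in this codebase — text-marshal the proto and write it to the etcd-backed kv — where the helper name, key layout, and in-memory kv are illustrative assumptions, not the verbatim implementation:

```go
package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)

// memKV is a toy stand-in for metaTable's etcd-backed client (assumption).
type memKV map[string]string

func (m memKV) Save(key, value string) error { m[key] = value; return nil }

// saveSegmentMeta sketches the persistence behind UpdateSegment: serialize
// the proto as text and store it under a per-segment key.
func saveSegmentMeta(kv memKV, seg *etcdpb.SegmentMeta) error {
	value := proto.MarshalTextString(seg)
	key := fmt.Sprintf("segment/%d", seg.SegmentID)
	return kv.Save(key, value)
}

func main() {
	kv := memKV{}
	seg := &etcdpb.SegmentMeta{
		SegmentID:    1,
		CollectionID: 1,
		BinlogFilePaths: []*etcdpb.FieldBinlogFiles{
			{FieldID: 100, BinlogFiles: []string{"binlog/path/0"}},
		},
	}
	if err := saveSegmentMeta(kv, seg); err != nil {
		panic(err)
	}
	fmt.Println(kv["segment/1"])
}
```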
@@ -5,6 +5,10 @@ import (
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
)
@@ -15,8 +19,9 @@ type IndexBuildInfo struct {
binlogFilePath []string
}
type IndexBuildChannelInfo struct {
id UniqueID
info *IndexBuildInfo
id UniqueID
info *IndexBuildInfo
indexParams []*commonpb.KeyValuePair
}
type IndexBuildScheduler struct {
@@ -47,14 +52,43 @@ func NewIndexBuildScheduler(ctx context.Context, client BuildIndexClient, metaTa
func (scheduler *IndexBuildScheduler) schedule(info interface{}) error {
indexBuildInfo := info.(*IndexBuildInfo)
indexID, err := scheduler.client.BuildIndexWithoutID(indexBuildInfo.binlogFilePath, nil, nil)
segMeta, err := scheduler.metaTable.GetSegmentByID(indexBuildInfo.segmentID)
if err != nil {
return err
}
// parse index params
indexParams, err := scheduler.metaTable.GetFieldIndexParams(segMeta.CollectionID, indexBuildInfo.fieldID)
if err != nil {
return err
}
indexParamsMap := make(map[string]string)
for _, kv := range indexParams {
indexParamsMap[kv.Key] = kv.Value
}
indexID, err := scheduler.client.BuildIndexWithoutID(indexBuildInfo.binlogFilePath, nil, indexParamsMap)
log.Printf("build index for segment %d field %d", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
if err != nil {
return err
}
err = scheduler.metaTable.AddFieldIndexMeta(&etcdpb.FieldIndexMeta{
SegmentID: indexBuildInfo.segmentID,
FieldID: indexBuildInfo.fieldID,
IndexID: indexID,
IndexParams: indexParams,
Status: indexbuilderpb.IndexStatus_NONE,
})
if err != nil {
log.Printf("WARNING: " + err.Error())
//return err
}
scheduler.indexDescribe <- &IndexBuildChannelInfo{
id: indexID,
info: indexBuildInfo,
id: indexID,
info: indexBuildInfo,
indexParams: indexParams,
}
return nil
}
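The `indexParamsMap` handed to `BuildIndexWithoutID` is just the collection's per-field `IndexParams` flattened into a plain map. A self-contained sketch of that conversion (the helper name and parameter values are ours, for illustration):

```go
package main

import (
	"fmt"

	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)

// kvPairsToMap mirrors the loop added in schedule(): flatten
// []*commonpb.KeyValuePair into map[string]string (later keys win).
func kvPairsToMap(pairs []*commonpb.KeyValuePair) map[string]string {
	m := make(map[string]string, len(pairs))
	for _, kv := range pairs {
		m[kv.Key] = kv.Value
	}
	return m
}

func main() {
	params := []*commonpb.KeyValuePair{
		{Key: "index_type", Value: "IVF_FLAT"},
		{Key: "nlist", Value: "1024"},
	}
	fmt.Println(kvPairsToMap(params)) // map[index_type:IVF_FLAT nlist:1024]
}
```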
@@ -100,7 +134,18 @@ func (scheduler *IndexBuildScheduler) describe() error {
fieldName: fieldName,
indexFilePaths: filePaths,
}
//TODO: Save data to meta table
// Save data to meta table
err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{
SegmentID: indexBuildInfo.segmentID,
FieldID: indexBuildInfo.fieldID,
IndexID: indexID,
IndexParams: channelInfo.indexParams,
Status: indexbuilderpb.IndexStatus_FINISHED,
IndexFilePaths: filePaths,
})
if err != nil {
return err
}
err = scheduler.indexLoadSch.Enqueue(info)
log.Printf("build index for segment %d field %d enqueue load index", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
@@ -108,6 +153,18 @@ func (scheduler *IndexBuildScheduler) describe() error {
return err
}
log.Printf("build index for segment %d field %d finished", indexBuildInfo.segmentID, indexBuildInfo.fieldID)
} else {
// save status to meta table
err = scheduler.metaTable.UpdateFieldIndexMeta(&etcdpb.FieldIndexMeta{
SegmentID: indexBuildInfo.segmentID,
FieldID: indexBuildInfo.fieldID,
IndexID: indexID,
IndexParams: channelInfo.indexParams,
Status: description.Status,
})
if err != nil {
return err
}
}
time.Sleep(1 * time.Second)
}
......
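End to end, the scheduler now writes one `FieldIndexMeta` per (segment, field) pair and advances its `Status` through the build lifecycle: `NONE` at submission, then whatever `DescribeIndex` reports, with index file paths attached only once the build is `FINISHED`. A condensed, runnable sketch of that state flow (the IDs and file path are illustrative):

```go
package main

import (
	"fmt"

	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
)

func main() {
	// schedule(): record the meta as soon as the build request is submitted.
	meta := &etcdpb.FieldIndexMeta{
		SegmentID: 1,
		FieldID:   100,
		IndexID:   42,
		Status:    indexbuilderpb.IndexStatus_NONE,
	}
	fmt.Println("after schedule:", meta.Status)

	// describe() polling loop: fold the reported status back into the meta;
	// only a finished build carries index file paths.
	meta.Status = indexbuilderpb.IndexStatus_FINISHED
	meta.IndexFilePaths = []string{"index/1/100/0"}
	fmt.Println("after describe:", meta.Status, meta.IndexFilePaths)
}
```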
@@ -619,3 +619,19 @@ func (mt *metaTable) removeSegmentIndexMeta(segID UniqueID) error {
return nil
}
func (mt *metaTable) GetFieldIndexParams(collID UniqueID, fieldID UniqueID) ([]*commonpb.KeyValuePair, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
if _, ok := mt.collID2Meta[collID]; !ok {
return nil, fmt.Errorf("can not find collection with id %d", collID)
}
for _, fieldSchema := range mt.collID2Meta[collID].Schema.Fields {
if fieldSchema.FieldID == fieldID {
return fieldSchema.IndexParams, nil
}
}
return nil, fmt.Errorf("can not find field %d in collection %d", fieldID, collID)
}
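A usage sketch for the new accessor (same package; `mt` is an initialized `*metaTable`, and the two error paths distinguish a missing collection from a missing field):

```go
// Hypothetical call site, not part of this commit.
func logFieldIndexParams(mt *metaTable, collID, fieldID UniqueID) error {
	params, err := mt.GetFieldIndexParams(collID, fieldID)
	if err != nil {
		return err // "can not find collection ..." or "can not find field ..."
	}
	for _, kv := range params {
		log.Printf("index param %s=%s", kv.Key, kv.Value)
	}
	return nil
}
```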
@@ -5,6 +5,9 @@ import (
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/proto/indexbuilderpb"
@@ -34,6 +37,23 @@ func TestPersistenceScheduler(t *testing.T) {
assert.Nil(t, err)
defer meta.client.Close()
err = meta.AddCollection(&etcdpb.CollectionMeta{
ID: 1,
Schema: &schemapb.CollectionSchema{
Name: "testcoll",
Fields: []*schemapb.FieldSchema{
{FieldID: 1},
{FieldID: 100},
},
},
})
assert.Nil(t, err)
err = meta.AddSegment(&etcdpb.SegmentMeta{
SegmentID: 1,
CollectionID: 1,
})
assert.Nil(t, err)
//Init scheduler
indexLoadSch := NewIndexLoadScheduler(ctx, loadIndexClient, meta)
indexBuildSch := NewIndexBuildScheduler(ctx, buildIndexClient, meta, indexLoadSch)
......
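The seeded collection and segment are load-bearing: `schedule()` now calls `GetFieldIndexParams` and the flush path calls `GetSegmentByID`, so with an empty metaTable both lookups would fail before any scheduling happens. Hypothetical assertions making that explicit (the segment-lookup error wording is assumed):

```go
// Not in this commit: what the new lookups do against an empty metaTable.
_, err = meta.GetFieldIndexParams(1, 100)
assert.NotNil(t, err) // "can not find collection with id 1"
_, err = meta.GetSegmentByID(1)
assert.NotNil(t, err) // segment not found (exact wording assumed)
```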
@@ -13,11 +13,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const (
Ts = "ts"
DDL = "ddl"
)
type (
UniqueID = typeutil.UniqueID
FieldID = typeutil.UniqueID
@@ -59,6 +54,13 @@ func (b Blob) GetValue() []byte {
return b.Value
}
type Base struct {
Version int
CommitID int
TenantID UniqueID
Schema *etcdpb.CollectionMeta
}
type FieldData interface{}
type BoolFieldData struct {
@@ -120,14 +122,10 @@ type InsertData struct {
// Blob key example:
// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
type InsertCodec struct {
Schema *etcdpb.CollectionMeta
Base
readerCloseFunc []func() error
}
func NewInsertCodec(schema *etcdpb.CollectionMeta) *InsertCodec {
return &InsertCodec{Schema: schema}
}
func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
var blobs []*Blob
var writer *InsertBinlogWriter
@@ -427,59 +425,20 @@ func (insertCodec *InsertCodec) Close() error {
}
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx}
type DataDefinitionCodec struct {
collectionID int64
Base
readerCloseFunc []func() error
}
func NewDataDefinitionCodec(collectionID int64) *DataDefinitionCodec {
return &DataDefinitionCodec{collectionID: collectionID}
}
func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
writer, err := NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.collectionID)
writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.Schema.ID)
if err != nil {
return nil, err
}
var blobs []*Blob
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
return nil, err
}
var int64Ts []int64
for _, singleTs := range ts {
int64Ts = append(int64Ts, int64(singleTs))
}
err = eventWriter.AddInt64ToPayload(int64Ts)
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
writer.SetStartTimeStamp(ts[0])
writer.SetEndTimeStamp(ts[len(ts)-1])
err = writer.Close()
if err != nil {
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: Ts,
Value: buffer,
})
writer, err = NewDDLBinlogWriter(schemapb.DataType_STRING, dataDefinitionCodec.collectionID)
if err != nil {
return nil, err
}
for pos, req := range ddRequests {
switch eventTypes[pos] {
case CreateCollectionEventType:
@@ -534,12 +493,46 @@ func (dataDefinitionCodec *DataDefinitionCodec) Serialize(ts []Timestamp, ddRequ
if err != nil {
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: "",
Value: buffer,
})
writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64, dataDefinitionCodec.Schema.ID)
if err != nil {
return nil, err
}
eventWriter, err := writer.NextCreateCollectionEventWriter()
if err != nil {
return nil, err
}
var int64Ts []int64
for _, singleTs := range ts {
int64Ts = append(int64Ts, int64(singleTs))
}
err = eventWriter.AddInt64ToPayload(int64Ts)
if err != nil {
return nil, err
}
eventWriter.SetStartTimestamp(ts[0])
eventWriter.SetEndTimestamp(ts[len(ts)-1])
writer.SetStartTimeStamp(ts[0])
writer.SetEndTimeStamp(ts[len(ts)-1])
err = writer.Close()
if err != nil {
return nil, err
}
buffer, err = writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: DDL,
Key: "",
Value: buffer,
})
@@ -627,10 +620,7 @@ func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
//func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) {}
type IndexCodec struct {
}
func NewIndexCodec() *IndexCodec {
return &IndexCodec{}
Base
}
func (indexCodec *IndexCodec) Serialize(blobs []*Blob) ([]*Blob, error) {
......
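Two construction-level consequences of this refactor: the `Ts`/`DDL` blob-key constants are gone (both data-definition blobs are now emitted with an empty key, requests before timestamps), and the `NewInsertCodec`/`NewDataDefinitionCodec`/`NewIndexCodec` constructors are replaced by composite literals over the shared `Base`, as the updated tests below show. A minimal same-package sketch (schema contents elided):

```go
// Same-package sketch mirroring the tests; schema detail omitted.
base := Base{
	Version:  1,
	CommitID: 1,
	TenantID: 1,
	Schema:   &etcdpb.CollectionMeta{ID: 1},
}
insertCodec := &InsertCodec{base, make([]func() error, 0)}
ddCodec := &DataDefinitionCodec{base, make([]func() error, 0)}
indexCodec := &IndexCodec{Base{}}
_, _, _ = insertCodec, ddCodec, indexCodec
```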
@@ -10,104 +10,112 @@ import (
)
func TestInsertCodec(t *testing.T) {
Schema := &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 0,
Name: "row_id",
IsPrimaryKey: false,
Description: "row_id",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 1,
Name: "Ts",
IsPrimaryKey: false,
Description: "Ts",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 100,
Name: "field_bool",
IsPrimaryKey: false,
Description: "description_2",
DataType: schemapb.DataType_BOOL,
},
{
FieldID: 101,
Name: "field_int8",
IsPrimaryKey: false,
Description: "description_3",
DataType: schemapb.DataType_INT8,
},
{
FieldID: 102,
Name: "field_int16",
IsPrimaryKey: false,
Description: "description_4",
DataType: schemapb.DataType_INT16,
},
{
FieldID: 103,
Name: "field_int32",
IsPrimaryKey: false,
Description: "description_5",
DataType: schemapb.DataType_INT32,
},
{
FieldID: 104,
Name: "field_int64",
IsPrimaryKey: false,
Description: "description_6",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 105,
Name: "field_float",
IsPrimaryKey: false,
Description: "description_7",
DataType: schemapb.DataType_FLOAT,
},
{
FieldID: 106,
Name: "field_double",
IsPrimaryKey: false,
Description: "description_8",
DataType: schemapb.DataType_DOUBLE,
},
{
FieldID: 107,
Name: "field_string",
IsPrimaryKey: false,
Description: "description_9",
DataType: schemapb.DataType_STRING,
},
{
FieldID: 108,
Name: "field_binary_vector",
IsPrimaryKey: false,
Description: "description_10",
DataType: schemapb.DataType_VECTOR_BINARY,
},
{
FieldID: 109,
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "description_11",
DataType: schemapb.DataType_VECTOR_FLOAT,
base := Base{
Version: 1,
CommitID: 1,
TenantID: 1,
Schema: &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
FieldID: 0,
Name: "row_id",
IsPrimaryKey: false,
Description: "row_id",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 1,
Name: "Ts",
IsPrimaryKey: false,
Description: "Ts",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 100,
Name: "field_bool",
IsPrimaryKey: false,
Description: "description_2",
DataType: schemapb.DataType_BOOL,
},
{
FieldID: 101,
Name: "field_int8",
IsPrimaryKey: false,
Description: "description_3",
DataType: schemapb.DataType_INT8,
},
{
FieldID: 102,
Name: "field_int16",
IsPrimaryKey: false,
Description: "description_4",
DataType: schemapb.DataType_INT16,
},
{
FieldID: 103,
Name: "field_int32",
IsPrimaryKey: false,
Description: "description_5",
DataType: schemapb.DataType_INT32,
},
{
FieldID: 104,
Name: "field_int64",
IsPrimaryKey: false,
Description: "description_6",
DataType: schemapb.DataType_INT64,
},
{
FieldID: 105,
Name: "field_float",
IsPrimaryKey: false,
Description: "description_7",
DataType: schemapb.DataType_FLOAT,
},
{
FieldID: 106,
Name: "field_double",
IsPrimaryKey: false,
Description: "description_8",
DataType: schemapb.DataType_DOUBLE,
},
{
FieldID: 107,
Name: "field_string",
IsPrimaryKey: false,
Description: "description_9",
DataType: schemapb.DataType_STRING,
},
{
FieldID: 108,
Name: "field_binary_vector",
IsPrimaryKey: false,
Description: "description_10",
DataType: schemapb.DataType_VECTOR_BINARY,
},
{
FieldID: 109,
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "description_11",
DataType: schemapb.DataType_VECTOR_FLOAT,
},
},
},
},
}
insertCodec := NewInsertCodec(Schema)
insertCodec := &InsertCodec{
base,
make([]func() error, 0),
}
insertDataFirst := &InsertData{
Data: map[int64]FieldData{
0: &Int64FieldData{
@@ -260,7 +268,58 @@ func TestInsertCodec(t *testing.T) {
assert.Nil(t, insertCodec.Close())
}
func TestDDCodec(t *testing.T) {
dataDefinitionCodec := NewDataDefinitionCodec(int64(1))
base := Base{
Version: 1,
CommitID: 1,
TenantID: 1,
Schema: &etcdpb.CollectionMeta{
ID: 1,
CreateTime: 1,
SegmentIDs: []int64{0, 1},
PartitionTags: []string{"partition_0", "partition_1"},
Schema: &schemapb.CollectionSchema{
Name: "schema",
Description: "schema",
AutoID: true,
Fields: []*schemapb.FieldSchema{
{
Name: "field_1",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT32,
},
{
Name: "field_2",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_INT64,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
{
Name: "field_3",
IsPrimaryKey: false,
Description: "description_1",
DataType: schemapb.DataType_STRING,
},
},
},
},
}
dataDefinitionCodec := &DataDefinitionCodec{
base,
make([]func() error, 0),
}
ts := []Timestamp{
1,
2,
@@ -292,7 +351,9 @@ func TestDDCodec(t *testing.T) {
}
func TestIndexCodec(t *testing.T) {
indexCodec := NewIndexCodec()
indexCodec := &IndexCodec{
Base{},
}
blobs := []*Blob{
{
"12345",
......