Commit dd6c4bfb authored by GuoRentong, committed by yefu.chen

Update doc: interface definitions related to LoadCollection

Signed-off-by: GuoRentong <rentong.guo@zilliz.com>
Parent 9ff1bc0f
@@ -96,17 +96,26 @@ type TsMsg interface {
 	Unmarshal([]byte) *TsMsg
 }
 
+type MsgPosition struct {
+	ChannelName     string
+	MsgID           string
+	TimestampFilter Timestamp
+}
 
 type MsgPack struct {
 	BeginTs Timestamp
 	EndTs   Timestamp
 	Msgs    []TsMsg
+	StartPositions []MsgPosition
+	EndPositions   []MsgPosition
 }
 
 type MsgStream interface {
 	Produce(*MsgPack) error
 	Broadcast(*MsgPack) error
 	Consume() *MsgPack // message can be consumed exactly once
+	ShowChannelNames() []string
+	Seek(offset MsgPosition) error
 }
 
 type RepackFunc func(msgs []*TsMsg, hashKeys [][]int32) map[int32]*MsgPack
...
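Note: the new `MsgPosition`/`Seek` pair makes stream consumption resumable. A minimal sketch of the intended usage (not part of this commit; everything except the interface methods is hypothetical):

```go
// Hypothetical sketch: checkpointed consumption on top of the new
// MsgStream interface. Seek rewinds each channel to a saved position,
// and the EndPositions of a consumed pack become the next checkpoint.
func consumeFromCheckpoint(stream MsgStream, saved []MsgPosition) []MsgPosition {
	for _, pos := range saved {
		if err := stream.Seek(pos); err != nil {
			return saved // keep the old checkpoint on failure
		}
	}
	pack := stream.Consume() // each message is consumed exactly once
	if pack == nil {
		return saved
	}
	// Persisting these positions lets a crashed consumer resume here.
	return pack.EndPositions
}
```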
@@ -23,6 +23,9 @@ type Master interface {
 	GetPartitionStatistics(req PartitionStatsRequest) (PartitionStatsResponse, error)
 	ShowPartitions(req ShowPartitionRequest) (ShowPartitionResponse, error)
 
+	DescribeSegment(req DescribeSegmentRequest) (DescribeSegmentResponse, error)
+	ShowSegments(req ShowSegmentRequest) (ShowSegmentResponse, error)
+
 	CreateIndex(req CreateIndexRequest) error
 	DescribeIndex(DescribeIndexRequest) (DescribeIndexResponse, error)
@@ -87,6 +90,10 @@ type DescribeCollectionRequest struct {
 }
 
 type DescribeCollectionResponse struct {
+	DbID                 UniqueID
+	CollectionID         UniqueID
+	DefaultPartitionName string
+	DefaultPartitionID   UniqueID
 	Schema []byte
 }
 ```
@@ -115,6 +122,7 @@ type ShowCollectionRequest struct {
 type ShowCollectionResponse struct {
 	CollectionNames []string
+	CollectionIDs   []UniqueID
 }
 ```
@@ -176,10 +184,39 @@ type ShowPartitionRequest struct {
 }
 
 type ShowPartitionResponse struct {
+	PartitionIDs   []UniqueID
 	PartitionNames []string
 }
 ```
 
+* *DescribeSegment*
+
+```go
+type DescribeSegmentRequest struct {
+	MsgBase
+	CollectionID UniqueID
+	SegmentID    UniqueID
+}
+
+type DescribeSegmentResponse struct {
+	IndexID UniqueID
+}
+```
+
+* *ShowSegments*
+
+```go
+type ShowSegmentRequest struct {
+	MsgBase
+	CollectionID UniqueID
+	PartitionID  UniqueID
+}
+
+type ShowSegmentResponse struct {
+	SegmentIDs []UniqueID
+}
+```
+
 * *CreateIndex*
 
 ```go
...
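Note: together, the two new calls let a client enumerate a partition's segments and look up the index bound to each one. A hedged sketch (not from the document), assuming a concrete `master` value implementing the interface above:

```go
// Hypothetical sketch combining ShowSegments and DescribeSegment.
func segmentIndexIDs(master Master, collID, partID UniqueID) ([]UniqueID, error) {
	segs, err := master.ShowSegments(ShowSegmentRequest{
		CollectionID: collID,
		PartitionID:  partID,
	})
	if err != nil {
		return nil, err
	}
	indexIDs := make([]UniqueID, 0, len(segs.SegmentIDs))
	for _, segID := range segs.SegmentIDs {
		desc, err := master.DescribeSegment(DescribeSegmentRequest{
			CollectionID: collID,
			SegmentID:    segID,
		})
		if err != nil {
			return nil, err
		}
		indexIDs = append(indexIDs, desc.IndexID)
	}
	return indexIDs, nil
}
```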
@@ -4,7 +4,7 @@
 #### 8.1 Overview
 
-<img src="./figs/query_service.png" width=700>
+<img src="./figs/query_service.png" width=500>
@@ -230,6 +230,8 @@ type RemoveQueryChannelRequest struct {
 ```go
 type WatchDmChannelRequest struct {
 	InsertChannelNames []string
+	StartSegment       UniqueID
+	//FieldIDs []int64
 }
 ```
@@ -242,7 +244,7 @@ type LoadSegmentRequest struct {
 	CollectionID UniqueID
 	PartitionID  UniqueID
 	SegmentIDs   []UniqueID
-	FieldIDs     []int64
+	//FieldIDs []int64
 }
 ```
...
@@ -129,8 +129,10 @@ type SegmentStatesRequest struct {
 type SegmentStatesResponse struct {
 	State      SegmentState
-	CreateTime Timestamp
+	OpenTime   Timestamp
 	SealedTime Timestamp
+	MsgStartPositions []msgstream.MsgPosition
+	MsgEndPositions   []msgstream.MsgPosition
 }
 ```
...
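Note: the positions added here are what let a query node replay a growing segment from its insert channels. A hypothetical sketch of that interplay (not in this commit), assuming a stream whose `Seek` accepts a `msgstream.MsgPosition`:

```go
// Hypothetical sketch: rewind an insert-channel stream to the start
// positions reported for a segment, so its data can be re-consumed.
func seekToSegmentStart(stream msgstream.MsgStream, states SegmentStatesResponse) error {
	for _, pos := range states.MsgStartPositions {
		if err := stream.Seek(pos); err != nil {
			return err
		}
	}
	return nil
}
```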
@@ -146,7 +146,21 @@ func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
 }
 
 func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
-	// TODO: implement
+	// TODO: support db
+	for _, segmentID := range in.SegmentIDs {
+		hasBeenBuiltIndex := segmentID > 0 // TODO: ???
+		indexID := UniqueID(0)             // TODO: ???
+		err := node.segManager.loadSegment(segmentID, hasBeenBuiltIndex, indexID, in.FieldIDs)
+		if err != nil {
+			// TODO: return or continue
+			status := &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+				Reason:    err.Error(),
+			}
+			return status, err
+		}
+	}
 	return nil, nil
 }
...
@@ -53,6 +53,8 @@ type QueryNode struct {
 	loadIndexService *loadIndexService
 	statsService     *statsService
 
+	segManager *segmentManager
+
 	//opentracing
 	tracer opentracing.Tracer
 	closer io.Closer
...
package querynode
import (
"context"
"errors"
"fmt"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
"github.com/zilliztech/milvus-distributed/internal/storage"
)
type segmentManager struct {
	replica collectionReplica

	// TODO: replace these raw grpc clients with client wrappers
	dataClient         datapb.DataServiceClient
	indexBuilderClient indexpb.IndexServiceClient

	queryNodeClient *client.Client
	kv              kv.Base // minio kv
	iCodec          storage.InsertCodec
}
func (s *segmentManager) loadSegment(segmentID UniqueID, hasBeenBuiltIndex bool, indexID UniqueID, vecFieldIDs []int64) error {
	// 1. load the segment's insert binlogs
	req := &datapb.InsertBinlogPathRequest{
		SegmentID: segmentID,
	}
	pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), req)
	if err != nil {
		return err
	}
	// FieldIDs and Paths are parallel arrays: Paths[i] holds the binlog
	// paths of field FieldIDs[i].
	if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
		return errors.New("illegal InsertBinlogPathsResponse")
	}
	for i, fieldID := range pathResponse.FieldIDs {
		paths := pathResponse.Paths[i].Values
blobs := make([]*storage.Blob, 0)
for _, path := range paths {
binLog, err := s.kv.Load(path)
if err != nil {
// TODO: return or continue?
return err
}
blobs = append(blobs, &storage.Blob{
Key: "", // TODO: key???
Value: []byte(binLog),
})
}
_, _, insertData, err := s.iCodec.Deserialize(blobs)
if err != nil {
// TODO: return or continue
return err
}
if len(insertData.Data) != 1 {
return errors.New("we expect only one field in deserialized insert data")
}
for _, value := range insertData.Data {
switch fieldData := value.(type) {
case storage.BoolFieldData:
numRows := fieldData.NumRows
data := fieldData.Data
fmt.Println(numRows, data, fieldID)
// TODO: s.replica.addSegment()
case storage.Int8FieldData:
// TODO: s.replica.addSegment()
case storage.Int16FieldData:
// TODO: s.replica.addSegment()
case storage.Int32FieldData:
// TODO: s.replica.addSegment()
case storage.Int64FieldData:
// TODO: s.replica.addSegment()
case storage.FloatFieldData:
// TODO: s.replica.addSegment()
case storage.DoubleFieldData:
// TODO: s.replica.addSegment()
default:
// TODO: what if the index has not been built?
// TODO: is the hasBeenBuiltIndex flag kept in sync with the dataService?
return errors.New("unsupported field data type")
}
}
}
// 2. load index
// TODO: is the hasBeenBuiltIndex flag kept in sync with the dataService?
if !hasBeenBuiltIndex {
req := &indexpb.IndexFilePathRequest{
IndexID: indexID,
}
pathResponse, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), req)
if err != nil {
	return err
}
if pathResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
	// Surface the RPC-level failure instead of returning a nil error.
	return errors.New(pathResponse.Status.Reason)
}
targetSegment, err := s.replica.getSegmentByID(segmentID)
if err != nil {
return err
}
for _, vecFieldID := range vecFieldIDs {
targetIndexParam, ok := targetSegment.indexParam[vecFieldID]
if !ok {
return errors.New(fmt.Sprint("cannot find index params in segment ", segmentID, " with field = ", vecFieldID))
}
err := s.queryNodeClient.LoadIndex(pathResponse.IndexFilePaths, segmentID, vecFieldID, "", targetIndexParam)
if err != nil {
return err
}
}
}
return nil
}
func (s *segmentManager) releaseSegment(in *queryPb.ReleaseSegmentRequest) error {
// TODO: implement
// TODO: releasing a specific field requires a corresponding interface in segCore
return nil
}
@@ -130,7 +130,7 @@ func TestDataSyncService_Start(t *testing.T) {
 			InsertRequest: internalPb.InsertRequest{
 				MsgType: commonpb.MsgType_kInsert,
 				ReqID:   UniqueID(0),
-				CollectionName: "coll1",
+				CollectionName: "col1",
 				PartitionTag: "default",
 				SegmentID:    UniqueID(1),
 				ChannelID:    UniqueID(0),
@@ -206,7 +206,7 @@ func TestDataSyncService_Start(t *testing.T) {
 	<-ctx.Done()
 }
 
-func newMeta() {
+func newMeta() *etcdpb.CollectionMeta {
 	ETCDAddr := Params.EtcdAddress
 	MetaRootPath := Params.MetaRootPath
@@ -375,4 +375,6 @@ func newMeta() {
 	segBytes := proto.MarshalTextString(&segSch)
 	kvClient.Save("/segment/"+strconv.FormatInt(segSch.SegmentID, 10), segBytes)
+
+	return &collection
 }
@@ -127,24 +127,8 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 		}
 
-		// Timestamps
-		_, ok = idata.Data[1].(*storage.Int64FieldData)
-		if !ok {
-			idata.Data[1] = &storage.Int64FieldData{
-				Data:    []int64{},
-				NumRows: 0,
-			}
-		}
-		tsData := idata.Data[1].(*storage.Int64FieldData)
-		for _, ts := range msg.Timestamps {
-			tsData.Data = append(tsData.Data, int64(ts))
-		}
-		tsData.NumRows += len(msg.Timestamps)
-		span.LogFields(oplog.Int("tsData numRows", tsData.NumRows))
-
 		// 1.1 Get CollectionMeta from etcd
 		collection, err := ibNode.replica.getCollectionByName(collectionName)
-		//collSchema, err := ibNode.getCollectionSchemaByName(collectionName)
 		if err != nil {
 			// GOOSE TODO add error handler
 			log.Println("bbb, Get meta wrong:", err)
@@ -183,18 +167,20 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			fieldData := idata.Data[field.FieldID].(*storage.FloatVectorFieldData)
 
+			var offset int
 			for _, blob := range msg.RowData {
+				offset = 0
 				for j := 0; j < dim; j++ {
 					var v float32
-					buf := bytes.NewBuffer(blob.GetValue()[pos:])
+					buf := bytes.NewBuffer(blob.GetValue()[pos+offset:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.read float32 err:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
+					offset += int(unsafe.Sizeof(*(&v)))
 				}
 			}
+			pos += offset
 			fieldData.NumRows += len(msg.RowIDs)
 
 		case schemapb.DataType_VECTOR_BINARY:
@@ -222,13 +208,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 			fieldData := idata.Data[field.FieldID].(*storage.BinaryVectorFieldData)
 
+			var offset int
 			for _, blob := range msg.RowData {
 				bv := blob.GetValue()[pos : pos+(dim/8)]
 				fieldData.Data = append(fieldData.Data, bv...)
-				pos += len(bv)
+				offset = len(bv)
 			}
+			pos += offset
 			fieldData.NumRows += len(msg.RowData)
 		case schemapb.DataType_BOOL:
 			if _, ok := idata.Data[field.FieldID]; !ok {
 				idata.Data[field.FieldID] = &storage.BoolFieldData{
@@ -238,17 +226,18 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 			fieldData := idata.Data[field.FieldID].(*storage.BoolFieldData)
 
+			var v bool
 			for _, blob := range msg.RowData {
-				var v bool
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 					log.Println("binary.Read bool failed:", err)
 				}
 				fieldData.Data = append(fieldData.Data, v)
-				pos += int(unsafe.Sizeof(*(&v)))
 			}
+			pos += int(unsafe.Sizeof(*(&v)))
 			fieldData.NumRows += len(msg.RowIDs)
 		case schemapb.DataType_INT8:
 			if _, ok := idata.Data[field.FieldID]; !ok {
 				idata.Data[field.FieldID] = &storage.Int8FieldData{
@@ -258,15 +247,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 			fieldData := idata.Data[field.FieldID].(*storage.Int8FieldData)
 
+			var v int8
 			for _, blob := range msg.RowData {
-				var v int8
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 					log.Println("binary.Read int8 failed:", err)
 				}
 				fieldData.Data = append(fieldData.Data, v)
-				pos += int(unsafe.Sizeof(*(&v)))
 			}
+			pos += int(unsafe.Sizeof(*(&v)))
 			fieldData.NumRows += len(msg.RowIDs)
 
 		case schemapb.DataType_INT16:
@@ -278,16 +267,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 			fieldData := idata.Data[field.FieldID].(*storage.Int16FieldData)
 
+			var v int16
 			for _, blob := range msg.RowData {
-				var v int16
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 					log.Println("binary.Read int16 failed:", err)
 				}
 				fieldData.Data = append(fieldData.Data, v)
-				pos += int(unsafe.Sizeof(*(&v)))
 			}
+			pos += int(unsafe.Sizeof(*(&v)))
 			fieldData.NumRows += len(msg.RowIDs)
 
 		case schemapb.DataType_INT32:
@@ -299,15 +287,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 			fieldData := idata.Data[field.FieldID].(*storage.Int32FieldData)
 
+			var v int32
 			for _, blob := range msg.RowData {
-				var v int32
 				buf := bytes.NewReader(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 					log.Println("binary.Read int32 failed:", err)
 				}
 				fieldData.Data = append(fieldData.Data, v)
-				pos += int(unsafe.Sizeof(*(&v)))
 			}
+			pos += int(unsafe.Sizeof(*(&v)))
 			fieldData.NumRows += len(msg.RowIDs)
 
 		case schemapb.DataType_INT64:
@@ -320,27 +308,24 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			fieldData := idata.Data[field.FieldID].(*storage.Int64FieldData)
 
 			switch field.FieldID {
-			case 0:
+			case 0: // rowIDs
 				fieldData.Data = append(fieldData.Data, msg.RowIDs...)
 				fieldData.NumRows += len(msg.RowIDs)
-			case 1:
-				// Timestamps
+			case 1: // Timestamps
 				for _, ts := range msg.Timestamps {
 					fieldData.Data = append(fieldData.Data, int64(ts))
 				}
 				fieldData.NumRows += len(msg.Timestamps)
 			default:
+				var v int64
 				for _, blob := range msg.RowData {
-					var v int64
 					buf := bytes.NewBuffer(blob.GetValue()[pos:])
 					if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 						log.Println("binary.Read int64 failed:", err)
 					}
 					fieldData.Data = append(fieldData.Data, v)
-					pos += int(unsafe.Sizeof(*(&v)))
 				}
+				pos += int(unsafe.Sizeof(*(&v)))
 				fieldData.NumRows += len(msg.RowIDs)
 			}
@@ -353,16 +338,15 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 			fieldData := idata.Data[field.FieldID].(*storage.FloatFieldData)
 
+			var v float32
 			for _, blob := range msg.RowData {
-				var v float32
 				buf := bytes.NewBuffer(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 					log.Println("binary.Read float32 failed:", err)
 				}
 				fieldData.Data = append(fieldData.Data, v)
-				pos += int(unsafe.Sizeof(*(&v)))
 			}
+			pos += int(unsafe.Sizeof(*(&v)))
 			fieldData.NumRows += len(msg.RowIDs)
 
 		case schemapb.DataType_DOUBLE:
@@ -374,16 +358,16 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
 			}
 			fieldData := idata.Data[field.FieldID].(*storage.DoubleFieldData)
 
+			var v float64
 			for _, blob := range msg.RowData {
-				var v float64
 				buf := bytes.NewBuffer(blob.GetValue()[pos:])
 				if err := binary.Read(buf, binary.LittleEndian, &v); err != nil {
 					log.Println("binary.Read float64 failed:", err)
 				}
 				fieldData.Data = append(fieldData.Data, v)
-				pos += int(unsafe.Sizeof(*(&v)))
 			}
+			pos += int(unsafe.Sizeof(*(&v)))
 			fieldData.NumRows += len(msg.RowIDs)
 		}
 	}
...
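Note: the recurring change in this hunk is offset bookkeeping. Every row blob in `msg.RowData` shares one layout, so `pos` marks where the current field starts inside a row and must advance exactly once per field, after all rows are read. A standalone, simplified sketch of that pattern (hypothetical layout, not the project's code):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Rows share one layout, so the read offset within each row is pos,
// and pos advances once per field, not once per row.
func main() {
	// Two rows, each laid out as: uint32 field A, then float32 field B.
	rows := [][]byte{
		{1, 0, 0, 0 /* A=1 */, 0, 0, 128, 63 /* B=1.0 */},
		{2, 0, 0, 0 /* A=2 */, 0, 0, 0, 64 /* B=2.0 */},
	}

	pos := 0 // byte offset of the current field within every row

	// Field A: read at pos in each row; advance pos once afterwards.
	var a []uint32
	for _, row := range rows {
		a = append(a, binary.LittleEndian.Uint32(row[pos:]))
	}
	pos += 4

	// Field B: same pattern at the new pos.
	var b []uint32
	for _, row := range rows {
		b = append(b, binary.LittleEndian.Uint32(row[pos:]))
	}
	pos += 4

	fmt.Println(a, b) // [1 2] [1065353216 1073741824]
}
```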
@@ -9,6 +9,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
@@ -46,11 +47,16 @@ func TestFlowGraphInputBufferNode_Operate(t *testing.T) {
 	assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath("."))
 	go fService.start()
 
-	// Params.FlushInsertBufSize = 2
+	collMeta := newMeta()
+	schemaBlob := proto.MarshalTextString(collMeta.Schema)
+	require.NotEqual(t, "", schemaBlob)
+
 	replica := newReplica()
-	iBNode := newInsertBufferNode(ctx, insertChan, replica)
-	newMeta()
+	err = replica.addCollection(collMeta.ID, schemaBlob)
+	require.NoError(t, err)
+
+	// Params.FlushInsertBufSize = 2
+	iBNode := newInsertBufferNode(ctx, insertChan, replica)
 
 	inMsg := genInsertMsg()
 	var iMsg flowgraph.Msg = &inMsg
 	iBNode.Operate([]*flowgraph.Msg{&iMsg})
@@ -70,14 +76,12 @@ func genInsertMsg() insertMsg {
 		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
 		rawData = append(rawData, buf...)
 	}
-	log.Println(len(rawData))
 
 	// Binary vector
 	// Dimension of binary vector is 32
 	// size := 4, = 32 / 8
 	var bvector = []byte{255, 255, 255, 0}
 	rawData = append(rawData, bvector...)
-	log.Println(len(rawData))
 
 	// Bool
 	var fieldBool = true
@@ -87,7 +91,6 @@ func genInsertMsg() insertMsg {
 	}
 	rawData = append(rawData, buf.Bytes()...)
-	log.Println(len(rawData))
 
 	// int8
 	var dataInt8 int8 = 100
@@ -96,7 +99,6 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bint8.Bytes()...)
-	log.Println(len(rawData))
 
 	// int16
 	var dataInt16 int16 = 200
@@ -105,7 +107,6 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bint16.Bytes()...)
-	log.Println(len(rawData))
 
 	// int32
 	var dataInt32 int32 = 300
@@ -114,16 +115,14 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bint32.Bytes()...)
-	log.Println(len(rawData))
 
 	// int64
-	var dataInt64 int64 = 300
+	var dataInt64 int64 = 400
 	bint64 := new(bytes.Buffer)
 	if err := binary.Write(bint64, binary.LittleEndian, dataInt64); err != nil {
 		panic(err)
 	}
 	rawData = append(rawData, bint64.Bytes()...)
-	log.Println(len(rawData))
 
 	// float32
 	var datafloat float32 = 1.1
@@ -132,7 +131,6 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bfloat32.Bytes()...)
-	log.Println(len(rawData))
 
 	// float64
 	var datafloat64 float64 = 2.2
@@ -141,7 +139,7 @@ func genInsertMsg() insertMsg {
 		panic(err)
 	}
 	rawData = append(rawData, bfloat64.Bytes()...)
-	log.Println(len(rawData))
+	log.Println("Test rawdata length:", len(rawData))
 	timeRange := TimeRange{
 		timestampMin: 0,
@@ -170,16 +168,32 @@ func genInsertMsg() insertMsg {
 		InsertRequest: internalpb.InsertRequest{
 			MsgType:        commonpb.MsgType_kInsert,
 			ReqID:          UniqueID(0),
-			CollectionName: "coll1",
+			CollectionName: "col1",
 			PartitionTag:   "default",
 			SegmentID:      UniqueID(1),
 			ChannelID:      UniqueID(0),
 			ProxyID:        UniqueID(0),
-			Timestamps:     []Timestamp{Timestamp(i + 1000)},
-			RowIDs:         []UniqueID{UniqueID(i)},
+			Timestamps: []Timestamp{
+				Timestamp(i + 1000),
+				Timestamp(i + 1000),
+				Timestamp(i + 1000),
+				Timestamp(i + 1000),
+				Timestamp(i + 1000),
+			},
+			RowIDs: []UniqueID{
+				UniqueID(i),
+				UniqueID(i),
+				UniqueID(i),
+				UniqueID(i),
+				UniqueID(i),
+			},
 			RowData: []*commonpb.Blob{
 				{Value: rawData},
+				{Value: rawData},
+				{Value: rawData},
+				{Value: rawData},
+				{Value: rawData},
 			},
 		},
 	}
@@ -193,9 +207,11 @@ func genInsertMsg() insertMsg {
 			},
 		},
 		FlushMsg: internalpb.FlushMsg{
-			MsgType:   commonpb.MsgType_kFlush,
-			SegmentID: UniqueID(1),
-			Timestamp: Timestamp(2000),
+			MsgType:      commonpb.MsgType_kFlush,
+			SegmentID:    UniqueID(1),
+			Timestamp:    Timestamp(2000),
+			CollectionID: UniqueID(1),
+			PartitionTag: "default",
 		},
 	}
 	iMsg.flushMessages = append(iMsg.flushMessages, &fmsg)
...