提交 aab6dede 编写于 作者: X XuanYang-cn 提交者: yefu.chen

Add Flush for DDBuffer and insertBuffer

Signed-off-by: NXuanYang-cn <xuan.yang@zilliz.com>
上级 11cef6e9
......@@ -3,7 +3,6 @@ package writenode
import (
"context"
"encoding/binary"
"fmt"
"math"
"strconv"
"testing"
......@@ -343,8 +342,6 @@ func newMeta() {
collBytes := proto.MarshalTextString(&collection)
kvClient.Save("/collection/"+strconv.FormatInt(collection.ID, 10), collBytes)
value, _ := kvClient.Load("/collection/1")
fmt.Println("========value: ", value)
segSch := etcdpb.SegmentMeta{
SegmentID: UniqueID(1),
......
......@@ -38,7 +38,7 @@ type ddData struct {
}
type ddBuffer struct {
ddData map[UniqueID]*ddData
ddData map[UniqueID]*ddData // collection ID
maxSize int
}
......@@ -88,6 +88,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
timestampMin: msMsg.TimestampMin(),
timestampMax: msMsg.TimestampMax(),
},
flushMessages: make([]*msgstream.FlushMsg, 0),
}
ddNode.ddMsg = &ddMsg
......@@ -98,6 +99,8 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
return tsMessages[i].BeginTs() < tsMessages[j].BeginTs()
})
var flush bool = false
var flushSegID UniqueID
// do dd tasks
for _, msg := range tsMessages {
switch msg.Type() {
......@@ -109,13 +112,19 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
ddNode.createPartition(msg.(*msgstream.CreatePartitionMsg))
case internalPb.MsgType_kDropPartition:
ddNode.dropPartition(msg.(*msgstream.DropPartitionMsg))
case internalPb.MsgType_kFlush:
fMsg := msg.(*msgstream.FlushMsg)
flush = true
flushSegID = fMsg.SegmentID
ddMsg.flushMessages = append(ddMsg.flushMessages, fMsg)
default:
log.Println("Non supporting message type:", msg.Type())
}
}
// generate binlog
if ddNode.ddBuffer.full() {
if ddNode.ddBuffer.full() || flush {
log.Println(". dd buffer full or receive Flush msg ...")
ddCodec := &storage.DataDefinitionCodec{}
for collectionID, data := range ddNode.ddBuffer.ddData {
// buffer data to binlog
......@@ -135,6 +144,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
log.Println("illegal ddBuffer, failed to save binlog")
continue
} else {
log.Println(".. dd buffer flushing ...")
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
// ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx}
......@@ -163,11 +173,35 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
log.Println(err)
}
log.Println("save dd binlog, key = ", ddKey)
ddlFlushMsg := &ddlFlushSyncMsg{
flushCompleted: false,
ddlBinlogPathMsg: ddlBinlogPathMsg{
collID: collectionID,
paths: []string{timestampKey, ddKey},
},
}
ddNode.outCh <- ddlFlushMsg
}
}
// clear buffer
ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData)
log.Println("dd buffer flushed")
}
if flush {
log.Println(".. manual flush completed ...")
ddlFlushMsg := &ddlFlushSyncMsg{
flushCompleted: true,
ddlBinlogPathMsg: ddlBinlogPathMsg{
segID: flushSegID,
},
}
ddNode.outCh <- ddlFlushMsg
}
var res Msg = ddNode.ddMsg
......
......@@ -5,6 +5,10 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
......@@ -13,6 +17,7 @@ import (
)
func TestFlowGraphDDNode_Operate(t *testing.T) {
newMeta()
const ctxTimeInMillisecond = 2000
const closeWithDeadline = false
var ctx context.Context
......@@ -26,9 +31,22 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
ctx = context.Background()
}
ddChan := make(chan *ddlFlushSyncMsg, 10)
defer close(ddChan)
insertChan := make(chan *insertFlushSyncMsg, 10)
defer close(insertChan)
testPath := "/test/writenode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.MetaRootPath = testPath
fService := newFlushSyncService(ctx, ddChan, insertChan)
assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath("."))
go fService.start()
Params.FlushDdBufSize = 4
ddNode := newDDNode(ctx, nil)
ddNode := newDDNode(ctx, ddChan)
colID := UniqueID(0)
colName := "col-test-0"
......@@ -114,11 +132,25 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
DropPartitionRequest: dropPartitionReq,
}
flushMsg := msgstream.FlushMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: Timestamp(5),
EndTimestamp: Timestamp(5),
HashValues: []uint32{uint32(0)},
},
FlushMsg: internalpb.FlushMsg{
MsgType: internalpb.MsgType_kFlush,
SegmentID: 1,
Timestamp: Timestamp(6),
},
}
tsMessages := make([]msgstream.TsMsg, 0)
tsMessages = append(tsMessages, msgstream.TsMsg(&createColMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&flushMsg))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3))
var inMsg Msg = msgStream
ddNode.Operate([]*Msg{&inMsg})
......
......@@ -46,6 +46,16 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
timestampMax: msgStreamMsg.TimestampMax(),
},
}
for _, fmsg := range ddMsg.flushMessages {
switch fmsg.Type() {
case internalPb.MsgType_kFlush:
iMsg.flushMessages = append(iMsg.flushMessages, fmsg)
default:
log.Println("Non supporting message type:", fmsg.Type())
}
}
for _, msg := range msgStreamMsg.TsMessages() {
switch msg.Type() {
case internalPb.MsgType_kInsert:
......@@ -53,8 +63,6 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
if resMsg != nil {
iMsg.insertMessages = append(iMsg.insertMessages, resMsg)
}
case internalPb.MsgType_kFlush:
iMsg.flushMessages = append(iMsg.flushMessages, msg.(*msgstream.FlushMsg))
// case internalPb.MsgType_kDelete:
// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
default:
......
......@@ -62,13 +62,13 @@ func (ib *insertBuffer) size(segmentID UniqueID) int {
maxSize := 0
for _, data := range idata.Data {
fdata, ok := data.(storage.FloatVectorFieldData)
if ok && len(fdata.Data) > maxSize {
fdata, ok := data.(*storage.FloatVectorFieldData)
if ok && fdata.NumRows > maxSize {
maxSize = len(fdata.Data)
}
bdata, ok := data.(storage.BinaryVectorFieldData)
if ok && len(bdata.Data) > maxSize {
bdata, ok := data.(*storage.BinaryVectorFieldData)
if ok && bdata.NumRows > maxSize {
maxSize = len(bdata.Data)
}
......@@ -77,7 +77,6 @@ func (ib *insertBuffer) size(segmentID UniqueID) int {
}
func (ib *insertBuffer) full(segmentID UniqueID) bool {
// GOOSE TODO
return ib.size(segmentID) >= ib.maxSize
}
......@@ -131,23 +130,10 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
tsData.NumRows += len(msg.Timestamps)
// 1.1 Get CollectionMeta from etcd
segMeta := etcdpb.SegmentMeta{}
key := path.Join(SegmentPrefix, strconv.FormatInt(currentSegID, 10))
value, _ := ibNode.kvClient.Load(key)
err := proto.UnmarshalText(value, &segMeta)
segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
log.Println("Load segMeta error")
// TODO: add error handling
}
collMeta := etcdpb.CollectionMeta{}
key = path.Join(CollectionPrefix, strconv.FormatInt(segMeta.GetCollectionID(), 10))
value, _ = ibNode.kvClient.Load(key)
err = proto.UnmarshalText(value, &collMeta)
if err != nil {
log.Println("Load collMeta error")
// TODO: add error handling
// GOOSE TODO add error handler
log.Println("Get meta wrong")
}
// 1.2 Get Fields
......@@ -378,6 +364,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
// 1.5 if full
// 1.5.1 generate binlogs
if ibNode.insertBuffer.full(currentSegID) {
log.Println("Insert Buffer full, auto flushing ...")
// partitionTag -> partitionID
partitionTag := msg.GetPartitionTag()
partitionID, err := typeutil.Hash32String(partitionTag)
......@@ -385,20 +372,17 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
log.Println("partitionTag to partitionID Wrong")
}
inCodec := storage.NewInsertCodec(&collMeta)
inCodec := storage.NewInsertCodec(collMeta)
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
for _, v := range binLogs {
log.Println("key ", v.Key, "- value ", v.Value)
}
if err != nil {
log.Println("generate binlog wrong")
}
// clear buffer
log.Println("=========", binLogs)
delete(ibNode.insertBuffer.insertData, currentSegID)
// 1.5.2 binLogs -> minIO/S3
......@@ -420,14 +404,117 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
log.Println("Save to MinIO failed")
// GOOSE TODO error handle
}
log.Println(".. Saving binlogs to MinIO ...")
fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
if err != nil {
log.Println("string to fieldID wrong")
// GOOSE TODO error handle
}
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: false,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: iMsg.timeRange.timestampMax,
segID: currentSegID,
fieldID: int32(fieldID),
paths: []string{key},
},
}
log.Println(".. Appending binlog paths ...")
ibNode.outCh <- inBinlogMsg
}
}
}
// iMsg is Flush() msg from master
// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
// Return
for _, msg := range iMsg.flushMessages {
currentSegID := msg.GetSegmentID()
flushTs := msg.GetTimestamp()
log.Printf(". Receiving flush message segID(%v)...", currentSegID)
if ibNode.insertBuffer.size(currentSegID) > 0 {
log.Println(".. Buffer not empty, flushing ...")
segMeta, collMeta, err := ibNode.getMeta(currentSegID)
if err != nil {
// GOOSE TODO add error handler
log.Println("Get meta wrong")
}
inCodec := storage.NewInsertCodec(collMeta)
// partitionTag -> partitionID
partitionTag := segMeta.GetPartitionTag()
partitionID, err := typeutil.Hash32String(partitionTag)
if err != nil {
// GOOSE TODO add error handler
log.Println("partitionTag to partitionID Wrong")
}
// buffer data to binlogs
binLogs, err := inCodec.Serialize(partitionID,
currentSegID, ibNode.insertBuffer.insertData[currentSegID])
if err != nil {
log.Println("generate binlog wrong")
}
// clear buffer
delete(ibNode.insertBuffer.insertData, currentSegID)
// binLogs -> minIO/S3
collIDStr := strconv.FormatInt(segMeta.GetCollectionID(), 10)
partitionIDStr := strconv.FormatInt(partitionID, 10)
segIDStr := strconv.FormatInt(currentSegID, 10)
keyPrefix := path.Join(ibNode.minioPrifex, collIDStr, partitionIDStr, segIDStr)
for _, blob := range binLogs {
uid, err := ibNode.idAllocator.AllocOne()
if err != nil {
log.Println("Allocate Id failed")
// GOOSE TODO error handler
}
key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10))
err = ibNode.minIOKV.Save(key, string(blob.Value[:]))
if err != nil {
log.Println("Save to MinIO failed")
// GOOSE TODO error handler
}
fieldID, err := strconv.ParseInt(blob.Key, 10, 32)
if err != nil {
log.Println("string to fieldID wrong")
// GOOSE TODO error handler
}
// Append binlogs
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: false,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: flushTs,
segID: currentSegID,
fieldID: int32(fieldID),
paths: []string{key},
},
}
ibNode.outCh <- inBinlogMsg
}
}
// Flushed
log.Println(".. Flush finished ...")
inBinlogMsg := &insertFlushSyncMsg{
flushCompleted: true,
insertBinlogPathMsg: insertBinlogPathMsg{
ts: flushTs,
segID: currentSegID,
},
}
ibNode.outCh <- inBinlogMsg
}
}
if err := ibNode.writeHardTimeTick(iMsg.timeRange.timestampMax); err != nil {
......@@ -437,6 +524,27 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
return nil
}
func (ibNode *insertBufferNode) getMeta(segID UniqueID) (*etcdpb.SegmentMeta, *etcdpb.CollectionMeta, error) {
segMeta := &etcdpb.SegmentMeta{}
key := path.Join(SegmentPrefix, strconv.FormatInt(segID, 10))
value, _ := ibNode.kvClient.Load(key)
err := proto.UnmarshalText(value, segMeta)
if err != nil {
return nil, nil, err
}
collMeta := &etcdpb.CollectionMeta{}
key = path.Join(CollectionPrefix, strconv.FormatInt(segMeta.GetCollectionID(), 10))
value, _ = ibNode.kvClient.Load(key)
err = proto.UnmarshalText(value, collMeta)
if err != nil {
return nil, nil, err
}
return segMeta, collMeta, nil
}
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.TimeTickMsg{
......@@ -506,6 +614,10 @@ func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg) *i
if err != nil {
panic(err)
}
err = idAllocator.Start()
if err != nil {
panic(err)
}
wTt := msgstream.NewPulsarMsgStream(ctx, 1024) //input stream, write node time tick
wTt.SetPulsarClient(Params.PulsarAddress)
......
package writenode
import (
"context"
"encoding/binary"
"math"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
func TestFlowGraphInputBufferNode_Operate(t *testing.T) {
const ctxTimeInMillisecond = 2000
const closeWithDeadline = false
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
ddChan := make(chan *ddlFlushSyncMsg, 10)
defer close(ddChan)
insertChan := make(chan *insertFlushSyncMsg, 10)
defer close(insertChan)
testPath := "/test/writenode/root/meta"
err := clearEtcd(testPath)
require.NoError(t, err)
Params.MetaRootPath = testPath
fService := newFlushSyncService(ctx, ddChan, insertChan)
assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath("."))
go fService.start()
// Params.FlushInsertBufSize = 2
iBNode := newInsertBufferNode(ctx, insertChan)
newMeta()
inMsg := genInsertMsg()
var iMsg flowgraph.Msg = &inMsg
iBNode.Operate([]*flowgraph.Msg{&iMsg})
}
func genInsertMsg() insertMsg {
// test data generate
// GOOSE TODO orgnize
const DIM = 2
const N = 1
var rawData []byte
// Float vector
var fvector = [DIM]float32{1, 2}
for _, ele := range fvector {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
rawData = append(rawData, buf...)
}
// Binary vector
// Dimension of binary vector is 32
var bvector = [4]byte{255, 255, 255, 0}
for _, ele := range bvector {
bs := make([]byte, 4)
binary.LittleEndian.PutUint32(bs, uint32(ele))
rawData = append(rawData, bs...)
}
// Bool
bb := make([]byte, 4)
var fieldBool = true
var fieldBoolInt uint32
if fieldBool {
fieldBoolInt = 1
} else {
fieldBoolInt = 0
}
binary.LittleEndian.PutUint32(bb, fieldBoolInt)
rawData = append(rawData, bb...)
// int8
var dataInt8 int8 = 100
bint8 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint8, uint32(dataInt8))
rawData = append(rawData, bint8...)
// int16
var dataInt16 int16 = 200
bint16 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint16, uint32(dataInt16))
rawData = append(rawData, bint16...)
// int32
var dataInt32 int32 = 300
bint32 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint32, uint32(dataInt32))
rawData = append(rawData, bint32...)
// int64
var dataInt64 int64 = 300
bint64 := make([]byte, 4)
binary.LittleEndian.PutUint32(bint64, uint32(dataInt64))
rawData = append(rawData, bint64...)
// float32
var datafloat float32 = 1.1
bfloat32 := make([]byte, 4)
binary.LittleEndian.PutUint32(bfloat32, math.Float32bits(datafloat))
rawData = append(rawData, bfloat32...)
// float64
var datafloat64 float64 = 2.2
bfloat64 := make([]byte, 8)
binary.LittleEndian.PutUint64(bfloat64, math.Float64bits(datafloat64))
rawData = append(rawData, bfloat64...)
timeRange := TimeRange{
timestampMin: 0,
timestampMax: math.MaxUint64,
}
var iMsg = &insertMsg{
insertMessages: make([]*msgstream.InsertMsg, 0),
flushMessages: make([]*msgstream.FlushMsg, 0),
timeRange: TimeRange{
timestampMin: timeRange.timestampMin,
timestampMax: timeRange.timestampMax,
},
}
// messages generate
const MSGLENGTH = 1
// insertMessages := make([]msgstream.TsMsg, 0)
for i := 0; i < MSGLENGTH; i++ {
var msg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{
uint32(i),
},
},
InsertRequest: internalpb.InsertRequest{
MsgType: internalpb.MsgType_kInsert,
ReqID: UniqueID(0),
CollectionName: "coll1",
PartitionTag: "default",
SegmentID: UniqueID(1),
ChannelID: UniqueID(0),
ProxyID: UniqueID(0),
Timestamps: []Timestamp{Timestamp(i + 1000)},
RowIDs: []UniqueID{UniqueID(i)},
RowData: []*commonpb.Blob{
{Value: rawData},
},
},
}
iMsg.insertMessages = append(iMsg.insertMessages, msg)
}
var fmsg msgstream.FlushMsg = msgstream.FlushMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{
uint32(10),
},
},
FlushMsg: internalpb.FlushMsg{
MsgType: internalpb.MsgType_kFlush,
SegmentID: UniqueID(1),
Timestamp: Timestamp(2000),
},
}
iMsg.flushMessages = append(iMsg.flushMessages, &fmsg)
return *iMsg
}
......@@ -21,6 +21,7 @@ type (
collectionRecords map[string][]metaOperateRecord
// TODO: use partition id
partitionRecords map[string][]metaOperateRecord
flushMessages []*msgstream.FlushMsg
timeRange TimeRange
}
......
......@@ -12,10 +12,10 @@ type (
flushSyncService struct {
ctx context.Context
metaTable *metaTable
ddChan chan *ddlFlushSyncMsg // TODO GOOSE Init Size??
insertChan chan *insertFlushSyncMsg // TODO GOOSE Init Size??
ddFlushed map[UniqueID]bool // Segment ID
insertFlushed map[UniqueID]bool // Segment ID
ddChan chan *ddlFlushSyncMsg
insertChan chan *insertFlushSyncMsg
ddFlushed map[UniqueID]bool // Segment ID
insertFlushed map[UniqueID]bool // Segment ID
}
)
......
......@@ -81,12 +81,12 @@ func TestFlushSyncService_Start(t *testing.T) {
for _, msg := range ddMsgs {
ddChan <- msg
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 50)
}
for _, msg := range insertMsgs {
insertChan <- msg
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 50)
}
ret, err := fService.metaTable.getSegBinlogPaths(SegID)
......
......@@ -25,8 +25,10 @@ func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode {
func (node *WriteNode) Start() {
ddChan := make(chan *ddlFlushSyncMsg, 5)
insertChan := make(chan *insertFlushSyncMsg, 5)
// TODO GOOSE Init Size??
chanSize := 100
ddChan := make(chan *ddlFlushSyncMsg, chanSize)
insertChan := make(chan *insertFlushSyncMsg, chanSize)
node.flushSyncService = newFlushSyncService(node.ctx, ddChan, insertChan)
node.dataSyncService = newDataSyncService(node.ctx, ddChan, insertChan)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册