diff --git a/docker-compose.yml b/docker-compose.yml index 010afc26e18b05aec54a187c5bf5affa49316d62..5517c06177419bf5714603b92b88236b62468769 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,7 +25,6 @@ services: command: &ubuntu-command > /bin/bash -c " /milvus-distributed/scripts/core_build.sh -u && \ - go build -o /milvus-distributed/cmd/writer/writer /milvus-distributed/cmd/writer/writer.go && \ go build -o /milvus-distributed/cmd/reader/reader /milvus-distributed/cmd/reader/reader.go && \ go build -o /milvus-distributed/cmd/master/master /milvus-distributed/cmd/master/main.go && \ go build -o /milvus-distributed/cmd/proxy/proxy /milvus-distributed/cmd/proxy/proxy.go" diff --git a/docs/developer_guides/developer_guides.md b/docs/developer_guides/developer_guides.md index 219884c1ba514ba5af5b53ec4cb6fd69df02f020..12103a7cb669498b5afca1fed43552e5e86a8938 100644 --- a/docs/developer_guides/developer_guides.md +++ b/docs/developer_guides/developer_guides.md @@ -224,12 +224,11 @@ func (tso *timestampOracle) loadTimestamp() error #### 4.2 Timestamp Allocator ```go -type TimestampAllocator struct { - Alloc(count uint32) ([]Timestamp, error) -} +type TimestampAllocator struct {} func (allocator *TimestampAllocator) Start() error func (allocator *TimestampAllocator) Close() error +func (allocator *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) func NewTimestampAllocator() *TimestampAllocator ``` @@ -303,8 +302,15 @@ func (gparams *GlobalParamsTable) Remove(key string) error ``` go type MsgType uint32 const { - USER_REQUEST MsgType = 1 - TIME_TICK = 2 + kInsert MsgType = 400 + kDelete MsgType = 401 + kSearch MsgType = 500 + KSearchResult MsgType = 1000 + + kSegStatistics MsgType = 1100 + + kTimeTick MsgType = 1200 + kTimeSync MsgType = 1201 } type TsMsg interface { @@ -370,24 +376,105 @@ func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick -#### 5.4 ID Allocator +#### 5.4 Time Ticked Flow Graph + +###### 5.4.1 Flow Graph States + +```go +type flowGraphStates struct { + startTick Timestamp + numActiveTasks map[string]int32 + numCompletedTasks map[string]int64 +} +``` + +###### 5.4.2 Message + +```go +type Msg interface { + TimeTick() Timestamp + DownStreamNodeIdx() int32 +} +``` + +###### 5.4.3 Node + +```go +type Node interface { + Name() string + MaxQueueLength() int32 + MaxParallelism() int32 + SetPipelineStates(states *flowGraphStates) + Operate([]*Msg) []*Msg +} +``` + + + +```go +type baseNode struct { + Name string + maxQueueLength int32 + maxParallelism int32 + graphStates *flowGraphStates +} +func (node *baseNode) MaxQueueLength() int32 +func (node *baseNode) MaxParallelism() int32 +func (node *baseNode) SetMaxQueueLength(n int32) +func (node *baseNode) SetMaxParallelism(n int32) +func (node *baseNode) SetPipelineStates(states *flowGraphStates) +``` + +###### 5.4.4 Flow Graph + +```go +type nodeCtx struct { + node *Node + inputChans [](*chan *Msg) + outputChans [](*chan *Msg) + inputMsgs [](*Msg List) + downstreams []*nodeCtx +} + +func (nodeCtx *nodeCtx) Start(ctx context.Context) error +``` + +*Start()* will enter a loop. In each iteration, it tries to collect input messges from *inputChan*, then prepare node's input. When input is ready, it will trigger *node.Operate*. When *node.Operate* returns, it sends the returned *Msg* to *outputChans*, which connects to the downstreams' *inputChans*. + +```go +type TimeTickedFlowGraph struct { + states *flowGraphStates + nodeCtx map[string]*nodeCtx +} + +func (*pipeline TimeTickedFlowGraph) AddNode(node *Node) +func (*pipeline TimeTickedFlowGraph) SetEdges(nodeName string, in []string, out []string) +func (*pipeline TimeTickedFlowGraph) Start() error +func (*pipeline TimeTickedFlowGraph) Close() error + +func NewTimeTickedFlowGraph(ctx context.Context) *TimeTickedFlowGraph +``` + + + +#### 5.5 ID Allocator ```go type IdAllocator struct { - Alloc(count uint32) ([]int64, error) } func (allocator *IdAllocator) Start() error func (allocator *IdAllocator) Close() error +func (allocator *IdAllocator) Alloc(count uint32) ([]int64, error) -func NewIdAllocator() *IdAllocator +func NewIdAllocator(ctx context.Context) *IdAllocator ``` -#### 5.4 KV +#### 5.6 KV -###### 5.4.1 KV Base +###### 5.6.1 KV Base ```go type KVBase interface { @@ -409,7 +496,7 @@ type KVBase interface { -###### 5.4.2 Etcd KV +###### 5.6.2 Etcd KV ```go type EtcdKV struct { @@ -854,6 +941,8 @@ type task interface { } ``` + + A task example is as follows. In this example, we wrap a CreateCollectionRequest (a proto) as a createCollectionTask. The wrapper need to contain task interfaces. ``` go diff --git a/internal/msgstream/marshaler.go b/internal/msgstream/marshaler.go new file mode 100644 index 0000000000000000000000000000000000000000..aa9d4153a29ab449465910eab7aa73c50a83d737 --- /dev/null +++ b/internal/msgstream/marshaler.go @@ -0,0 +1,194 @@ +package msgstream + +import ( + "github.com/golang/protobuf/proto" + commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +type TsMsgMarshaler interface { + Marshal(input *TsMsg) ([]byte, commonPb.Status) + Unmarshal(input []byte) (*TsMsg, commonPb.Status) +} + +func GetMarshalers(inputMsgType MsgType, outputMsgType MsgType) (*TsMsgMarshaler, *TsMsgMarshaler) { + return GetMarshaler(inputMsgType), GetMarshaler(outputMsgType) +} + +func GetMarshaler(MsgType MsgType) *TsMsgMarshaler { + switch MsgType { + case kInsert: + insertMarshaler := &InsertMarshaler{} + var tsMsgMarshaller TsMsgMarshaler = insertMarshaler + return &tsMsgMarshaller + case kDelete: + deleteMarshaler := &DeleteMarshaler{} + var tsMsgMarshaller TsMsgMarshaler = deleteMarshaler + return &tsMsgMarshaller + case kSearch: + searchMarshaler := &SearchMarshaler{} + var tsMsgMarshaller TsMsgMarshaler = searchMarshaler + return &tsMsgMarshaller + case kSearchResult: + searchResultMarshler := &SearchResultMarshaler{} + var tsMsgMarshaller TsMsgMarshaler = searchResultMarshler + return &tsMsgMarshaller + case kTimeSync: + timeSyncMarshaler := &TimeSyncMarshaler{} + var tsMsgMarshaller TsMsgMarshaler = timeSyncMarshaler + return &tsMsgMarshaller + default: + return nil + } +} + +//////////////////////////////////////Insert/////////////////////////////////////////////// + +type InsertMarshaler struct{} + +func (im *InsertMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { + insertTask := (*input).(InsertTask) + insertRequest := &insertTask.InsertRequest + mb, err := proto.Marshal(insertRequest) + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +func (im *InsertMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { + insertRequest := internalPb.InsertRequest{} + err := proto.Unmarshal(input, &insertRequest) + insertTask := InsertTask{insertRequest} + + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + var tsMsg TsMsg = insertTask + return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +/////////////////////////////////////Delete////////////////////////////////////////////// + +type DeleteMarshaler struct{} + +func (dm *DeleteMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { + deleteTask := (*input).(DeleteTask) + deleteRequest := &deleteTask.DeleteRequest + mb, err := proto.Marshal(deleteRequest) + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +func (dm *DeleteMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { + deleteRequest := internalPb.DeleteRequest{} + err := proto.Unmarshal(input, &deleteRequest) + deleteTask := DeleteTask{deleteRequest} + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + var tsMsg TsMsg = deleteTask + return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +/////////////////////////////////////Search/////////////////////////////////////////////// + +type SearchMarshaler struct{} + +func (sm *SearchMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { + searchTask := (*input).(SearchTask) + searchRequest := &searchTask.SearchRequest + mb, err := proto.Marshal(searchRequest) + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +func (sm *SearchMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { + searchRequest := internalPb.SearchRequest{} + err := proto.Unmarshal(input, &searchRequest) + searchTask := SearchTask{searchRequest} + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + var tsMsg TsMsg = searchTask + return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +/////////////////////////////////////SearchResult/////////////////////////////////////////////// + +type SearchResultMarshaler struct{} + +func (srm *SearchResultMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { + searchResultTask := (*input).(SearchResultTask) + searchResult := &searchResultTask.SearchResult + mb, err := proto.Marshal(searchResult) + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +func (srm *SearchResultMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { + searchResult := internalPb.SearchResult{} + err := proto.Unmarshal(input, &searchResult) + searchResultTask := SearchResultTask{searchResult} + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + var tsMsg TsMsg = searchResultTask + return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +/////////////////////////////////////TimeSync/////////////////////////////////////////////// + +type TimeSyncMarshaler struct{} + +func (tm *TimeSyncMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { + timeSyncTask := (*input).(TimeSyncTask) + timeSyncMsg := &timeSyncTask.TimeSyncMsg + mb, err := proto.Marshal(timeSyncMsg) + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +func (tm *TimeSyncMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { + timeSyncMsg := internalPb.TimeSyncMsg{} + err := proto.Unmarshal(input, &timeSyncMsg) + timeSyncTask := TimeSyncTask{timeSyncMsg} + if err != nil { + return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} + } + var tsMsg TsMsg = timeSyncTask + return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} + +///////////////////////////////////////Key2Seg/////////////////////////////////////////////// +// +//type Key2SegMarshaler struct{} +// +//func (km *Key2SegMarshaler) Marshal(input *TsMsg) ([]byte, commonPb.Status) { +// key2SegTask := (*input).(Key2SegTask) +// key2SegMsg := &key2SegTask.Key2SegMsg +// mb, err := proto.Marshal(key2SegMsg) +// if err != nil{ +// return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} +// } +// return mb, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +//} +// +//func (km *Key2SegMarshaler) Unmarshal(input []byte) (*TsMsg, commonPb.Status) { +// key2SegMsg := internalPb.Key2SegMsg{} +// err := proto.Unmarshal(input, &key2SegMsg) +// key2SegTask := Key2SegTask{key2SegMsg} +// if err != nil{ +// return nil, commonPb.Status{ErrorCode: commonPb.ErrorCode_UNEXPECTED_ERROR} +// } +// var tsMsg TsMsg = key2SegTask +// return &tsMsg, commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +//} diff --git a/internal/msgstream/msgstream.go b/internal/msgstream/msgstream.go new file mode 100644 index 0000000000000000000000000000000000000000..85897f968d719c668238d0045967bc565869677f --- /dev/null +++ b/internal/msgstream/msgstream.go @@ -0,0 +1,230 @@ +package msgstream + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/zilliztech/milvus-distributed/internal/msgclient" + commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "log" + "sync" +) + +const PulsarChannelLength = 100 + +type TimeStamp uint64 + +type MsgPack struct { + BeginTs TimeStamp + EndTs TimeStamp + Msgs []*TsMsg +} + +type HashFunc func(*MsgPack) map[uint32]*MsgPack + +type MsgStream interface { + SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) + Produce(*MsgPack) commonPb.Status + Consume() *MsgPack // message can be consumed exactly once +} + +type PulsarMsgStream struct { + client *pulsar.Client + producers []*pulsar.Producer + consumers []*pulsar.Consumer + msgHashFunc HashFunc // return a map from produceChannel idx to *MsgPack + + msgMarshaler *TsMsgMarshaler + msgUnmarshaler *TsMsgMarshaler + inputChannel chan *MsgPack + outputChannel chan *MsgPack +} + +func (ms *PulsarMsgStream) SetPulsarCient(address string) { + client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address}) + if err != nil { + log.Printf("connect pulsar failed, %v", err) + } + ms.client = &client +} + +func (ms *PulsarMsgStream) SetProducers(channels []string) { + for i := 0; i < len(channels); i++ { + pp, err := (*ms.client).CreateProducer(pulsar.ProducerOptions{Topic: channels[i]}) + if err != nil { + log.Printf("failed to create reader producer %s, error = %v", channels[i], err) + } + ms.producers = append(ms.producers, &pp) + } +} + +func (ms *PulsarMsgStream) SetConsumers(channels []string, subName string) { + for i := 0; i < len(channels); i++ { + receiveChannel := make(chan pulsar.ConsumerMessage, PulsarChannelLength) + pc, err := (*ms.client).Subscribe(pulsar.ConsumerOptions{ + Topic: channels[i], + SubscriptionName: subName, + Type: pulsar.KeyShared, + SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest, + MessageChannel: receiveChannel, + }) + if err != nil { + log.Printf("failed to subscribe topic, error = %v", err) + } + ms.consumers = append(ms.consumers, &pc) + } +} + +func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler) { + ms.msgMarshaler = marshal + ms.msgUnmarshaler = unmarshal +} + +func (ms *PulsarMsgStream) SetHashFunc(hashFunc HashFunc) { + ms.msgHashFunc = func(pack *MsgPack) map[uint32]*MsgPack { + hashResult := hashFunc(pack) + bucketResult := make(map[uint32]*MsgPack) + for k, v := range hashResult { + channelIndex := k % uint32(len(ms.producers)) + _, ok := bucketResult[channelIndex] + if ok == false { + msgPack := MsgPack{} + bucketResult[channelIndex] = &msgPack + } + for _, msg := range v.Msgs { + bucketResult[channelIndex].Msgs = append(bucketResult[channelIndex].Msgs, msg) + } + } + return bucketResult + } +} + +func (ms *PulsarMsgStream) Produce(msg *MsgPack) commonPb.Status { + result := ms.msgHashFunc(msg) + for k, v := range result { + for i := 0; i < len(v.Msgs); i++ { + mb, status := (*ms.msgMarshaler).Marshal(v.Msgs[i]) + if status.ErrorCode != commonPb.ErrorCode_SUCCESS { + log.Printf("Marshal ManipulationReqMsg failed, error ") + continue + } + if _, err := (*ms.producers[k]).Send( + context.Background(), + &pulsar.ProducerMessage{Payload: mb}, + ); err != nil { + log.Printf("post into pulsar filed, error = %v", err) + } + } + } + + return commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS} +} +func (ms *PulsarMsgStream) Consume() *MsgPack { + tsMsgList := make([]*TsMsg, 0) + for i := 0; i < len(ms.consumers); i++ { + pulsarMsg, ok := <-(*ms.consumers[i]).Chan() + if ok == false { + log.Fatal("consumer closed!") + continue + } + (*ms.consumers[i]).AckID(pulsarMsg.ID()) + tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload()) + if status.ErrorCode != commonPb.ErrorCode_SUCCESS { + log.Printf("Marshal ManipulationReqMsg failed, error ") + } + tsMsgList = append(tsMsgList, tsMsg) + + } + + msgPack := MsgPack{Msgs: tsMsgList} + return &msgPack +} + +type PulsarTtMsgStream struct { + PulsarMsgStream + inputBuf []*TsMsg + unsolvedBuf []*TsMsg + msgPacks []*MsgPack + lastTimeStamp TimeStamp +} + +func (ms *PulsarTtMsgStream) Consume() *MsgPack { //return messages in one time tick + wg := sync.WaitGroup{} + wg.Add(len(ms.consumers)) + eofMsgTimeStamp := make(map[int]TimeStamp) + mu := sync.Mutex{} + for i := 0; i < len(ms.consumers); i++ { + go ms.findTimeTick(context.Background(), i, eofMsgTimeStamp, &wg, &mu) + } + wg.Wait() + timeStamp, ok := checkTimeTickMsg(eofMsgTimeStamp) + if ok == false { + log.Fatal("timeTick err") + } + + timeTickBuf := make([]*TsMsg, 0) + for _, v := range ms.unsolvedBuf { + ms.inputBuf = append(ms.inputBuf, v) + } + ms.unsolvedBuf = ms.unsolvedBuf[:0] + for _, v := range ms.inputBuf { + if (*v).Ts() >= timeStamp { + timeTickBuf = append(timeTickBuf, v) + } else { + ms.unsolvedBuf = append(ms.unsolvedBuf, v) + } + } + ms.inputBuf = ms.inputBuf[:0] + + msgPack := MsgPack{ + BeginTs: ms.lastTimeStamp, + EndTs: timeStamp, + Msgs: timeTickBuf, + } + + return &msgPack +} + +func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context, + channelIndex int, + eofMsgMap map[int]TimeStamp, + wg *sync.WaitGroup, + mu *sync.Mutex) { + for { + select { + case <-ctx.Done(): + return + case pulsarMsg, ok := <-(*ms.consumers[channelIndex]).Chan(): + if ok == false { + log.Fatal("consumer closed!") + continue + } + (*ms.consumers[channelIndex]).Ack(pulsarMsg) + tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload()) + // TODO:: Find the EOF + if (*tsMsg).Type() == msgclient.kTimeTick { + eofMsgMap[channelIndex] = (*tsMsg).Ts() + break + } + if status.ErrorCode != commonPb.ErrorCode_SUCCESS { + log.Printf("Marshal ManipulationReqMsg failed, error ") + } + mu.Lock() + ms.inputBuf = append(ms.inputBuf, tsMsg) + mu.Unlock() + } + } + wg.Done() +} + +func checkTimeTickMsg(msg map[int]TimeStamp) (TimeStamp, bool) { + checkMap := make(map[TimeStamp]int) + for _, v := range msg { + checkMap[v] += 1 + } + if len(checkMap) <= 1 { + for k, _ := range checkMap { + return k, true + } + } + return 0, false +} diff --git a/internal/msgstream/stream_test.go b/internal/msgstream/stream_test.go new file mode 100644 index 0000000000000000000000000000000000000000..0a56b2dd26fb08cef8b101aa1dd58c24f5bb7df6 --- /dev/null +++ b/internal/msgstream/stream_test.go @@ -0,0 +1,215 @@ +package msgstream + +import ( + "fmt" + commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "log" + "testing" +) + +func produceHashTopic(input *MsgPack) map[uint32]*MsgPack { + msgs := input.Msgs + result := make(map[uint32]*MsgPack) + count := len(msgs) + for i := 0; i < count; i++ { + var key uint32 + var err error + switch (*msgs[i]).Type() { + case kInsert: + var insertMsg InsertTask = (*msgs[i]).(InsertTask) + key, err = typeutil.Hash32Int64(insertMsg.ReqId) + case kDelete: + var deleteMsg DeleteTask = (*msgs[i]).(DeleteTask) + key, err = typeutil.Hash32Int64(deleteMsg.ReqId) + case kSearch: + var searchMsg SearchTask = (*msgs[i]).(SearchTask) + key, err = typeutil.Hash32Int64(searchMsg.ReqId) + case kSearchResult: + var searchResultMsg SearchResultTask = (*msgs[i]).(SearchResultTask) + key, err = typeutil.Hash32Int64(searchResultMsg.ReqId) + case kTimeSync: + var timeSyncMsg TimeSyncTask = (*msgs[i]).(TimeSyncTask) + key, err = typeutil.Hash32Int64(timeSyncMsg.PeerId) + default: + log.Fatal("con't find msgType") + } + + if err != nil { + log.Fatal(err) + } + _, ok := result[key] + if ok == false { + msgPack := MsgPack{} + result[key] = &msgPack + } + result[key].Msgs = append(result[key].Msgs, msgs[i]) + } + return result +} + +func baseTest(pulsarAddress string, + producerChannels []string, + consumerChannels []string, + consumerSubName string, + msgPack *MsgPack, + inputMsgType MsgType, + outputMsgType MsgType) { + + // set input stream + inputStream := PulsarMsgStream{} + inputStream.SetPulsarCient(pulsarAddress) + inputStream.SetMsgMarshaler(GetMarshaler(inputMsgType), nil) + inputStream.SetProducers(producerChannels) + inputStream.SetHashFunc(produceHashTopic) + + // set output stream + outputStream := PulsarMsgStream{} + outputStream.SetPulsarCient(pulsarAddress) + outputStream.SetMsgMarshaler(nil, GetMarshaler(outputMsgType)) + outputStream.SetConsumers(consumerChannels, consumerSubName) + + //send msgPack + inputStream.Produce(msgPack) + + // receive msg + for { + result := outputStream.Consume() + if len(result.Msgs) > 0 { + msgs := result.Msgs + for _, v := range msgs { + fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v) + } + break + } + } +} + +func TestStream_Insert(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + producerChannels := []string{"insert"} + consumerChannels := []string{"insert"} + consumerSubName := "subInsert" + + //pack tsmsg + insertRequest := internalPb.InsertRequest{ + ReqType: internalPb.ReqType_kInsert, + ReqId: 1, + CollectionName: "Collection", + PartitionTag: "Partition", + SegmentId: 1, + ChannelId: 1, + ProxyId: 1, + Timestamp: 1, + } + insertMsg := InsertTask{ + insertRequest, + } + var tsMsg TsMsg = insertMsg + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, &tsMsg) + + //run stream + baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kInsert, kInsert) +} + +func TestStream_Delete(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + producerChannels := []string{"delete"} + consumerChannels := []string{"delete"} + consumerSubName := "subDelete" + + //pack tsmsg + deleteRequest := internalPb.DeleteRequest{ + ReqType: internalPb.ReqType_kInsert, + ReqId: 1, + CollectionName: "Collection", + ChannelId: 1, + ProxyId: 1, + Timestamp: 1, + PrimaryKeys: []int64{1}, + } + deleteMsg := DeleteTask{ + deleteRequest, + } + var tsMsg TsMsg = deleteMsg + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, &tsMsg) + + //run stream + baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kDelete, kDelete) +} + +func TestStream_Search(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + producerChannels := []string{"search"} + consumerChannels := []string{"search"} + consumerSubName := "subSearch" + + //pack tsmsg + searchRequest := internalPb.SearchRequest{ + ReqType: internalPb.ReqType_kSearch, + ReqId: 1, + ProxyId: 1, + Timestamp: 1, + ResultChannelId: 1, + } + searchMsg := SearchTask{ + searchRequest, + } + var tsMsg TsMsg = searchMsg + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, &tsMsg) + + //run stream + baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kSearch, kSearch) +} + +func TestStream_SearchResult(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + producerChannels := []string{"search"} + consumerChannels := []string{"search"} + consumerSubName := "subSearch" + + //pack tsmsg + searchResult := internalPb.SearchResult{ + Status: &commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS}, + ReqId: 1, + ProxyId: 1, + QueryNodeId: 1, + Timestamp: 1, + ResultChannelId: 1, + } + searchResultMsg := SearchResultTask{ + searchResult, + } + var tsMsg TsMsg = searchResultMsg + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, &tsMsg) + + //run stream + baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kSearchResult, kSearchResult) +} + +func TestStream_TimeSync(t *testing.T) { + pulsarAddress := "pulsar://localhost:6650" + producerChannels := []string{"search"} + consumerChannels := []string{"search"} + consumerSubName := "subSearch" + + //pack tsmsg + timeSyncResult := internalPb.TimeSyncMsg{ + PeerId: 1, + Timestamp: 1, + } + timeSyncMsg := TimeSyncTask{ + timeSyncResult, + } + var tsMsg TsMsg = timeSyncMsg + msgPack := MsgPack{} + msgPack.Msgs = append(msgPack.Msgs, &tsMsg) + + //run stream + baseTest(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, kTimeSync, kTimeSync) +} diff --git a/internal/msgstream/task.go b/internal/msgstream/task.go new file mode 100644 index 0000000000000000000000000000000000000000..c85595baa0552718746030fb776a10ae9bfc30d6 --- /dev/null +++ b/internal/msgstream/task.go @@ -0,0 +1,133 @@ +package msgstream + +import ( + internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +type MsgType uint32 + +const ( + kInsert MsgType = 1 + kDelete MsgType = 2 + kSearch MsgType = 3 + kSearchResult MsgType = 4 + kTimeTick MsgType = 5 + kSegmentStatics MsgType = 6 + kTimeSync MsgType = 7 +) + +type TsMsg interface { + SetTs(ts TimeStamp) + Ts() TimeStamp + Type() MsgType +} + +/////////////////////////////////////////Insert////////////////////////////////////////// +type InsertTask struct { + internalPb.InsertRequest +} + +func (it InsertTask) SetTs(ts TimeStamp) { + it.Timestamp = uint64(ts) +} + +func (it InsertTask) Ts() TimeStamp { + return TimeStamp(it.Timestamp) +} + +func (it InsertTask) Type() MsgType { + if it.ReqType == internalPb.ReqType_kNone { + return kTimeTick + } + return kInsert +} + +/////////////////////////////////////////Delete////////////////////////////////////////// +type DeleteTask struct { + internalPb.DeleteRequest +} + +func (dt DeleteTask) SetTs(ts TimeStamp) { + dt.Timestamp = uint64(ts) +} + +func (dt DeleteTask) Ts() TimeStamp { + return TimeStamp(dt.Timestamp) +} + +func (dt DeleteTask) Type() MsgType { + if dt.ReqType == internalPb.ReqType_kNone { + return kTimeTick + } + return kDelete +} + +/////////////////////////////////////////Search////////////////////////////////////////// +type SearchTask struct { + internalPb.SearchRequest +} + +func (st SearchTask) SetTs(ts TimeStamp) { + st.Timestamp = uint64(ts) +} + +func (st SearchTask) Ts() TimeStamp { + return TimeStamp(st.Timestamp) +} + +func (st SearchTask) Type() MsgType { + if st.ReqType == internalPb.ReqType_kNone { + return kTimeTick + } + return kSearch +} + +/////////////////////////////////////////SearchResult////////////////////////////////////////// +type SearchResultTask struct { + internalPb.SearchResult +} + +func (srt SearchResultTask) SetTs(ts TimeStamp) { + srt.Timestamp = uint64(ts) +} + +func (srt SearchResultTask) Ts() TimeStamp { + return TimeStamp(srt.Timestamp) +} + +func (srt SearchResultTask) Type() MsgType { + return kSearchResult +} + +/////////////////////////////////////////TimeSync////////////////////////////////////////// +type TimeSyncTask struct { + internalPb.TimeSyncMsg +} + +func (tst TimeSyncTask) SetTs(ts TimeStamp) { + tst.Timestamp = uint64(ts) +} + +func (tst TimeSyncTask) Ts() TimeStamp { + return TimeStamp(tst.Timestamp) +} + +func (tst TimeSyncTask) Type() MsgType { + return kTimeSync +} + +///////////////////////////////////////////Key2Seg////////////////////////////////////////// +//type Key2SegTask struct { +// internalPb.Key2SegMsg +//} +// +////TODO::Key2SegMsg don't have timestamp +//func (k2st Key2SegTask) SetTs(ts TimeStamp) {} +// +//func (k2st Key2SegTask) Ts() TimeStamp { +// return TimeStamp(0) +//} +// +//func (k2st Key2SegTask) Type() MsgType { +// return +//} diff --git a/internal/proto/internal_msg.proto b/internal/proto/internal_msg.proto index e303274e6d4380a42e40e82a7d67d9c71ee0b1cc..207de8ebc3d42bd6a68f3092c26b4b236ad6c81e 100644 --- a/internal/proto/internal_msg.proto +++ b/internal/proto/internal_msg.proto @@ -24,9 +24,13 @@ enum ReqType { /* Manipulation Requests */ kInsert = 400; + kDelete = 401; /* Query */ kSearch = 500; + + /* System Control */ + kTimeTick = 1200 } enum PeerRole { diff --git a/internal/proxy/proxy_node_test.go b/internal/proxy/proxy_node_test.go index c744fa6f130c54d9d0c2344dfce2972761b05b96..0c447c1377a31f117c2ef45e8eceaa945db08a4c 100644 --- a/internal/proxy/proxy_node_test.go +++ b/internal/proxy/proxy_node_test.go @@ -164,7 +164,7 @@ func TestProxyNode(t *testing.T) { Id: 100, Schema: nil, CreateTime: 0, - SegmentIds: []uint64{101, 102}, + SegmentIds: []int64{101, 102}, PartitionTags: nil, } sm101 := etcdpb.SegmentMeta{ @@ -307,7 +307,7 @@ func TestProxyNode(t *testing.T) { assert.Equal(t, insertR.EntityIdArray[i], int64(i+10)) } - var insertPrimaryKey []uint64 + var insertPrimaryKey []int64 readerM1, ok := <-reader.Chan() assert.True(t, ok) diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 3964fb84789f149544a75d996919789581cdc044..6354696bbba57cb4c0ffdc1e176520f0e0155af7 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -18,6 +18,7 @@ import ( pb "github.com/zilliztech/milvus-distributed/internal/proto/message" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" etcd "go.etcd.io/etcd/clientv3" "go.uber.org/atomic" "google.golang.org/grpc" @@ -195,7 +196,7 @@ func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*ser for i := 0; i < len(req.HashValues); i++ { key := int64(req.HashValues[i]) - hash, err := Hash32_Int64(key) + hash, err := typeutil.Hash32Int64(key) if err != nil { return nil, status.Errorf(codes.Unknown, "hash failed on %d", key) } diff --git a/internal/proxy/server_test.go b/internal/proxy/server_test.go index d31433c9f432c876fa7d10f6ec657e9303afdd66..aae2cef1b83e727e10ebd1c60a68858dff11c52a 100644 --- a/internal/proxy/server_test.go +++ b/internal/proxy/server_test.go @@ -125,7 +125,7 @@ func TestProxyServer_WatchEtcd(t *testing.T) { col1 := etcdpb.CollectionMeta{ Id: 1, - SegmentIds: []uint64{2, 3}, + SegmentIds: []int64{2, 3}, } seg2 := etcdpb.SegmentMeta{ SegmentId: 2, @@ -174,7 +174,7 @@ func TestProxyServer_WatchEtcd(t *testing.T) { col4 := etcdpb.CollectionMeta{ Id: 4, - SegmentIds: []uint64{5}, + SegmentIds: []int64{5}, } seg5 := etcdpb.SegmentMeta{ SegmentId: 5, @@ -206,7 +206,7 @@ func TestProxyServer_InsertAndDelete(t *testing.T) { Id: 10, Schema: nil, CreateTime: 0, - SegmentIds: []uint64{11, 12}, + SegmentIds: []int64{11, 12}, PartitionTags: nil, } seg11 := etcdpb.SegmentMeta{ @@ -323,7 +323,7 @@ func TestProxyServer_InsertAndDelete(t *testing.T) { assert.Nil(t, err) assert.Equalf(t, dr.ErrorCode, pb.ErrorCode_SUCCESS, "delete failed, error code = %d, reason = %s", dr.ErrorCode, dr.Reason) - var primaryKey []uint64 + var primaryKey []int64 isbreak = false for { if isbreak { diff --git a/internal/proxy/util.go b/internal/util/typeutil/hash.go similarity index 56% rename from internal/proxy/util.go rename to internal/util/typeutil/hash.go index 08264f866bdf87d5686ad99056e9b97ee6a10488..9d67410448f4b29afa5d3b6a01db1e7bd126d1d4 100644 --- a/internal/proxy/util.go +++ b/internal/util/typeutil/hash.go @@ -1,4 +1,4 @@ -package proxy +package typeutil import ( "encoding/binary" @@ -6,7 +6,7 @@ import ( "unsafe" ) -func Hash32_Bytes(b []byte) (uint32, error) { +func Hash32Bytes(b []byte) (uint32, error) { h := murmur3.New32() if _, err := h.Write(b); err != nil { return 0, err @@ -14,13 +14,13 @@ func Hash32_Bytes(b []byte) (uint32, error) { return h.Sum32() & 0x7fffffff, nil } -func Hash32_Uint64(v uint64) (uint32, error) { +func Hash32Uint64(v uint64) (uint32, error) { b := make([]byte, unsafe.Sizeof(v)) binary.LittleEndian.PutUint64(b, v) - return Hash32_Bytes(b) + return Hash32Bytes(b) } -func Hash32_Int64(v int64) (uint32, error) { - return Hash32_Uint64(uint64(v)) -} \ No newline at end of file +func Hash32Int64(v int64) (uint32, error) { + return Hash32Uint64(uint64(v)) +} diff --git a/internal/proxy/util_test.go b/internal/util/typeutil/hash_test.go similarity index 83% rename from internal/proxy/util_test.go rename to internal/util/typeutil/hash_test.go index bae00fdc76d4484b8ae207371a19d32e5ea114e0..c7f642e503e9a0801f97630084cf136817c5e293 100644 --- a/internal/proxy/util_test.go +++ b/internal/util/typeutil/hash_test.go @@ -1,5 +1,4 @@ -package proxy - +package typeutil import ( "github.com/stretchr/testify/assert" "testing" @@ -15,16 +14,17 @@ func TestUint64(t *testing.T) { func TestHash32_Uint64(t *testing.T) { var u uint64 = 0x12 - h, err := Hash32_Uint64(u) + h, err := Hash32Uint64(u) assert.Nil(t, err) t.Log(h) b := make([]byte, unsafe.Sizeof(u)) b[0] = 0x12 - h2, err := Hash32_Bytes(b) + h2, err := Hash32Bytes(b) assert.Nil(t, err) t.Log(h2) assert.Equal(t, h, h2) } +