提交 31178d6a 编写于 作者: B BossZou 提交者: yefu.chen

Add procude logic of RocksMQ

Signed-off-by: NBossZou <yinghao.zou@zilliz.com>
上级 b98b226d
...@@ -4,6 +4,12 @@ import ( ...@@ -4,6 +4,12 @@ import (
"github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/kv"
) )
type IDAllocator interface {
Alloc(count uint32) (UniqueID, UniqueID, error)
AllocOne() (UniqueID, error)
UpdateID() error
}
// GlobalTSOAllocator is the global single point TSO allocator. // GlobalTSOAllocator is the global single point TSO allocator.
type GlobalIDAllocator struct { type GlobalIDAllocator struct {
allocator Allocator allocator Allocator
......
package rocksmq package rocksmq
import ( import (
"strconv"
"sync" "sync"
"github.com/tecbot/gorocksdb"
"github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/kv"
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" "github.com/zilliztech/milvus-distributed/internal/master"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
) )
type UniqueID = typeutil.UniqueID type UniqueID = typeutil.UniqueID
const (
FixedChannelNameLen = 32
)
/**
* @brief fill with '_' to ensure channel name fixed length
*/
func fixChannelName(name string) (string, error) {
if len(name) > FixedChannelNameLen {
return "", errors.New("Channel name exceeds limit")
}
nameBytes := make([]byte, FixedChannelNameLen-len(name))
for i := 0; i < len(nameBytes); i++ {
nameBytes[i] = byte('*')
}
return name + string(nameBytes), nil
}
/**
* Combine key with fixed channel name and unique id
*/
func combKey(channelName string, id UniqueID) (string, error) {
fixName, err := fixChannelName(channelName)
if err != nil {
return "", err
}
return fixName + "/" + strconv.FormatInt(id, 10), nil
}
type ProducerMessage struct { type ProducerMessage struct {
payload []byte payload []byte
} }
...@@ -30,20 +67,102 @@ type ConsumerGroupContext struct { ...@@ -30,20 +67,102 @@ type ConsumerGroupContext struct {
} }
type RocksMQ struct { type RocksMQ struct {
kv kv.Base //isServing int64
channels map[string]*Channel store *gorocksdb.DB
cgCtxs map[string]ConsumerGroupContext kv kv.Base
mu sync.Mutex channels map[string]*Channel
cgCtxs map[string]ConsumerGroupContext
idAllocator master.IDAllocator
mu sync.Mutex
//ctx context.Context
//serverLoopWg sync.WaitGroup
//serverLoopCtx context.Context
//serverLoopCancel func()
//// tso ticker
//tsoTicker *time.Ticker
} }
func NewRocksMQ() *RocksMQ { func NewRocksMQ() *RocksMQ {
mkv := memkv.NewMemoryKV() mkv := memkv.NewMemoryKV()
// mstore, _ :=
rmq := &RocksMQ{ rmq := &RocksMQ{
// store: mstore,
kv: mkv, kv: mkv,
} }
return rmq return rmq
} }
//func (rmq *RocksMQ) startServerLoop(ctx context.Context) error {
// rmq.serverLoopCtx, rmq.serverLoopCancel = context.WithCancel(ctx)
//
// go rmq.tsLoop()
//
// return nil
//}
//func (rmq *RocksMQ) stopServerLoop() {
// rmq.serverLoopCancel()
// rmq.serverLoopWg.Wait()
//}
//func (rmq *RocksMQ) tsLoop() {
// defer rmq.serverLoopWg.Done()
// rmq.tsoTicker = time.NewTicker(master.UpdateTimestampStep)
// defer rmq.tsoTicker.Stop()
// ctx, cancel := context.WithCancel(rmq.serverLoopCtx)
// defer cancel()
//
// for {
// select {
// case <-rmq.tsoTicker.C:
// if err := rmq.idAllocator.UpdateID(); err != nil {
// log.Println("failed to update id", err)
// return
// }
// case <-ctx.Done():
// // Server is closed and it should return nil.
// log.Println("tsLoop is closed")
// return
// }
// }
//}
//func (rmq *RocksMQ) Start() error {
// //init idAllocator
// // TODO(yhz): id allocator, which need to etcd address and path, where
// // we hardcode about the etcd path
// rmq.idAllocator = master.NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{""}, "stand-alone/rocksmq", "gid"))
// if err := rmq.idAllocator.Initialize(); err != nil {
// return err
// }
//
// // start server loop
// if err := rmq.startServerLoop(rmq.ctx); err != nil {
// return err
// }
//
// atomic.StoreInt64(&rmq.isServing, 1)
//
// return nil
//}
//func (rmq *RocksMQ) Stop() error {
// if !atomic.CompareAndSwapInt64(&rmq.isServing, 1, 0) {
// // server is already closed
// return nil
// }
//
// log.Print("closing server")
//
// rmq.stopServerLoop()
//
// rmq.kv.Close()
// rmq.store.Close()
//
// return nil
//}
func (rmq *RocksMQ) checkKeyExist(key string) bool { func (rmq *RocksMQ) checkKeyExist(key string) bool {
_, err := rmq.kv.Load(key) _, err := rmq.kv.Load(key)
return err == nil return err == nil
...@@ -118,7 +237,51 @@ func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) e ...@@ -118,7 +237,51 @@ func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) e
} }
func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error { func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error {
return nil msgLen := len(messages)
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
if err != nil {
return err
}
// TODO(yhz): Here assume allocated id size is equal to message size
if UniqueID(msgLen) != idEnd-idStart {
return errors.New("Obtained id length is not equal that of message")
}
/* Step I: Insert data to store system */
batch := gorocksdb.NewWriteBatch()
for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
key, err := combKey(channelName, idStart+UniqueID(i))
if err != nil {
return err
}
batch.Put([]byte(key), messages[i].payload)
}
err = rmq.store.Write(gorocksdb.NewDefaultWriteOptions(), batch)
if err != nil {
return err
}
/* Step II: Update meta data to kv system */
kvChannelBeginID := channelName + "/begin_id"
beginIDValue, err := rmq.kv.Load(kvChannelBeginID)
if err != nil {
return err
}
kvValues := make(map[string]string)
if beginIDValue == "0" {
kvValues[kvChannelBeginID] = strconv.FormatInt(idStart, 10)
}
kvChannelEndID := channelName + "/end_id"
kvValues[kvChannelEndID] = strconv.FormatInt(idEnd, 10)
return rmq.kv.MultiSave(kvValues)
} }
func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) { func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) {
...@@ -126,5 +289,27 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons ...@@ -126,5 +289,27 @@ func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]Cons
} }
func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID UniqueID) error { func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID UniqueID) error {
/* Step I: Check if key exists */
key := groupName + "/" + channelName + "/current_id"
if !rmq.checkKeyExist(key) {
return errors.New("ConsumerGroup " + groupName + ", channel " + channelName + " not exists.")
}
storeKey, err := combKey(channelName, msgID)
if err != nil {
return err
}
_, err = rmq.store.Get(gorocksdb.NewDefaultReadOptions(), []byte(storeKey))
if err != nil {
return err
}
/* Step II: Save current_id in kv */
err = rmq.kv.Save(key, strconv.FormatInt(msgID, 10))
if err != nil {
return err
}
return nil return nil
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册