提交 e316533a 编写于 作者: X XuanYang-cn 提交者: yefu.chen

Add Flush Sync service

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
上级 5acc9681
......@@ -29,7 +29,7 @@ minio:
accessKeyID: minioadmin
secretAccessKey: minioadmin
useSSL: false
bucketName: "A-bucket"
bucketName: "a-bucket"
pulsar:
address: localhost
......
......@@ -75,12 +75,6 @@ GetTopK(CPlan plan) {
return res;
}
const char*
GetMetricType(CPlan plan) {
auto query_plan = static_cast<milvus::query::Plan*>(plan);
return strdup(query_plan->plan_node_->query_info_.metric_type_.c_str());
}
void
DeletePlan(CPlan cPlan) {
auto plan = (milvus::query::Plan*)cPlan;
......
......@@ -35,9 +35,6 @@ GetNumOfQueries(CPlaceholderGroup placeholder_group);
int64_t
GetTopK(CPlan plan);
const char*
GetMetricType(CPlan plan);
void
DeletePlan(CPlan plan);
......
......@@ -64,11 +64,6 @@ struct SearchResultPair {
return (distance_ < pair.distance_);
}
bool
operator>(const SearchResultPair& pair) const {
return (distance_ > pair.distance_);
}
void
reset_distance() {
distance_ = search_result_->result_distances_[offset_];
......@@ -94,7 +89,7 @@ GetResultData(std::vector<std::vector<int64_t>>& search_records,
AssertInfo(topk > 0, "topK must greater than 0");
for (int i = 0; i < topk; ++i) {
result_pairs[0].reset_distance();
std::sort(result_pairs.begin(), result_pairs.end(), std::greater<>());
std::sort(result_pairs.begin(), result_pairs.end());
auto& result_pair = result_pairs[0];
auto index = result_pair.index_;
is_selected[index] = true;
......
......@@ -140,11 +140,6 @@ Search(CSegmentBase c_segment,
auto status = CStatus();
try {
auto res = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups, *query_result);
if (plan->plan_node_->query_info_.metric_type_ != "IP") {
for (auto& dis : query_result->result_distances_) {
dis *= -1;
}
}
status.error_code = Success;
status.error_msg = "";
} catch (std::exception& e) {
......
......@@ -259,7 +259,6 @@ message SearchResult {
uint64 timestamp = 6;
int64 result_channelID = 7;
repeated bytes hits = 8;
string metric_type = 9;
}
message TimeTickMsg {
......
......@@ -487,7 +487,6 @@ func (qt *QueryTask) PostExecute() error {
Hits: make([][]byte, 0),
}
const minFloat32 = -1 * float32(math.MaxFloat32)
for i := 0; i < nq; i++ {
locs := make([]int, availableQueryNodeNum)
reducedHits := &servicepb.Hits{
......@@ -497,18 +496,18 @@ func (qt *QueryTask) PostExecute() error {
}
for j := 0; j < topk; j++ {
choice, maxDistance := 0, minFloat32
choice, minDistance := 0, float32(math.MaxFloat32)
for q, loc := range locs { // query num, the number of ways to merge
distance := hits[q][i].Scores[loc]
if distance > maxDistance {
if distance < minDistance {
choice = q
maxDistance = distance
minDistance = distance
}
}
choiceOffset := locs[choice]
// check if distance is valid, `invalid` here means very very big,
// in this process, distance here is the smallest, so the rest of distance are all invalid
if hits[choice][i].Scores[choiceOffset] <= minFloat32 {
if hits[choice][i].Scores[choiceOffset] >= float32(math.MaxFloat32) {
break
}
reducedHits.IDs = append(reducedHits.IDs, hits[choice][i].IDs[choiceOffset])
......@@ -518,11 +517,6 @@ func (qt *QueryTask) PostExecute() error {
reducedHits.Scores = append(reducedHits.Scores, hits[choice][i].Scores[choiceOffset])
locs[choice]++
}
if searchResults[0].MetricType != "IP" {
for k := range reducedHits.Scores {
reducedHits.Scores[k] *= -1
}
}
reducedHitsBs, err := proto.Marshal(reducedHits)
if err != nil {
log.Println("marshal error")
......
......@@ -41,13 +41,6 @@ func (plan *Plan) getTopK() int64 {
return int64(topK)
}
func (plan *Plan) getMetricType() string {
cMetricType := C.GetMetricType(plan.cPlan)
defer C.free(unsafe.Pointer(cMetricType))
metricType := C.GoString(cMetricType)
return metricType
}
func (plan *Plan) delete() {
C.DeletePlan(plan.cPlan)
}
......
......@@ -27,8 +27,6 @@ func TestPlan_Plan(t *testing.T) {
assert.NotEqual(t, plan, nil)
topk := plan.getTopK()
assert.Equal(t, int(topk), 10)
metricType := plan.getMetricType()
assert.Equal(t, metricType, "L2")
plan.delete()
deleteCollection(collection)
}
......
......@@ -336,7 +336,6 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
Timestamp: searchTimestamp,
ResultChannelID: searchMsg.ResultChannelID,
Hits: hits,
MetricType: plan.getMetricType(),
}
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: msgstream.BaseMsg{HashValues: []uint32{uint32(searchMsg.ResultChannelID)}},
......
......@@ -8,15 +8,20 @@ import (
)
type dataSyncService struct {
ctx context.Context
fg *flowgraph.TimeTickedFlowGraph
ctx context.Context
fg *flowgraph.TimeTickedFlowGraph
ddChan chan *ddlFlushSyncMsg
insertChan chan *insertFlushSyncMsg
}
func newDataSyncService(ctx context.Context) *dataSyncService {
func newDataSyncService(ctx context.Context,
ddChan chan *ddlFlushSyncMsg, insertChan chan *insertFlushSyncMsg) *dataSyncService {
return &dataSyncService{
ctx: ctx,
fg: nil,
ctx: ctx,
fg: nil,
ddChan: ddChan,
insertChan: insertChan,
}
}
......@@ -39,9 +44,10 @@ func (dsService *dataSyncService) initNodes() {
var dmStreamNode Node = newDmInputNode(dsService.ctx)
var ddStreamNode Node = newDDInputNode(dsService.ctx)
var ddNode Node = newDDNode(dsService.ctx)
var filterDmNode Node = newFilteredDmNode()
var insertBufferNode Node = newInsertBufferNode(dsService.ctx)
var ddNode Node = newDDNode(dsService.ctx, dsService.ddChan)
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.insertChan)
dsService.fg.AddNode(&dmStreamNode)
dsService.fg.AddNode(&ddStreamNode)
......
......@@ -39,8 +39,7 @@ func TestDataSyncService_Start(t *testing.T) {
// init write node
pulsarURL := Params.PulsarAddress
node, err := NewWriteNode(ctx, 0)
assert.Nil(t, err)
node := NewWriteNode(ctx, 0)
// test data generate
// GOOSE TODO orgnize
......@@ -190,7 +189,7 @@ func TestDataSyncService_Start(t *testing.T) {
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
err := insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
......@@ -199,7 +198,7 @@ func TestDataSyncService_Start(t *testing.T) {
assert.NoError(t, err)
// dataSync
node.dataSyncService = newDataSyncService(node.ctx)
node.dataSyncService = newDataSyncService(node.ctx, nil, nil)
go node.dataSyncService.start()
node.Close()
......
......@@ -25,6 +25,7 @@ type ddNode struct {
ddMsg *ddMsg
ddRecords *ddRecords
ddBuffer *ddBuffer
outCh chan *ddlFlushSyncMsg // for flush sync
idAllocator *allocator.IDAllocator
kv kv.Base
......@@ -311,7 +312,7 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) {
ddNode.ddBuffer.ddData[collectionID].eventTypes = append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropPartitionEventType)
}
func newDDNode(ctx context.Context) *ddNode {
func newDDNode(ctx context.Context, outCh chan *ddlFlushSyncMsg) *ddNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......@@ -335,8 +336,8 @@ func newDDNode(ctx context.Context) *ddNode {
if err != nil {
panic(err)
}
// TODO: load bucket name from yaml?
minioKV, err := miniokv.NewMinIOKV(ctx, minIOClient, "write-node-dd-node")
bucketName := Params.MinioBucketName
minioKV, err := miniokv.NewMinIOKV(ctx, minIOClient, bucketName)
if err != nil {
panic(err)
}
......@@ -357,6 +358,7 @@ func newDDNode(ctx context.Context) *ddNode {
ddData: make(map[UniqueID]*ddData),
maxSize: Params.FlushDdBufSize,
},
outCh: outCh,
idAllocator: idAllocator,
kv: minioKV,
......
......@@ -26,10 +26,9 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
ctx = context.Background()
}
startMaster(ctx)
Params.FlushDdBufSize = 4
ddNode := newDDNode(ctx)
ddNode := newDDNode(ctx, nil)
colID := UniqueID(0)
colName := "col-test-0"
......
......@@ -39,6 +39,7 @@ type (
minIOKV kv.Base
minioPrifex string
idAllocator *allocator.IDAllocator
outCh chan *insertFlushSyncMsg
}
insertBuffer struct {
......@@ -428,7 +429,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
return nil
}
func newInsertBufferNode(ctx context.Context) *insertBufferNode {
func newInsertBufferNode(ctx context.Context, outCh chan *insertFlushSyncMsg) *insertBufferNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
......@@ -447,10 +448,13 @@ func newInsertBufferNode(ctx context.Context) *insertBufferNode {
ETCDAddr := Params.EtcdAddress
MetaRootPath := Params.MetaRootPath
log.Println("metaRootPath: ", MetaRootPath)
cli, _ := clientv3.New(clientv3.Config{
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ETCDAddr},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
kvClient := etcdkv.NewEtcdKV(cli, MetaRootPath)
// MinIO
......@@ -460,14 +464,23 @@ func newInsertBufferNode(ctx context.Context) *insertBufferNode {
minioUseSSL := Params.MinioUseSSL
minioBucketName := Params.MinioBucketName
minioClient, _ := minio.New(minioendPoint, &minio.Options{
minioClient, err := minio.New(minioendPoint, &minio.Options{
Creds: credentials.NewStaticV4(miniioAccessKeyID, miniioSecretAccessKey, ""),
Secure: minioUseSSL,
})
minIOKV, _ := miniokv.NewMinIOKV(ctx, minioClient, minioBucketName)
if err != nil {
panic(err)
}
minIOKV, err := miniokv.NewMinIOKV(ctx, minioClient, minioBucketName)
if err != nil {
panic(err)
}
minioPrefix := Params.InsertLogRootPath
idAllocator, _ := allocator.NewIDAllocator(ctx, Params.MasterAddress)
idAllocator, err := allocator.NewIDAllocator(ctx, Params.MasterAddress)
if err != nil {
panic(err)
}
return &insertBufferNode{
BaseNode: baseNode,
......@@ -476,5 +489,6 @@ func newInsertBufferNode(ctx context.Context) *insertBufferNode {
minIOKV: minIOKV,
minioPrifex: minioPrefix,
idAllocator: idAllocator,
outCh: outCh,
}
}
package writenode

type (
	// ddlBinlogPathMsg carries the DDL binlog paths recorded for one collection.
	//
	// segID: set when flushComplete == true, to tell
	// the flush_sync_service which segFlush msg does this
	// DDL flush for, so that ddl flush and insert flush
	// will sync.
	ddlBinlogPathMsg struct {
		collID UniqueID
		segID  UniqueID
		paths  []string
	}

	// ddlFlushSyncMsg is sent on the dd channel of flushSyncService, either
	// to append DDL binlog paths (flushCompleted == false) or to signal that
	// the DDL flush of segID has finished (flushCompleted == true).
	ddlFlushSyncMsg struct {
		ddlBinlogPathMsg
		flushCompleted bool
	}

	// insertBinlogPathMsg carries the insert binlog paths recorded for one
	// field of one segment.
	insertBinlogPathMsg struct {
		ts      Timestamp
		segID   UniqueID
		fieldID int32 // TODO GOOSE may need to change
		paths   []string
	}

	// insertFlushSyncMsg is sent on the insert channel of flushSyncService.
	//
	// This Msg can notify flushSyncService
	// 1.To append binary logs
	// 2.To set flush-completed status
	//
	// When `flushComplete == false`
	// `ts` means OpenTime of a segFlushMeta
	// When `flushComplete == true`
	// `ts` means CloseTime of a segFlushMeta,
	// `fieldID` and `paths` need to be empty
	insertFlushSyncMsg struct {
		insertBinlogPathMsg
		flushCompleted bool
	}
)
package writenode
import (
"context"
"log"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
)
type (
	// flushSyncService synchronizes the DDL flush and the insert flush of a
	// segment: binlog-path messages from both flows are appended into the
	// metaTable, and a segment's flush meta is closed only once both flows
	// have reported completion (see start / FlushCompleted).
	flushSyncService struct {
		ctx           context.Context
		metaTable     *metaTable
		ddChan        chan *ddlFlushSyncMsg    // TODO GOOSE Init Size??
		insertChan    chan *insertFlushSyncMsg // TODO GOOSE Init Size??
		ddFlushed     map[UniqueID]bool        // Segment ID
		insertFlushed map[UniqueID]bool        // Segment ID
	}
)
// newFlushSyncService creates a flushSyncService that consumes ddChan and
// insertChan, backed by a fresh metaTable rooted at Params.MetaRootPath.
// It panics if the etcd client or the metaTable cannot be constructed,
// matching the construction-time failure style used elsewhere in this
// package.
func newFlushSyncService(ctx context.Context,
	ddChan chan *ddlFlushSyncMsg, insertChan chan *insertFlushSyncMsg) *flushSyncService {
	service := &flushSyncService{
		ctx:           ctx,
		ddChan:        ddChan,
		insertChan:    insertChan,
		ddFlushed:     make(map[UniqueID]bool),
		insertFlushed: make(map[UniqueID]bool),
	}

	// New metaTable
	etcdAddr := Params.EtcdAddress
	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
	if err != nil {
		panic(err)
	}
	etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)

	// Reuse err rather than introducing a second error variable (err2).
	metaKV, err := NewMetaTable(etcdKV)
	if err != nil {
		panic(err)
	}
	service.metaTable = metaKV
	return service
}
// completeDDFlush marks the DDL flush of segID as finished.
//
// The original implementation branched on whether segID was already present
// in the map, but both branches performed the identical assignment; a single
// unconditional write is equivalent.
func (fService *flushSyncService) completeDDFlush(segID UniqueID) {
	fService.ddFlushed[segID] = true
}
// completeInsertFlush marks the insert flush of segID as finished.
//
// As with completeDDFlush, the original presence check was redundant — both
// branches wrote true — so a single unconditional write is equivalent.
func (fService *flushSyncService) completeInsertFlush(segID UniqueID) {
	fService.insertFlushed[segID] = true
}
// FlushCompleted reports whether both the DDL flush and the insert flush of
// segID have been recorded as complete. A segment absent from either map is
// considered not yet flushed.
func (fService *flushSyncService) FlushCompleted(segID UniqueID) bool {
	ddDone, seen := fService.ddFlushed[segID]
	if !seen {
		return false
	}
	insertDone, seen := fService.insertFlushed[segID]
	if !seen {
		return false
	}
	return ddDone && insertDone
}
// start runs the flush-sync event loop until the service context is
// cancelled. Path-append messages (flushCompleted == false) have their
// binlog paths written into the metaTable; completion messages are recorded
// per segment, and once both the DDL and the insert flush of a segment have
// completed, the segment's flush meta is closed.
func (fService *flushSyncService) start() {
	for {
		select {
		case <-fService.ctx.Done():
			return
		case ddFlushMsg := <-fService.ddChan:
			// NOTE(review): nil messages are skipped. If ddChan is closed
			// while ctx is still live, this receive yields nil forever and
			// the loop spins — confirm the channels always outlive ctx.
			if ddFlushMsg == nil {
				continue
			}
			if !ddFlushMsg.flushCompleted {
				// Path-append message: record DDL binlog paths for the collection.
				err := fService.metaTable.AppendDDLBinlogPaths(ddFlushMsg.collID, ddFlushMsg.paths)
				if err != nil {
					log.Println("Append segBinlog Error")
					// GOOSE TODO error handling
				}
				continue
			}
			// Completion message: remember the DDL side of segID is flushed.
			fService.completeDDFlush(ddFlushMsg.segID)
		case insertFlushMsg := <-fService.insertChan:
			if insertFlushMsg == nil {
				continue
			}
			if !insertFlushMsg.flushCompleted {
				// Path-append message: ts carries the segment's open time here.
				err := fService.metaTable.AppendSegBinlogPaths(insertFlushMsg.ts, insertFlushMsg.segID, insertFlushMsg.fieldID,
					insertFlushMsg.paths)
				if err != nil {
					log.Println("Append segBinlog Error")
					// GOOSE TODO error handling
				}
				continue
			}
			fService.completeInsertFlush(insertFlushMsg.segID)
			// Both sides done -> close the segment's flush meta; on a
			// completed message ts carries the close time.
			// NOTE(review): CompleteFlush's error is dropped — consider logging it.
			if fService.FlushCompleted(insertFlushMsg.segID) {
				fService.metaTable.CompleteFlush(insertFlushMsg.ts, insertFlushMsg.segID)
			}
		}
	}
}
package writenode
import (
"context"
"log"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
)
// clearEtcd removes the writer/segment and writer/ddl key prefixes under
// rootPath, re-reading each prefix afterwards to surface any load error.
func clearEtcd(rootPath string) error {
	etcdAddr := Params.EtcdAddress
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
	if err != nil {
		return err
	}
	kv := etcdkv.NewEtcdKV(cli, rootPath)

	if err = kv.RemoveWithPrefix("writer/segment"); err != nil {
		return err
	}
	if _, _, err = kv.LoadWithPrefix("writer/segment"); err != nil {
		return err
	}
	log.Println("Clear ETCD with prefix writer/segment ")

	if err = kv.RemoveWithPrefix("writer/ddl"); err != nil {
		return err
	}
	if _, _, err = kv.LoadWithPrefix("writer/ddl"); err != nil {
		return err
	}
	log.Println("Clear ETCD with prefix writer/ddl")
	return nil
}
// TestFlushSyncService_Start is an integration test: it wires real channels
// into a flushSyncService backed by etcd, feeds it generated DDL and insert
// flush messages, and verifies the resulting metaTable state (binlog paths,
// open/close timestamps, and flush completion).
func TestFlushSyncService_Start(t *testing.T) {
	const ctxTimeInMillisecond = 3000
	const closeWithDeadline = false
	var ctx context.Context
	var cancel context.CancelFunc

	if closeWithDeadline {
		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
		ctx, cancel = context.WithDeadline(context.Background(), d)
		defer cancel()
	} else {
		// ctx = context.Background()
		ctx, cancel = context.WithCancel(context.Background())
		defer cancel()
	}

	ddChan := make(chan *ddlFlushSyncMsg, 10)
	defer close(ddChan)
	insertChan := make(chan *insertFlushSyncMsg, 10)
	defer close(insertChan)

	// Point the service at a dedicated etcd subtree and wipe it first so
	// earlier runs cannot leak state into the assertions below.
	testPath := "/test/writenode/root/meta"
	err := clearEtcd(testPath)
	require.NoError(t, err)
	Params.MetaRootPath = testPath
	fService := newFlushSyncService(ctx, ddChan, insertChan)
	assert.Equal(t, testPath, fService.metaTable.client.(*etcdkv.EtcdKV).GetPath("."))

	t.Run("FlushSyncService", func(t *testing.T) {
		go fService.start()

		SegID := UniqueID(100)
		ddMsgs := genDdlFlushSyncMsgs(SegID)
		insertMsgs := geninsertFlushSyncMsgs(SegID)

		// NOTE(review): the 10ms sleeps give the service goroutine time to
		// drain each message before the assertions read the metaTable —
		// timing-based, so potentially flaky on a slow machine.
		for _, msg := range ddMsgs {
			ddChan <- msg
			time.Sleep(time.Millisecond * 10)
		}

		for _, msg := range insertMsgs {
			insertChan <- msg
			time.Sleep(time.Millisecond * 10)
		}

		ret, err := fService.metaTable.getSegBinlogPaths(SegID)
		assert.NoError(t, err)
		assert.Equal(t, map[int32][]string{
			0: {"x", "y", "z"},
			1: {"x", "y", "z"},
			2: {"x", "y", "z"},
			3: {"x", "y", "z"},
			4: {"x", "y", "z"},
		}, ret)

		ts, err := fService.metaTable.getFlushOpenTime(SegID)
		assert.NoError(t, err)
		assert.Equal(t, Timestamp(1000), ts)

		ts, err = fService.metaTable.getFlushCloseTime(SegID)
		assert.NoError(t, err)
		assert.Equal(t, Timestamp(2010), ts)

		cp, err := fService.metaTable.checkFlushComplete(SegID)
		assert.NoError(t, err)
		assert.Equal(t, true, cp)

		// NOTE(review): this repeats the identical check above — presumably
		// to confirm checkFlushComplete is idempotent; verify intent, else
		// it is copy-paste duplication.
		cp, err = fService.metaTable.checkFlushComplete(SegID)
		assert.NoError(t, err)
		assert.Equal(t, true, cp)
	})
}
// genDdlFlushSyncMsgs builds five path-append DDL messages for collection
// 100 (paths "a", "b", "c") followed by a single flush-completed message
// carrying segID.
func genDdlFlushSyncMsgs(segID UniqueID) []*ddlFlushSyncMsg {
	msgs := make([]*ddlFlushSyncMsg, 0, 6)
	for i := 0; i < 5; i++ {
		appendMsg := &ddlFlushSyncMsg{
			flushCompleted: false,
			ddlBinlogPathMsg: ddlBinlogPathMsg{
				collID: UniqueID(100),
				paths:  []string{"a", "b", "c"},
			},
		}
		msgs = append(msgs, appendMsg)
	}
	doneMsg := &ddlFlushSyncMsg{
		flushCompleted: true,
		ddlBinlogPathMsg: ddlBinlogPathMsg{
			segID: segID,
		},
	}
	return append(msgs, doneMsg)
}
// geninsertFlushSyncMsgs builds five path-append insert messages for segID
// (field IDs 0..4, timestamps 1000..1004, paths "x", "y", "z") followed by
// a single flush-completed message at timestamp 2010.
func geninsertFlushSyncMsgs(segID UniqueID) []*insertFlushSyncMsg {
	msgs := make([]*insertFlushSyncMsg, 0, 6)
	for i := 0; i < 5; i++ {
		appendMsg := &insertFlushSyncMsg{
			flushCompleted: false,
			insertBinlogPathMsg: insertBinlogPathMsg{
				ts:      Timestamp(1000 + i),
				segID:   segID,
				fieldID: int32(i),
				paths:   []string{"x", "y", "z"},
			},
		}
		msgs = append(msgs, appendMsg)
	}
	doneMsg := &insertFlushSyncMsg{
		flushCompleted: true,
		insertBinlogPathMsg: insertBinlogPathMsg{
			ts:    Timestamp(2010),
			segID: segID,
		},
	}
	return append(msgs, doneMsg)
}
......@@ -55,13 +55,11 @@ func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error
return mt.saveDDLFlushMeta(meta)
}
func (mt *metaTable) AppendSegBinlogPaths(timestamp Timestamp, segmentID UniqueID, fieldID int32, dataPaths []string) error {
mt.lock.Lock()
defer mt.lock.Unlock()
func (mt *metaTable) AppendSegBinlogPaths(tsOpen Timestamp, segmentID UniqueID, fieldID int32, dataPaths []string) error {
_, ok := mt.segID2FlushMeta[segmentID]
if !ok {
err := mt.addSegmentFlush(segmentID, timestamp)
err := mt.addSegmentFlush(segmentID, tsOpen)
if err != nil {
return err
}
......@@ -89,6 +87,19 @@ func (mt *metaTable) AppendSegBinlogPaths(timestamp Timestamp, segmentID UniqueI
return mt.saveSegFlushMeta(&meta)
}
func (mt *metaTable) CompleteFlush(tsClose Timestamp, segmentID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
meta.IsClosed = true
meta.CloseTime = tsClose
return mt.saveSegFlushMeta(&meta)
}
// metaTable.lock.Lock() before call this function
func (mt *metaTable) saveDDLFlushMeta(meta *pb.DDLFlushMeta) error {
value := proto.MarshalTextString(meta)
......@@ -180,19 +191,6 @@ func (mt *metaTable) getFlushOpenTime(segmentID UniqueID) (Timestamp, error) {
return meta.OpenTime, nil
}
func (mt *metaTable) CompleteFlush(segmentID UniqueID, timestamp Timestamp) error {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.segID2FlushMeta[segmentID]
if !ok {
return errors.Errorf("segment not exists with ID = " + strconv.FormatInt(segmentID, 10))
}
meta.IsClosed = true
meta.CloseTime = timestamp
return mt.saveSegFlushMeta(&meta)
}
func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) {
mt.lock.Lock()
defer mt.lock.Unlock()
......
......@@ -112,7 +112,7 @@ func TestMetaTable_all(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, false, ret)
meta.CompleteFlush(segmentID, closeTime)
meta.CompleteFlush(closeTime, segmentID)
ret, err = meta.checkFlushComplete(segmentID)
assert.NoError(t, err)
......
......@@ -84,7 +84,7 @@ func TestParamTable_WriteNode(t *testing.T) {
t.Run("Test MinioBucketName", func(t *testing.T) {
name := Params.MinioBucketName
assert.Equal(t, name, "A-bucket")
assert.Equal(t, name, "a-bucket")
})
t.Run("Test FlushInsertBufSize", func(t *testing.T) {
......
......@@ -2,48 +2,37 @@ package writenode
import (
"context"
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
"go.etcd.io/etcd/clientv3"
)
type WriteNode struct {
ctx context.Context
WriteNodeID uint64
dataSyncService *dataSyncService
metaTable *metaTable
ctx context.Context
WriteNodeID uint64
dataSyncService *dataSyncService
flushSyncService *flushSyncService
}
func NewWriteNode(ctx context.Context, writeNodeID uint64) (*WriteNode, error) {
func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode {
node := &WriteNode{
ctx: ctx,
WriteNodeID: writeNodeID,
dataSyncService: nil,
ctx: ctx,
WriteNodeID: writeNodeID,
dataSyncService: nil,
flushSyncService: nil,
}
etcdAddress := Params.EtcdAddress
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
return nil, err
}
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
metaKV, err2 := NewMetaTable(etcdKV)
if err2 != nil {
return nil, err
}
node.metaTable = metaKV
return node, nil
return node
}
func (node *WriteNode) Start() {
node.dataSyncService = newDataSyncService(node.ctx)
// node.statsService = newStatsService(node.ctx)
ddChan := make(chan *ddlFlushSyncMsg, 5)
insertChan := make(chan *insertFlushSyncMsg, 5)
node.flushSyncService = newFlushSyncService(node.ctx, ddChan, insertChan)
node.dataSyncService = newDataSyncService(node.ctx, ddChan, insertChan)
go node.dataSyncService.start()
// node.statsService.start()
go node.flushSyncService.start()
}
func (node *WriteNode) Close() {
......@@ -53,7 +42,4 @@ func (node *WriteNode) Close() {
if node.dataSyncService != nil {
(*node.dataSyncService).close()
}
// if node.statsService != nil {
// (*node.statsService).close()
// }
}
......@@ -8,6 +8,7 @@ import (
"os"
"strconv"
"testing"
"time"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
......@@ -60,6 +61,20 @@ func startMaster(ctx context.Context) {
func TestMain(m *testing.M) {
Params.Init()
refreshChannelNames()
const ctxTimeInMillisecond = 2000
const closeWithDeadline = true
var ctx context.Context
if closeWithDeadline {
var cancel context.CancelFunc
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, cancel = context.WithDeadline(context.Background(), d)
defer cancel()
} else {
ctx = context.Background()
}
startMaster(ctx)
p := Params
fmt.Println(p)
exitCode := m.Run()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册