提交 ef7339ce 编写于 作者: C congqixia 提交者: zhenshan.cao

DataService stores flush msgstream pos at VChannel granularity (#5397)

SaveBinLogPaths saves stream positions at VChannel granularity

resolves #5396

Signed-off-by: Congqi Xia congqi.xia@zilliz.com
上级 cb58a85a
...@@ -27,6 +27,8 @@ etcd: ...@@ -27,6 +27,8 @@ etcd:
statsStreamPosSubPath: dataservice/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath statsStreamPosSubPath: dataservice/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath
segmentDmlPosSubPath: dataservice/segmentdml # Full path = rootPath/metaSubPath/segmentDmlPosSubPath segmentDmlPosSubPath: dataservice/segmentdml # Full path = rootPath/metaSubPath/segmentDmlPosSubPath
segmentDdlPosSubPath: dataservice/segmentddl # Full path = rootPath/metaSubPath/segmentDdlPosSubPath segmentDdlPosSubPath: dataservice/segmentddl # Full path = rootPath/metaSubPath/segmentDdlPosSubPath
dmlChanPosSubPath: dataservice/dmlchannel # Full Path = root/metaSubPath/dmlChanPosSubPath
ddlChanPosSubPath: dataservice/ddlchannel # Full Path = root/metaSubPath/ddlChanPosSubPath
minio: minio:
address: localhost address: localhost
......
...@@ -39,6 +39,8 @@ func (s *Server) genKey(alloc bool, ids ...UniqueID) (key string, err error) { ...@@ -39,6 +39,8 @@ func (s *Server) genKey(alloc bool, ids ...UniqueID) (key string, err error) {
var ( var (
errNilKvClient = errors.New("kv client not initialized") errNilKvClient = errors.New("kv client not initialized")
errNilID2Paths = errors.New("nil ID2PathList")
errNilSegmentInfo = errors.New("nil segment info")
) )
//SaveBinLogMetaTxn saves segment-field2Path, collection-tsPath/ddlPath into kv store in transcation //SaveBinLogMetaTxn saves segment-field2Path, collection-tsPath/ddlPath into kv store in transcation
...@@ -49,10 +51,6 @@ func (s *Server) SaveBinLogMetaTxn(meta map[string]string) error { ...@@ -49,10 +51,6 @@ func (s *Server) SaveBinLogMetaTxn(meta map[string]string) error {
return s.kvClient.MultiSave(meta) return s.kvClient.MultiSave(meta)
} }
var (
errNilID2Paths = errors.New("nil ID2PathList")
)
// prepareField2PathMeta parses fields2Paths ID2PathList // prepareField2PathMeta parses fields2Paths ID2PathList
// into key-value for kv store // into key-value for kv store
func (s *Server) prepareField2PathMeta(segID UniqueID, field2Paths *datapb.ID2PathList) (result map[string]string, err error) { func (s *Server) prepareField2PathMeta(segID UniqueID, field2Paths *datapb.ID2PathList) (result map[string]string, err error) {
...@@ -174,23 +172,29 @@ func (s *Server) getDDLBinlogMeta(collID UniqueID) (metas []*datapb.DDLBinlogMet ...@@ -174,23 +172,29 @@ func (s *Server) getDDLBinlogMeta(collID UniqueID) (metas []*datapb.DDLBinlogMet
} }
// prepareSegmentPos prepare segment flushed pos // prepareSegmentPos prepare segment flushed pos
func (s *Server) prepareSegmentPos(segmentID UniqueID, dmlPos, ddlPos *datapb.PositionPair) (map[string]string, error) { func (s *Server) prepareSegmentPos(segInfo *datapb.SegmentInfo, dmlPos, ddlPos *datapb.PositionPair) (map[string]string, error) {
result := make(map[string]string, 2) if segInfo == nil {
return nil, errNilSegmentInfo
}
result := make(map[string]string, 4)
if dmlPos != nil { if dmlPos != nil {
key, err := s.genKey(false, segmentID) key, err := s.genKey(false, segInfo.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
msPosPair := proto.MarshalTextString(dmlPos) msPosPair := proto.MarshalTextString(dmlPos)
result[path.Join(Params.SegmentDmlPosSubPath, key)] = msPosPair result[path.Join(Params.SegmentDmlPosSubPath, key)] = msPosPair // segment pos
result[path.Join(Params.DmlChannelPosSubPath, segInfo.InsertChannel)] = msPosPair // DmlChannel pos
} }
if ddlPos != nil { if ddlPos != nil {
key, err := s.genKey(false, segmentID) key, err := s.genKey(false, segInfo.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
msPosPair := proto.MarshalTextString(ddlPos) msPosPair := proto.MarshalTextString(ddlPos)
result[path.Join(Params.SegmentDmlPosSubPath, key)] = msPosPair result[path.Join(Params.SegmentDmlPosSubPath, key)] = msPosPair //segment pos
result[path.Join(Params.DdlChannelPosSubPath, segInfo.InsertChannel)] = msPosPair // DdlChannel pos(use dm channel as Key, since dd channel may share same channel name)
} }
return map[string]string{}, nil return map[string]string{}, nil
......
...@@ -36,6 +36,8 @@ type ParamTable struct { ...@@ -36,6 +36,8 @@ type ParamTable struct {
CollectionBinlogSubPath string CollectionBinlogSubPath string
SegmentDmlPosSubPath string SegmentDmlPosSubPath string
SegmentDdlPosSubPath string SegmentDdlPosSubPath string
DmlChannelPosSubPath string
DdlChannelPosSubPath string
// --- Pulsar --- // --- Pulsar ---
PulsarAddress string PulsarAddress string
...@@ -327,3 +329,19 @@ func (p *ParamTable) initSegmentDdlPosSubPath() { ...@@ -327,3 +329,19 @@ func (p *ParamTable) initSegmentDdlPosSubPath() {
} }
p.SegmentDdlPosSubPath = subPath p.SegmentDdlPosSubPath = subPath
} }
func (p *ParamTable) initDmlChannelPosSubPath() {
subPath, err := p.Load("etcd.dmlChanPosSubPath")
if err != nil {
panic(err)
}
p.DmlChannelPosSubPath = subPath
}
func (p *ParamTable) initDdlChannelPosSubPath() {
subPath, err := p.Load("etcd.ddlChanPosSubPath")
if err != nil {
panic(err)
}
p.DdlChannelPosSubPath = subPath
}
...@@ -934,7 +934,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath ...@@ -934,7 +934,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
for k, v := range ddlMeta { for k, v := range ddlMeta {
meta[k] = v meta[k] = v
} }
segmentPos, err := s.prepareSegmentPos(req.SegmentID, req.GetDmlPosition(), req.GetDdlPosition()) segmentPos, err := s.prepareSegmentPos(segInfo, req.GetDmlPosition(), req.GetDdlPosition())
if err != nil { if err != nil {
resp.Reason = err.Error() resp.Reason = err.Error()
return resp, err return resp, err
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册