未验证 提交 32a51267 编写于 作者: Z zhenshan.cao 提交者: GitHub

Add the limitation of the traceable interval to timeTravel (#14498)

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 c5c38bdb
......@@ -177,7 +177,6 @@ dataCoord:
compaction:
enableAutoCompaction: true
retentionDuration: 432000 # 5 days in seconds
gc:
interval: 3600 # gc interval in seconds
......@@ -220,7 +219,7 @@ log:
msgChannel:
# Channel name generation rule: ${namePrefix}-${ChannelIdx}
chanNamePrefix:
cluster: "by-dev"
cluster: "by-dev"
rootCoordTimeTick: "rootcoord-timetick"
rootCoordStatistics: "rootcoord-statistics"
rootCoordDml: "rootcoord-dml"
......@@ -250,6 +249,7 @@ msgChannel:
common:
defaultPartitionName: "_default" # default partition name for a collection
defaultIndexName: "_default_idx" # default index name
retentionDuration: 432000 # 5 days in seconds
knowhere:
# Default value: auto
......
......@@ -73,7 +73,7 @@ func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetr
}
pts, _ := tsoutil.ParseTS(ts)
ttpts := pts.Add(-time.Duration(Params.DataCoordCfg.CompactionRetentionDuration) * time.Second)
ttpts := pts.Add(-time.Duration(Params.DataCoordCfg.RetentionDuration) * time.Second)
tt := tsoutil.ComposeTS(ttpts.UnixNano()/int64(time.Millisecond), 0)
return &timetravel{tt}, nil
}
......@@ -113,10 +113,10 @@ func TestVerifyResponse(t *testing.T) {
func Test_getTimetravelReverseTime(t *testing.T) {
Params.Init()
Params.DataCoordCfg.CompactionRetentionDuration = 43200 // 5 days
Params.DataCoordCfg.RetentionDuration = 43200 // 5 days
tFixed := time.Date(2021, 11, 15, 0, 0, 0, 0, time.Local)
tBefore := tFixed.Add(-time.Duration(Params.DataCoordCfg.CompactionRetentionDuration) * time.Second)
tBefore := tFixed.Add(-time.Duration(Params.DataCoordCfg.RetentionDuration) * time.Second)
type args struct {
allocator allocator
......
......@@ -2341,14 +2341,6 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
defer sp.Finish()
traceID, _, _ := trace.InfoFromSpan(sp)
queryRequest := &milvuspb.QueryRequest{
DbName: request.DbName,
CollectionName: request.CollectionName,
PartitionNames: request.PartitionNames,
Expr: request.Expr,
OutputFields: request.OutputFields,
}
qt := &queryTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
......@@ -2360,7 +2352,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
ResultChannelID: strconv.FormatInt(Params.ProxyCfg.ProxyID, 10),
},
resultBuf: make(chan []*internalpb.RetrieveResults),
query: queryRequest,
query: request,
chMgr: node.chMgr,
qc: node.queryCoord,
}
......@@ -2371,9 +2363,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
rpcReceived(method),
zap.String("traceID", traceID),
zap.String("role", typeutil.ProxyRole),
zap.String("db", queryRequest.DbName),
zap.String("collection", queryRequest.CollectionName),
zap.Any("partitions", queryRequest.PartitionNames))
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
if err := node.sched.dqQueue.Enqueue(qt); err != nil {
log.Warn(
......@@ -2381,9 +2373,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
zap.Error(err),
zap.String("traceID", traceID),
zap.String("role", typeutil.ProxyRole),
zap.String("db", queryRequest.DbName),
zap.String("collection", queryRequest.CollectionName),
zap.Any("partitions", queryRequest.PartitionNames))
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
return &milvuspb.QueryResults{
Status: &commonpb.Status{
......@@ -2400,9 +2392,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
zap.Int64("MsgID", qt.ID()),
zap.Uint64("BeginTs", qt.BeginTs()),
zap.Uint64("EndTs", qt.EndTs()),
zap.String("db", queryRequest.DbName),
zap.String("collection", queryRequest.CollectionName),
zap.Any("partitions", queryRequest.PartitionNames))
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
if err := qt.WaitToFinish(); err != nil {
log.Warn(
......@@ -2413,9 +2405,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
zap.Int64("MsgID", qt.ID()),
zap.Uint64("BeginTs", qt.BeginTs()),
zap.Uint64("EndTs", qt.EndTs()),
zap.String("db", queryRequest.DbName),
zap.String("collection", queryRequest.CollectionName),
zap.Any("partitions", queryRequest.PartitionNames))
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
return &milvuspb.QueryResults{
Status: &commonpb.Status{
......@@ -2432,9 +2424,9 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
zap.Int64("MsgID", qt.ID()),
zap.Uint64("BeginTs", qt.BeginTs()),
zap.Uint64("EndTs", qt.EndTs()),
zap.String("db", queryRequest.DbName),
zap.String("collection", queryRequest.CollectionName),
zap.Any("partitions", queryRequest.PartitionNames))
zap.String("db", request.DbName),
zap.String("collection", request.CollectionName),
zap.Any("partitions", request.PartitionNames))
return &milvuspb.QueryResults{
Status: qt.result.Status,
......
......@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
......@@ -1256,6 +1257,32 @@ func TestProxy(t *testing.T) {
// TODO(dragondriver): compare search result
})
wg.Add(1)
t.Run("search_travel", func(t *testing.T) {
defer wg.Done()
past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration-100) * time.Second)
travelTs := tsoutil.ComposeTSByTime(past, 0)
req := constructSearchRequest()
req.TravelTimestamp = travelTs
//resp, err := proxy.Search(ctx, req)
res, err := proxy.Search(ctx, req)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, res.Status.ErrorCode)
})
wg.Add(1)
t.Run("search_travel_succ", func(t *testing.T) {
defer wg.Done()
past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration+100) * time.Second)
travelTs := tsoutil.ComposeTSByTime(past, 0)
req := constructSearchRequest()
req.TravelTimestamp = travelTs
//resp, err := proxy.Search(ctx, req)
res, err := proxy.Search(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, res.Status.ErrorCode)
})
wg.Add(1)
t.Run("query", func(t *testing.T) {
defer wg.Done()
......@@ -1275,6 +1302,46 @@ func TestProxy(t *testing.T) {
// assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// TODO(dragondriver): compare query result
})
wg.Add(1)
t.Run("query_travel", func(t *testing.T) {
defer wg.Done()
past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration-100) * time.Second)
travelTs := tsoutil.ComposeTSByTime(past, 0)
queryReq := &milvuspb.QueryRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
Expr: expr,
OutputFields: nil,
PartitionNames: nil,
TravelTimestamp: travelTs,
GuaranteeTimestamp: 0,
}
res, err := proxy.Query(ctx, queryReq)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, res.Status.ErrorCode)
})
wg.Add(1)
t.Run("query_travel_succ", func(t *testing.T) {
defer wg.Done()
past := time.Now().Add(time.Duration(-1*Params.ProxyCfg.RetentionDuration+100) * time.Second)
travelTs := tsoutil.ComposeTSByTime(past, 0)
queryReq := &milvuspb.QueryRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
Expr: expr,
OutputFields: nil,
PartitionNames: nil,
TravelTimestamp: travelTs,
GuaranteeTimestamp: 0,
}
res, err := proxy.Query(ctx, queryReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_EmptyCollection, res.Status.ErrorCode)
})
}
wg.Add(1)
......
......@@ -28,6 +28,7 @@ import (
"sort"
"strconv"
"strings"
"time"
"unsafe"
"github.com/golang/protobuf/proto"
......@@ -1577,6 +1578,12 @@ func (st *searchTask) PreExecute(ctx context.Context) error {
travelTimestamp := st.query.TravelTimestamp
if travelTimestamp == 0 {
travelTimestamp = st.BeginTs()
} else {
durationSeconds := tsoutil.CalculateDuration(st.BeginTs(), travelTimestamp) / 1000
if durationSeconds > Params.ProxyCfg.RetentionDuration {
duration := time.Second * time.Duration(durationSeconds)
return fmt.Errorf("only support to travel back to %s so far", duration.String())
}
}
guaranteeTimestamp := st.query.GuaranteeTimestamp
if guaranteeTimestamp == 0 {
......@@ -2186,10 +2193,15 @@ func (qt *queryTask) PreExecute(ctx context.Context) error {
if err != nil {
return err
}
travelTimestamp := qt.query.TravelTimestamp
if travelTimestamp == 0 {
travelTimestamp = qt.BeginTs()
} else {
durationSeconds := tsoutil.CalculateDuration(qt.BeginTs(), travelTimestamp) / 1000
if durationSeconds > Params.ProxyCfg.RetentionDuration {
duration := time.Second * time.Duration(durationSeconds)
return fmt.Errorf("only support to travel back to %s so far", duration.String())
}
}
guaranteeTimestamp := qt.query.GuaranteeTimestamp
if guaranteeTimestamp == 0 {
......
......@@ -42,6 +42,9 @@ const (
// SuggestPulsarMaxMessageSize defines the maximum size of Pulsar message.
SuggestPulsarMaxMessageSize = 5 * 1024 * 1024
// DefaultRetentionDuration defines the default duration for retention which is 5 days in seconds.
DefaultRetentionDuration = 3600 * 24 * 5
)
// GlobalParamTable is a derived struct of BaseParamTable.
......@@ -489,6 +492,8 @@ type proxyConfig struct {
PulsarMaxMessageSize int
RetentionDuration int64
CreatedTime time.Time
UpdatedTime time.Time
}
......@@ -518,6 +523,7 @@ func (p *proxyConfig) init(bp *BaseParamTable) {
p.initMaxTaskNum()
p.initBufFlagExpireTime()
p.initBufFlagCleanupInterval()
p.initRetentionDuration()
}
// Refresh is called after session init
......@@ -668,6 +674,10 @@ func (p *proxyConfig) initBufFlagCleanupInterval() {
p.BufFlagCleanupInterval = time.Duration(interval) * time.Second
}
func (p *proxyConfig) initRetentionDuration() {
p.RetentionDuration = p.BaseParams.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration)
}
///////////////////////////////////////////////////////////////////////////////
// --- querycoord ---
type queryCoordConfig struct {
......@@ -1298,8 +1308,8 @@ type dataCoordConfig struct {
EnableCompaction bool
EnableGarbageCollection bool
CompactionRetentionDuration int64
EnableAutoCompaction bool
RetentionDuration int64
EnableAutoCompaction bool
// Garbage Collection
GCInterval time.Duration
......@@ -1342,7 +1352,7 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initMinioBucketName()
p.initMinioRootPath()
p.initCompactionRetentionDuration()
p.initRetentionDuration()
p.initEnableAutoCompaction()
p.initEnableGarbageCollection()
......@@ -1556,8 +1566,8 @@ func (p *dataCoordConfig) initMinioRootPath() {
p.MinioRootPath = rootPath
}
func (p *dataCoordConfig) initCompactionRetentionDuration() {
p.CompactionRetentionDuration = p.BaseParams.ParseInt64WithDefault("dataCoord.compaction.retentionDuration", 432000)
func (p *dataCoordConfig) initRetentionDuration() {
p.RetentionDuration = p.BaseParams.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration)
}
func (p *dataCoordConfig) initEnableAutoCompaction() {
......
......@@ -55,6 +55,13 @@ func ParseHybridTs(ts uint64) (int64, int64) {
return int64(physical), int64(logical)
}
// CalculateDuration returns the number of milliseconds obtained by subtracting ts2 from ts1.
func CalculateDuration(ts1, ts2 typeutil.Timestamp) int64 {
p1, _ := ParseHybridTs(ts1)
p2, _ := ParseHybridTs(ts2)
return p1 - p2
}
// Mod24H parses the ts to millisecond in one day
func Mod24H(ts uint64) uint64 {
logical := ts & logicalBitsMask
......
......@@ -49,3 +49,12 @@ func Test_Tso(t *testing.T) {
assert.Equal(t, typeutil.ZeroTimestamp, l)
})
}
func TestCalculateDuration(t *testing.T) {
now := time.Now()
ts1 := ComposeTSByTime(now, 0)
durationInMilliSecs := int64(20 * 1000)
ts2 := ComposeTSByTime(now.Add(time.Duration(durationInMilliSecs)*time.Millisecond), 0)
diff := CalculateDuration(ts2, ts1)
assert.Equal(t, durationInMilliSecs, diff)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册