Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
milvus
milvus
提交
63239075
M
milvus
项目概览
milvus
/
milvus
11 个月 前同步成功
通知
260
Star
22476
Fork
2472
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
63239075
编写于
1月 10, 2022
作者:
C
Cai Yudong
提交者:
GitHub
1月 10, 2022
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Use CommonConfig in GlobalParams for all components (#15106)
Signed-off-by:
N
yudong.cai
<
yudong.cai@zilliz.com
>
上级
c21b40d6
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
88 addition
and
122 deletion
+88
-122
internal/datacoord/util.go
internal/datacoord/util.go
+1
-1
internal/datacoord/util_test.go
internal/datacoord/util_test.go
+2
-2
internal/distributed/rootcoord/service_test.go
internal/distributed/rootcoord/service_test.go
+7
-7
internal/proxy/impl.go
internal/proxy/impl.go
+1
-1
internal/proxy/metrics_info.go
internal/proxy/metrics_info.go
+2
-2
internal/proxy/proxy_test.go
internal/proxy/proxy_test.go
+4
-4
internal/proxy/task.go
internal/proxy/task.go
+7
-7
internal/rootcoord/meta_table.go
internal/rootcoord/meta_table.go
+2
-2
internal/rootcoord/meta_table_test.go
internal/rootcoord/meta_table_test.go
+2
-2
internal/rootcoord/root_coord_test.go
internal/rootcoord/root_coord_test.go
+11
-11
internal/rootcoord/task.go
internal/rootcoord/task.go
+3
-3
internal/util/paramtable/global_param.go
internal/util/paramtable/global_param.go
+34
-68
internal/util/paramtable/global_param_test.go
internal/util/paramtable/global_param_test.go
+12
-12
未找到文件。
internal/datacoord/util.go
浏览文件 @
63239075
...
...
@@ -73,7 +73,7 @@ func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetr
}
pts
,
_
:=
tsoutil
.
ParseTS
(
ts
)
ttpts
:=
pts
.
Add
(
-
time
.
Duration
(
Params
.
DataCoord
Cfg
.
RetentionDuration
)
*
time
.
Second
)
ttpts
:=
pts
.
Add
(
-
time
.
Duration
(
Params
.
Common
Cfg
.
RetentionDuration
)
*
time
.
Second
)
tt
:=
tsoutil
.
ComposeTS
(
ttpts
.
UnixNano
()
/
int64
(
time
.
Millisecond
),
0
)
return
&
timetravel
{
tt
},
nil
}
internal/datacoord/util_test.go
浏览文件 @
63239075
...
...
@@ -113,10 +113,10 @@ func TestVerifyResponse(t *testing.T) {
func
Test_getTimetravelReverseTime
(
t
*
testing
.
T
)
{
Params
.
Init
()
Params
.
DataCoord
Cfg
.
RetentionDuration
=
43200
// 5 days
Params
.
Common
Cfg
.
RetentionDuration
=
43200
// 5 days
tFixed
:=
time
.
Date
(
2021
,
11
,
15
,
0
,
0
,
0
,
0
,
time
.
Local
)
tBefore
:=
tFixed
.
Add
(
-
time
.
Duration
(
Params
.
DataCoord
Cfg
.
RetentionDuration
)
*
time
.
Second
)
tBefore
:=
tFixed
.
Add
(
-
time
.
Duration
(
Params
.
Common
Cfg
.
RetentionDuration
)
*
time
.
Second
)
type
args
struct
{
allocator
allocator
...
...
internal/distributed/rootcoord/service_test.go
浏览文件 @
63239075
...
...
@@ -87,8 +87,8 @@ func TestGrpcService(t *testing.T) {
rootcoord
.
Params
.
RootCoordCfg
.
StatisticsChannel
=
fmt
.
Sprintf
(
"stateChannel%d"
,
randVal
)
rootcoord
.
Params
.
RootCoordCfg
.
MaxPartitionNum
=
64
rootcoord
.
Params
.
RootCoord
Cfg
.
DefaultPartitionName
=
"_default"
rootcoord
.
Params
.
RootCoord
Cfg
.
DefaultIndexName
=
"_default"
rootcoord
.
Params
.
Common
Cfg
.
DefaultPartitionName
=
"_default"
rootcoord
.
Params
.
Common
Cfg
.
DefaultIndexName
=
"_default"
t
.
Logf
(
"service port = %d"
,
Params
.
Port
)
...
...
@@ -639,7 +639,7 @@ func TestGrpcService(t *testing.T) {
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
commonpb
.
ErrorCode_Success
,
rsp
.
Status
.
ErrorCode
)
assert
.
Equal
(
t
,
1
,
len
(
rsp
.
IndexDescriptions
))
assert
.
Equal
(
t
,
rootcoord
.
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
assert
.
Equal
(
t
,
rootcoord
.
Params
.
Common
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
})
t
.
Run
(
"flush segment"
,
func
(
t
*
testing
.
T
)
{
...
...
@@ -687,7 +687,7 @@ func TestGrpcService(t *testing.T) {
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
commonpb
.
ErrorCode_Success
,
rsp
.
Status
.
ErrorCode
)
assert
.
Equal
(
t
,
1
,
len
(
rsp
.
IndexDescriptions
))
assert
.
Equal
(
t
,
rootcoord
.
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
assert
.
Equal
(
t
,
rootcoord
.
Params
.
Common
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
})
...
...
@@ -702,9 +702,9 @@ func TestGrpcService(t *testing.T) {
DbName
:
dbName
,
CollectionName
:
collName
,
FieldName
:
fieldName
,
IndexName
:
rootcoord
.
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
IndexName
:
rootcoord
.
Params
.
Common
Cfg
.
DefaultIndexName
,
}
_
,
idx
,
err
:=
core
.
MetaTable
.
GetIndexByName
(
collName
,
rootcoord
.
Params
.
RootCoord
Cfg
.
DefaultIndexName
)
_
,
idx
,
err
:=
core
.
MetaTable
.
GetIndexByName
(
collName
,
rootcoord
.
Params
.
Common
Cfg
.
DefaultIndexName
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
len
(
idx
),
1
)
rsp
,
err
:=
cli
.
DropIndex
(
ctx
,
req
)
...
...
@@ -737,7 +737,7 @@ func TestGrpcService(t *testing.T) {
assert
.
Equal
(
t
,
1
,
len
(
collMeta
.
PartitionIDs
))
partName
,
err
:=
core
.
MetaTable
.
GetPartitionNameByID
(
collMeta
.
ID
,
collMeta
.
PartitionIDs
[
0
],
0
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
rootcoord
.
Params
.
RootCoord
Cfg
.
DefaultPartitionName
,
partName
)
assert
.
Equal
(
t
,
rootcoord
.
Params
.
Common
Cfg
.
DefaultPartitionName
,
partName
)
assert
.
Equal
(
t
,
2
,
len
(
collectionMetaCache
))
})
...
...
internal/proxy/impl.go
浏览文件 @
63239075
...
...
@@ -1962,7 +1962,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
}
if
len
(
it
.
PartitionName
)
<=
0
{
it
.
PartitionName
=
Params
.
Proxy
Cfg
.
DefaultPartitionName
it
.
PartitionName
=
Params
.
Common
Cfg
.
DefaultPartitionName
}
constructFailedResponse
:=
func
(
err
error
)
*
milvuspb
.
MutationResult
{
...
...
internal/proxy/metrics_info.go
浏览文件 @
63239075
...
...
@@ -73,8 +73,8 @@ func getSystemInfoMetrics(
ID
:
node
.
session
.
ServerID
,
},
SystemConfigurations
:
metricsinfo
.
ProxyConfiguration
{
DefaultPartitionName
:
Params
.
Proxy
Cfg
.
DefaultPartitionName
,
DefaultIndexName
:
Params
.
Proxy
Cfg
.
DefaultIndexName
,
DefaultPartitionName
:
Params
.
Common
Cfg
.
DefaultPartitionName
,
DefaultIndexName
:
Params
.
Common
Cfg
.
DefaultIndexName
,
},
},
}
...
...
internal/proxy/proxy_test.go
浏览文件 @
63239075
...
...
@@ -1260,7 +1260,7 @@ func TestProxy(t *testing.T) {
wg
.
Add
(
1
)
t
.
Run
(
"search_travel"
,
func
(
t
*
testing
.
T
)
{
defer
wg
.
Done
()
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Proxy
Cfg
.
RetentionDuration
-
100
)
*
time
.
Second
)
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Common
Cfg
.
RetentionDuration
-
100
)
*
time
.
Second
)
travelTs
:=
tsoutil
.
ComposeTSByTime
(
past
,
0
)
req
:=
constructSearchRequest
()
req
.
TravelTimestamp
=
travelTs
...
...
@@ -1273,7 +1273,7 @@ func TestProxy(t *testing.T) {
wg
.
Add
(
1
)
t
.
Run
(
"search_travel_succ"
,
func
(
t
*
testing
.
T
)
{
defer
wg
.
Done
()
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Proxy
Cfg
.
RetentionDuration
+
100
)
*
time
.
Second
)
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Common
Cfg
.
RetentionDuration
+
100
)
*
time
.
Second
)
travelTs
:=
tsoutil
.
ComposeTSByTime
(
past
,
0
)
req
:=
constructSearchRequest
()
req
.
TravelTimestamp
=
travelTs
...
...
@@ -1306,7 +1306,7 @@ func TestProxy(t *testing.T) {
wg
.
Add
(
1
)
t
.
Run
(
"query_travel"
,
func
(
t
*
testing
.
T
)
{
defer
wg
.
Done
()
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Proxy
Cfg
.
RetentionDuration
-
100
)
*
time
.
Second
)
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Common
Cfg
.
RetentionDuration
-
100
)
*
time
.
Second
)
travelTs
:=
tsoutil
.
ComposeTSByTime
(
past
,
0
)
queryReq
:=
&
milvuspb
.
QueryRequest
{
Base
:
nil
,
...
...
@@ -1326,7 +1326,7 @@ func TestProxy(t *testing.T) {
wg
.
Add
(
1
)
t
.
Run
(
"query_travel_succ"
,
func
(
t
*
testing
.
T
)
{
defer
wg
.
Done
()
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Proxy
Cfg
.
RetentionDuration
+
100
)
*
time
.
Second
)
past
:=
time
.
Now
()
.
Add
(
time
.
Duration
(
-
1
*
Params
.
Common
Cfg
.
RetentionDuration
+
100
)
*
time
.
Second
)
travelTs
:=
tsoutil
.
ComposeTSByTime
(
past
,
0
)
queryReq
:=
&
milvuspb
.
QueryRequest
{
Base
:
nil
,
...
...
internal/proxy/task.go
浏览文件 @
63239075
...
...
@@ -1003,7 +1003,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
return
err
}
}
else
{
partitionID
,
err
=
globalMetaCache
.
GetPartitionID
(
ctx
,
collectionName
,
Params
.
Proxy
Cfg
.
DefaultPartitionName
)
partitionID
,
err
=
globalMetaCache
.
GetPartitionID
(
ctx
,
collectionName
,
Params
.
Common
Cfg
.
DefaultPartitionName
)
if
err
!=
nil
{
return
err
}
...
...
@@ -1580,7 +1580,7 @@ func (st *searchTask) PreExecute(ctx context.Context) error {
travelTimestamp
=
st
.
BeginTs
()
}
else
{
durationSeconds
:=
tsoutil
.
CalculateDuration
(
st
.
BeginTs
(),
travelTimestamp
)
/
1000
if
durationSeconds
>
Params
.
Proxy
Cfg
.
RetentionDuration
{
if
durationSeconds
>
Params
.
Common
Cfg
.
RetentionDuration
{
duration
:=
time
.
Second
*
time
.
Duration
(
durationSeconds
)
return
fmt
.
Errorf
(
"only support to travel back to %s so far"
,
duration
.
String
())
}
...
...
@@ -2198,7 +2198,7 @@ func (qt *queryTask) PreExecute(ctx context.Context) error {
travelTimestamp
=
qt
.
BeginTs
()
}
else
{
durationSeconds
:=
tsoutil
.
CalculateDuration
(
qt
.
BeginTs
(),
travelTimestamp
)
/
1000
if
durationSeconds
>
Params
.
Proxy
Cfg
.
RetentionDuration
{
if
durationSeconds
>
Params
.
Common
Cfg
.
RetentionDuration
{
duration
:=
time
.
Second
*
time
.
Duration
(
durationSeconds
)
return
fmt
.
Errorf
(
"only support to travel back to %s so far"
,
duration
.
String
())
}
...
...
@@ -3513,7 +3513,7 @@ func (dit *describeIndexTask) PreExecute(ctx context.Context) error {
// only support default index name for now. @2021.02.18
if
dit
.
IndexName
==
""
{
dit
.
IndexName
=
Params
.
Proxy
Cfg
.
DefaultIndexName
dit
.
IndexName
=
Params
.
Common
Cfg
.
DefaultIndexName
}
return
nil
...
...
@@ -3595,7 +3595,7 @@ func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
}
if
dit
.
IndexName
==
""
{
dit
.
IndexName
=
Params
.
Proxy
Cfg
.
DefaultIndexName
dit
.
IndexName
=
Params
.
Common
Cfg
.
DefaultIndexName
}
return
nil
...
...
@@ -3699,7 +3699,7 @@ func (gibpt *getIndexBuildProgressTask) Execute(ctx context.Context) error {
}
if
gibpt
.
IndexName
==
""
{
gibpt
.
IndexName
=
Params
.
Proxy
Cfg
.
DefaultIndexName
gibpt
.
IndexName
=
Params
.
Common
Cfg
.
DefaultIndexName
}
describeIndexReq
:=
milvuspb
.
DescribeIndexRequest
{
...
...
@@ -3920,7 +3920,7 @@ func (gist *getIndexStateTask) Execute(ctx context.Context) error {
}
if
gist
.
IndexName
==
""
{
gist
.
IndexName
=
Params
.
Proxy
Cfg
.
DefaultIndexName
gist
.
IndexName
=
Params
.
Common
Cfg
.
DefaultIndexName
}
describeIndexReq
:=
milvuspb
.
DescribeIndexRequest
{
...
...
internal/rootcoord/meta_table.go
浏览文件 @
63239075
...
...
@@ -715,7 +715,7 @@ func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
mt
.
ddLock
.
Lock
()
defer
mt
.
ddLock
.
Unlock
()
if
partitionName
==
Params
.
RootCoord
Cfg
.
DefaultPartitionName
{
if
partitionName
==
Params
.
Common
Cfg
.
DefaultPartitionName
{
return
0
,
fmt
.
Errorf
(
"default partition cannot be deleted"
)
}
...
...
@@ -954,7 +954,7 @@ func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, fieldID in
if
fieldID
==
-
1
&&
idxName
==
""
{
// return default index
for
_
,
seg
:=
range
segIdxMap
{
info
,
ok
:=
mt
.
indexID2Meta
[
seg
.
IndexID
]
if
ok
&&
info
.
IndexName
==
Params
.
RootCoord
Cfg
.
DefaultIndexName
{
if
ok
&&
info
.
IndexName
==
Params
.
Common
Cfg
.
DefaultIndexName
{
return
seg
,
nil
}
}
...
...
internal/rootcoord/meta_table_test.go
浏览文件 @
63239075
...
...
@@ -276,7 +276,7 @@ func TestMetaTable(t *testing.T) {
},
CreateTime
:
0
,
PartitionIDs
:
[]
typeutil
.
UniqueID
{
partIDDefault
},
PartitionNames
:
[]
string
{
Params
.
RootCoord
Cfg
.
DefaultPartitionName
},
PartitionNames
:
[]
string
{
Params
.
Common
Cfg
.
DefaultPartitionName
},
PartitionCreatedTimestamps
:
[]
uint64
{
0
},
}
idxInfo
:=
[]
*
pb
.
IndexInfo
{
...
...
@@ -733,7 +733,7 @@ func TestMetaTable(t *testing.T) {
assert
.
Nil
(
t
,
err
)
ts
=
ftso
()
_
,
err
=
mt
.
DeletePartition
(
collInfo
.
ID
,
Params
.
RootCoord
Cfg
.
DefaultPartitionName
,
ts
,
""
)
_
,
err
=
mt
.
DeletePartition
(
collInfo
.
ID
,
Params
.
Common
Cfg
.
DefaultPartitionName
,
ts
,
""
)
assert
.
NotNil
(
t
,
err
)
assert
.
EqualError
(
t
,
err
,
"default partition cannot be deleted"
)
...
...
internal/rootcoord/root_coord_test.go
浏览文件 @
63239075
...
...
@@ -345,7 +345,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
ID
:
collID
,
Schema
:
&
schema
,
PartitionIDs
:
[]
typeutil
.
UniqueID
{
partID
},
PartitionNames
:
[]
string
{
Params
.
RootCoord
Cfg
.
DefaultPartitionName
},
PartitionNames
:
[]
string
{
Params
.
Common
Cfg
.
DefaultPartitionName
},
FieldIndexes
:
make
([]
*
etcdpb
.
FieldIndexInfo
,
0
,
16
),
VirtualChannelNames
:
vchanNames
,
PhysicalChannelNames
:
chanNames
,
...
...
@@ -370,7 +370,7 @@ func createCollectionInMeta(dbName, collName string, core *Core, shardsNum int32
Base
:
t
.
Base
,
DbName
:
t
.
DbName
,
CollectionName
:
t
.
CollectionName
,
PartitionName
:
Params
.
RootCoord
Cfg
.
DefaultPartitionName
,
PartitionName
:
Params
.
Common
Cfg
.
DefaultPartitionName
,
DbID
:
0
,
//TODO,not used
CollectionID
:
collID
,
PartitionID
:
partID
,
...
...
@@ -1071,7 +1071,7 @@ func TestRootCoord(t *testing.T) {
assert
.
Equal
(
t
,
1
,
len
(
collMeta
.
FieldIndexes
))
idxMeta
,
err
:=
core
.
MetaTable
.
GetIndexByID
(
collMeta
.
FieldIndexes
[
0
]
.
IndexID
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
idxMeta
.
IndexName
)
assert
.
Equal
(
t
,
Params
.
Common
Cfg
.
DefaultIndexName
,
idxMeta
.
IndexName
)
req
.
FieldName
=
"no field"
rsp
,
err
=
core
.
CreateIndex
(
ctx
,
req
)
...
...
@@ -1120,7 +1120,7 @@ func TestRootCoord(t *testing.T) {
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
commonpb
.
ErrorCode_Success
,
rsp
.
Status
.
ErrorCode
)
assert
.
Equal
(
t
,
1
,
len
(
rsp
.
IndexDescriptions
))
assert
.
Equal
(
t
,
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
assert
.
Equal
(
t
,
Params
.
Common
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
assert
.
Equal
(
t
,
"vector"
,
rsp
.
IndexDescriptions
[
0
]
.
FieldName
)
})
...
...
@@ -1182,7 +1182,7 @@ func TestRootCoord(t *testing.T) {
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
commonpb
.
ErrorCode_Success
,
rsp
.
Status
.
ErrorCode
)
assert
.
Equal
(
t
,
1
,
len
(
rsp
.
IndexDescriptions
))
assert
.
Equal
(
t
,
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
assert
.
Equal
(
t
,
Params
.
Common
Cfg
.
DefaultIndexName
,
rsp
.
IndexDescriptions
[
0
]
.
IndexName
)
})
wg
.
Add
(
1
)
...
...
@@ -1223,11 +1223,11 @@ func TestRootCoord(t *testing.T) {
idxMeta
,
err
:=
core
.
MetaTable
.
GetIndexByID
(
collMeta
.
FieldIndexes
[
1
]
.
IndexID
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
idxMeta
.
IndexName
)
assert
.
Equal
(
t
,
Params
.
Common
Cfg
.
DefaultIndexName
,
idxMeta
.
IndexName
)
idxMeta
,
err
=
core
.
MetaTable
.
GetIndexByID
(
collMeta
.
FieldIndexes
[
0
]
.
IndexID
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
Params
.
RootCoord
Cfg
.
DefaultIndexName
+
"_bak"
,
idxMeta
.
IndexName
)
assert
.
Equal
(
t
,
Params
.
Common
Cfg
.
DefaultIndexName
+
"_bak"
,
idxMeta
.
IndexName
)
})
...
...
@@ -1244,9 +1244,9 @@ func TestRootCoord(t *testing.T) {
DbName
:
""
,
CollectionName
:
collName
,
FieldName
:
"vector"
,
IndexName
:
Params
.
RootCoord
Cfg
.
DefaultIndexName
,
IndexName
:
Params
.
Common
Cfg
.
DefaultIndexName
,
}
_
,
idx
,
err
:=
core
.
MetaTable
.
GetIndexByName
(
collName
,
Params
.
RootCoord
Cfg
.
DefaultIndexName
)
_
,
idx
,
err
:=
core
.
MetaTable
.
GetIndexByName
(
collName
,
Params
.
Common
Cfg
.
DefaultIndexName
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
1
,
len
(
idx
))
...
...
@@ -1259,7 +1259,7 @@ func TestRootCoord(t *testing.T) {
assert
.
Equal
(
t
,
idx
[
0
]
.
IndexID
,
im
.
idxDropID
[
0
])
im
.
mutex
.
Unlock
()
_
,
idx
,
err
=
core
.
MetaTable
.
GetIndexByName
(
collName
,
Params
.
RootCoord
Cfg
.
DefaultIndexName
)
_
,
idx
,
err
=
core
.
MetaTable
.
GetIndexByName
(
collName
,
Params
.
Common
Cfg
.
DefaultIndexName
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
0
,
len
(
idx
))
})
...
...
@@ -1289,7 +1289,7 @@ func TestRootCoord(t *testing.T) {
assert
.
Equal
(
t
,
1
,
len
(
collMeta
.
PartitionIDs
))
partName
,
err
:=
core
.
MetaTable
.
GetPartitionNameByID
(
collMeta
.
ID
,
collMeta
.
PartitionIDs
[
0
],
0
)
assert
.
Nil
(
t
,
err
)
assert
.
Equal
(
t
,
Params
.
RootCoord
Cfg
.
DefaultPartitionName
,
partName
)
assert
.
Equal
(
t
,
Params
.
Common
Cfg
.
DefaultPartitionName
,
partName
)
msgs
:=
getNotTtMsg
(
ctx
,
1
,
dmlStream
.
Chan
())
assert
.
Equal
(
t
,
1
,
len
(
msgs
))
...
...
internal/rootcoord/task.go
浏览文件 @
63239075
...
...
@@ -156,7 +156,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
ID
:
collID
,
Schema
:
&
schema
,
PartitionIDs
:
[]
typeutil
.
UniqueID
{
partID
},
PartitionNames
:
[]
string
{
Params
.
RootCoord
Cfg
.
DefaultPartitionName
},
PartitionNames
:
[]
string
{
Params
.
Common
Cfg
.
DefaultPartitionName
},
FieldIndexes
:
make
([]
*
etcdpb
.
FieldIndexInfo
,
0
,
16
),
VirtualChannelNames
:
vchanNames
,
PhysicalChannelNames
:
chanNames
,
...
...
@@ -178,7 +178,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
Base
:
t
.
Req
.
Base
,
DbName
:
t
.
Req
.
DbName
,
CollectionName
:
t
.
Req
.
CollectionName
,
PartitionName
:
Params
.
RootCoord
Cfg
.
DefaultPartitionName
,
PartitionName
:
Params
.
Common
Cfg
.
DefaultPartitionName
,
DbID
:
0
,
//TODO,not used
CollectionID
:
collID
,
PartitionID
:
partID
,
...
...
@@ -821,7 +821,7 @@ func (t *CreateIndexReqTask) Execute(ctx context.Context) error {
if
t
.
Type
()
!=
commonpb
.
MsgType_CreateIndex
{
return
fmt
.
Errorf
(
"create index, msg type = %s"
,
commonpb
.
MsgType_name
[
int32
(
t
.
Type
())])
}
indexName
:=
Params
.
RootCoord
Cfg
.
DefaultIndexName
//TODO, get name from request
indexName
:=
Params
.
Common
Cfg
.
DefaultIndexName
//TODO, get name from request
indexID
,
_
,
err
:=
t
.
core
.
IDAllocator
(
1
)
log
.
Debug
(
"RootCoord CreateIndexReqTask"
,
zap
.
Any
(
"indexID"
,
indexID
),
zap
.
Error
(
err
))
if
err
!=
nil
{
...
...
internal/util/paramtable/global_param.go
浏览文件 @
63239075
...
...
@@ -56,7 +56,8 @@ type GlobalParamTable struct {
PulsarCfg
pulsarConfig
RocksmqCfg
rocksmqConfig
MinioCfg
minioConfig
//CommonCfg commonConfig
CommonCfg
commonConfig
//KnowhereCfg knowhereConfig
//MsgChannelCfg msgChannelConfig
...
...
@@ -84,7 +85,8 @@ func (p *GlobalParamTable) Init() {
p
.
PulsarCfg
.
init
(
&
p
.
BaseParams
)
p
.
RocksmqCfg
.
init
(
&
p
.
BaseParams
)
p
.
MinioCfg
.
init
(
&
p
.
BaseParams
)
//p.CommonCfg.init(&p.BaseParams)
p
.
CommonCfg
.
init
(
&
p
.
BaseParams
)
//p.KnowhereCfg.init(&p.BaseParams)
//p.MsgChannelCfg.init(&p.BaseParams)
...
...
@@ -238,28 +240,35 @@ func (p *minioConfig) initRootPath() {
///////////////////////////////////////////////////////////////////////////////
// --- common ---
//type commonConfig struct {
// BaseParams *BaseParamTable
//
// DefaultPartitionName string
// DefaultIndexName string
//}
//
//func (p *commonConfig) init(bp *BaseParamTable) {
// p.BaseParams = bp
// p.initDefaultPartitionName()
// p.initDefaultIndexName()
//}
//
//func (p *commonConfig) initDefaultPartitionName() {
// name := p.BaseParams.LoadWithDefault("common.defaultPartitionName", "_default")
// p.DefaultPartitionName = name
//}
//
//func (p *commonConfig) initDefaultIndexName() {
// name := p.BaseParams.LoadWithDefault("common.defaultIndexName", "_default_idx")
// p.DefaultIndexName = name
//}
type
commonConfig
struct
{
BaseParams
*
BaseParamTable
DefaultPartitionName
string
DefaultIndexName
string
RetentionDuration
int64
}
func
(
p
*
commonConfig
)
init
(
bp
*
BaseParamTable
)
{
p
.
BaseParams
=
bp
p
.
initDefaultPartitionName
()
p
.
initDefaultIndexName
()
p
.
initRetentionDuration
()
}
func
(
p
*
commonConfig
)
initDefaultPartitionName
()
{
name
:=
p
.
BaseParams
.
LoadWithDefault
(
"common.defaultPartitionName"
,
"_default"
)
p
.
DefaultPartitionName
=
name
}
func
(
p
*
commonConfig
)
initDefaultIndexName
()
{
name
:=
p
.
BaseParams
.
LoadWithDefault
(
"common.defaultIndexName"
,
"_default_idx"
)
p
.
DefaultIndexName
=
name
}
func
(
p
*
commonConfig
)
initRetentionDuration
()
{
p
.
RetentionDuration
=
p
.
BaseParams
.
ParseInt64WithDefault
(
"common.retentionDuration"
,
DefaultRetentionDuration
)
}
///////////////////////////////////////////////////////////////////////////////
// --- knowhere ---
...
...
@@ -440,8 +449,6 @@ type rootCoordConfig struct {
DmlChannelNum
int64
MaxPartitionNum
int64
DefaultPartitionName
string
DefaultIndexName
string
MinSegmentSizeToEnableIndex
int64
CreatedTime
time
.
Time
...
...
@@ -462,8 +469,6 @@ func (p *rootCoordConfig) init(bp *BaseParamTable) {
p
.
initDmlChannelNum
()
p
.
initMaxPartitionNum
()
p
.
initMinSegmentSizeToEnableIndex
()
p
.
initDefaultPartitionName
()
p
.
initDefaultIndexName
()
}
func
(
p
*
rootCoordConfig
)
initClusterMsgChannelPrefix
()
{
...
...
@@ -531,16 +536,6 @@ func (p *rootCoordConfig) initMinSegmentSizeToEnableIndex() {
p
.
MinSegmentSizeToEnableIndex
=
p
.
BaseParams
.
ParseInt64WithDefault
(
"rootCoord.minSegmentSizeToEnableIndex"
,
1024
)
}
func
(
p
*
rootCoordConfig
)
initDefaultPartitionName
()
{
name
:=
p
.
BaseParams
.
LoadWithDefault
(
"common.defaultPartitionName"
,
"_default"
)
p
.
DefaultPartitionName
=
name
}
func
(
p
*
rootCoordConfig
)
initDefaultIndexName
()
{
name
:=
p
.
BaseParams
.
LoadWithDefault
(
"common.defaultIndexName"
,
"_default_idx"
)
p
.
DefaultIndexName
=
name
}
///////////////////////////////////////////////////////////////////////////////
// --- proxy ---
type
proxyConfig
struct
{
...
...
@@ -560,8 +555,6 @@ type proxyConfig struct {
MaxFieldNum
int64
MaxShardNum
int32
MaxDimension
int64
DefaultPartitionName
string
DefaultIndexName
string
BufFlagExpireTime
time
.
Duration
BufFlagCleanupInterval
time
.
Duration
...
...
@@ -576,8 +569,6 @@ type proxyConfig struct {
MaxTaskNum
int64
RetentionDuration
int64
CreatedTime
time
.
Time
UpdatedTime
time
.
Time
}
...
...
@@ -596,13 +587,10 @@ func (p *proxyConfig) init(bp *BaseParamTable) {
p
.
initMaxFieldNum
()
p
.
initMaxShardNum
()
p
.
initMaxDimension
()
p
.
initDefaultPartitionName
()
p
.
initDefaultIndexName
()
p
.
initMaxTaskNum
()
p
.
initBufFlagExpireTime
()
p
.
initBufFlagCleanupInterval
()
p
.
initRetentionDuration
()
}
// Refresh is called after session init
...
...
@@ -687,16 +675,6 @@ func (p *proxyConfig) initMaxDimension() {
p
.
MaxDimension
=
maxDimension
}
func
(
p
*
proxyConfig
)
initDefaultPartitionName
()
{
name
:=
p
.
BaseParams
.
LoadWithDefault
(
"common.defaultPartitionName"
,
"_default"
)
p
.
DefaultPartitionName
=
name
}
func
(
p
*
proxyConfig
)
initDefaultIndexName
()
{
name
:=
p
.
BaseParams
.
LoadWithDefault
(
"common.defaultIndexName"
,
"_default_idx"
)
p
.
DefaultIndexName
=
name
}
func
(
p
*
proxyConfig
)
initMaxTaskNum
()
{
p
.
MaxTaskNum
=
p
.
BaseParams
.
ParseInt64WithDefault
(
"proxy.maxTaskNum"
,
1024
)
}
...
...
@@ -711,10 +689,6 @@ func (p *proxyConfig) initBufFlagCleanupInterval() {
p
.
BufFlagCleanupInterval
=
time
.
Duration
(
interval
)
*
time
.
Second
}
func
(
p
*
proxyConfig
)
initRetentionDuration
()
{
p
.
RetentionDuration
=
p
.
BaseParams
.
ParseInt64WithDefault
(
"common.retentionDuration"
,
DefaultRetentionDuration
)
}
///////////////////////////////////////////////////////////////////////////////
// --- querycoord ---
type
queryCoordConfig
struct
{
...
...
@@ -1123,11 +1097,9 @@ type dataCoordConfig struct {
UpdatedTime
time
.
Time
EnableCompaction
bool
EnableAutoCompaction
bool
EnableGarbageCollection
bool
RetentionDuration
int64
EnableAutoCompaction
bool
// Garbage Collection
GCInterval
time
.
Duration
GCMissingTolerance
time
.
Duration
...
...
@@ -1151,8 +1123,6 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p
.
initDataCoordSubscriptionName
()
p
.
initEnableCompaction
()
p
.
initRetentionDuration
()
p
.
initEnableAutoCompaction
()
p
.
initEnableGarbageCollection
()
...
...
@@ -1245,10 +1215,6 @@ func (p *dataCoordConfig) initGCDropTolerance() {
p
.
GCDropTolerance
=
time
.
Duration
(
p
.
BaseParams
.
ParseInt64WithDefault
(
"dataCoord.gc.dropTolerance"
,
24
*
60
*
60
))
*
time
.
Second
}
func
(
p
*
dataCoordConfig
)
initRetentionDuration
()
{
p
.
RetentionDuration
=
p
.
BaseParams
.
ParseInt64WithDefault
(
"common.retentionDuration"
,
DefaultRetentionDuration
)
}
func
(
p
*
dataCoordConfig
)
initEnableAutoCompaction
()
{
p
.
EnableAutoCompaction
=
p
.
BaseParams
.
ParseBool
(
"dataCoord.compaction.enableAutoCompaction"
,
false
)
}
...
...
internal/util/paramtable/global_param_test.go
浏览文件 @
63239075
...
...
@@ -67,6 +67,18 @@ func TestGlobalParamTable(t *testing.T) {
t
.
Logf
(
"Minio rootpath = %s"
,
Params
.
RootPath
)
})
t
.
Run
(
"test commonConfig"
,
func
(
t
*
testing
.
T
)
{
Params
:=
GlobalParams
.
CommonCfg
assert
.
NotEqual
(
t
,
Params
.
DefaultPartitionName
,
""
)
t
.
Logf
(
"default partition name = %s"
,
Params
.
DefaultPartitionName
)
assert
.
NotEqual
(
t
,
Params
.
DefaultIndexName
,
""
)
t
.
Logf
(
"default index name = %s"
,
Params
.
DefaultIndexName
)
assert
.
Equal
(
t
,
Params
.
RetentionDuration
,
int64
(
DefaultRetentionDuration
))
})
t
.
Run
(
"test rootCoordConfig"
,
func
(
t
*
testing
.
T
)
{
Params
:=
GlobalParams
.
RootCoordCfg
...
...
@@ -91,12 +103,6 @@ func TestGlobalParamTable(t *testing.T) {
assert
.
NotEqual
(
t
,
Params
.
MinSegmentSizeToEnableIndex
,
0
)
t
.
Logf
(
"master MinSegmentSizeToEnableIndex = %d"
,
Params
.
MinSegmentSizeToEnableIndex
)
assert
.
NotEqual
(
t
,
Params
.
DefaultPartitionName
,
""
)
t
.
Logf
(
"default partition name = %s"
,
Params
.
DefaultPartitionName
)
assert
.
NotEqual
(
t
,
Params
.
DefaultIndexName
,
""
)
t
.
Logf
(
"default index name = %s"
,
Params
.
DefaultIndexName
)
Params
.
CreatedTime
=
time
.
Now
()
Params
.
UpdatedTime
=
time
.
Now
()
t
.
Logf
(
"created time: %v"
,
Params
.
CreatedTime
)
...
...
@@ -124,12 +130,6 @@ func TestGlobalParamTable(t *testing.T) {
t
.
Logf
(
"MaxDimension: %d"
,
Params
.
MaxDimension
)
t
.
Logf
(
"DefaultPartitionName: %s"
,
Params
.
DefaultPartitionName
)
t
.
Logf
(
"DefaultIndexName: %s"
,
Params
.
DefaultIndexName
)
//t.Logf("RoleName: %s", typeutil.ProxyRole)
t
.
Logf
(
"MaxTaskNum: %d"
,
Params
.
MaxTaskNum
)
})
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录