component_param.go 25.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package paramtable

import (
	"math"
16 17
	"os"
	"path"
18
	"strconv"
19
	"strings"
20
	"sync"
21
	"time"
22

23
	"go.uber.org/zap"
G
godchen 已提交
24 25

	"github.com/milvus-io/milvus/internal/log"
26 27 28
)

const (
29 30
	// DefaultRetentionDuration defines the default duration for retention which is 5 days in seconds.
	DefaultRetentionDuration = 3600 * 24 * 5
31 32
)

33
// ComponentParam is used to quickly and easily access all components' configurations.
34 35
type ComponentParam struct {
	ServiceParam
36
	once sync.Once
37

38
	CommonCfg commonConfig
39 40 41 42 43 44 45 46 47 48 49 50

	RootCoordCfg  rootCoordConfig
	ProxyCfg      proxyConfig
	QueryCoordCfg queryCoordConfig
	QueryNodeCfg  queryNodeConfig
	DataCoordCfg  dataCoordConfig
	DataNodeCfg   dataNodeConfig
	IndexCoordCfg indexCoordConfig
	IndexNodeCfg  indexNodeConfig
}

// InitOnce initialize once
51
func (p *ComponentParam) InitOnce() {
52 53 54 55 56
	p.once.Do(func() {
		p.Init()
	})
}

57
// Init initialize the global param table
58 59
func (p *ComponentParam) Init() {
	p.ServiceParam.Init()
60

61
	p.CommonCfg.init(&p.BaseTable)
62

63 64 65 66 67 68 69 70
	p.RootCoordCfg.init(&p.BaseTable)
	p.ProxyCfg.init(&p.BaseTable)
	p.QueryCoordCfg.init(&p.BaseTable)
	p.QueryNodeCfg.init(&p.BaseTable)
	p.DataCoordCfg.init(&p.BaseTable)
	p.DataNodeCfg.init(&p.BaseTable)
	p.IndexCoordCfg.init(&p.BaseTable)
	p.IndexNodeCfg.init(&p.BaseTable)
71 72
}

73
// SetLogConfig set log config with given role
74
func (p *ComponentParam) SetLogConfig(role string) {
75 76
	p.BaseTable.RoleName = role
	p.BaseTable.SetLogConfig()
77 78
}

79 80 81 82 83 84 85 86
func (p *ComponentParam) RocksmqEnable() bool {
	return p.RocksmqCfg.Path != ""
}

func (p *ComponentParam) PulsarEnable() bool {
	return p.PulsarCfg.Address != ""
}

87
///////////////////////////////////////////////////////////////////////////////
88
// --- common ---
89
type commonConfig struct {
90
	Base *BaseTable
91

92
	ClusterPrefix string
93

94 95
	ProxySubName string

96 97 98 99 100
	RootCoordTimeTick   string
	RootCoordStatistics string
	RootCoordDml        string
	RootCoordDelta      string
	RootCoordSubName    string
101

102 103 104 105
	QueryCoordSearch       string
	QueryCoordSearchResult string
	QueryCoordTimeTick     string
	QueryNodeStats         string
106
	QueryNodeSubName       string
107

108 109 110 111
	DataCoordStatistic   string
	DataCoordTimeTick    string
	DataCoordSegmentInfo string
	DataCoordSubName     string
112
	DataNodeSubName      string
113 114 115 116 117 118

	DefaultPartitionName string
	DefaultIndexName     string
	RetentionDuration    int64

	SimdType string
119 120
}

121
func (p *commonConfig) init(base *BaseTable) {
122
	p.Base = base
123

124 125
	// must init cluster prefix first
	p.initClusterPrefix()
126

127 128
	p.initProxySubName()

129 130 131 132 133 134 135 136 137 138
	p.initRootCoordTimeTick()
	p.initRootCoordStatistics()
	p.initRootCoordDml()
	p.initRootCoordDelta()
	p.initRootCoordSubName()

	p.initQueryCoordSearch()
	p.initQueryCoordSearchResult()
	p.initQueryCoordTimeTick()
	p.initQueryNodeStats()
139
	p.initQueryNodeSubName()
140 141 142 143 144

	p.initDataCoordStatistic()
	p.initDataCoordTimeTick()
	p.initDataCoordSegmentInfo()
	p.initDataCoordSubName()
145
	p.initDataNodeSubName()
146 147 148 149 150 151

	p.initDefaultPartitionName()
	p.initDefaultIndexName()
	p.initRetentionDuration()

	p.initSimdType()
152 153
}

154 155 156 157 158 159
func (p *commonConfig) initClusterPrefix() {
	keys := []string{
		"common.chanNamePrefix.cluster",
		"msgChannel.chanNamePrefix.cluster",
	}
	str, err := p.Base.Load2(keys)
160 161 162
	if err != nil {
		panic(err)
	}
163
	p.ClusterPrefix = str
164 165
}

166 167
func (p *commonConfig) initChanNamePrefix(keys []string) string {
	value, err := p.Base.Load2(keys)
168 169 170
	if err != nil {
		panic(err)
	}
171 172
	s := []string{p.ClusterPrefix, value}
	return strings.Join(s, "-")
173 174
}

175
// --- proxy ---
176 177 178 179 180 181
func (p *commonConfig) initProxySubName() {
	keys := []string{
		"common.subNamePrefix.proxySubNamePrefix",
		"msgChannel.subNamePrefix.proxySubNamePrefix",
	}
	p.ProxySubName = p.initChanNamePrefix(keys)
182 183
}

184
// --- rootcoord ---
185 186 187 188 189 190
func (p *commonConfig) initRootCoordTimeTick() {
	keys := []string{
		"common.chanNamePrefix.rootCoordTimeTick",
		"msgChannel.chanNamePrefix.rootCoordTimeTick",
	}
	p.RootCoordTimeTick = p.initChanNamePrefix(keys)
191 192
}

193 194 195 196 197 198
func (p *commonConfig) initRootCoordStatistics() {
	keys := []string{
		"common.chanNamePrefix.rootCoordStatistics",
		"msgChannel.chanNamePrefix.rootCoordStatistics",
	}
	p.RootCoordStatistics = p.initChanNamePrefix(keys)
199 200
}

201 202 203 204 205 206
func (p *commonConfig) initRootCoordDml() {
	keys := []string{
		"common.chanNamePrefix.rootCoordDml",
		"msgChannel.chanNamePrefix.rootCoordDml",
	}
	p.RootCoordDml = p.initChanNamePrefix(keys)
207 208
}

209 210 211 212 213 214
func (p *commonConfig) initRootCoordDelta() {
	keys := []string{
		"common.chanNamePrefix.rootCoordDelta",
		"msgChannel.chanNamePrefix.rootCoordDelta",
	}
	p.RootCoordDelta = p.initChanNamePrefix(keys)
215 216
}

217 218 219 220 221 222
func (p *commonConfig) initRootCoordSubName() {
	keys := []string{
		"common.subNamePrefix.rootCoordSubNamePrefix",
		"msgChannel.subNamePrefix.rootCoordSubNamePrefix",
	}
	p.RootCoordSubName = p.initChanNamePrefix(keys)
223 224 225
}

// --- querycoord ---
226 227 228 229 230 231
func (p *commonConfig) initQueryCoordSearch() {
	keys := []string{
		"common.chanNamePrefix.search",
		"msgChannel.chanNamePrefix.search",
	}
	p.QueryCoordSearch = p.initChanNamePrefix(keys)
232 233
}

234 235 236 237 238 239
func (p *commonConfig) initQueryCoordSearchResult() {
	keys := []string{
		"common.chanNamePrefix.searchResult",
		"msgChannel.chanNamePrefix.searchResult",
	}
	p.QueryCoordSearchResult = p.initChanNamePrefix(keys)
240 241
}

242 243 244 245 246 247
func (p *commonConfig) initQueryCoordTimeTick() {
	keys := []string{
		"common.chanNamePrefix.queryTimeTick",
		"msgChannel.chanNamePrefix.queryTimeTick",
	}
	p.QueryCoordTimeTick = p.initChanNamePrefix(keys)
248 249 250
}

// --- querynode ---
251 252 253 254 255 256
func (p *commonConfig) initQueryNodeStats() {
	keys := []string{
		"common.chanNamePrefix.queryNodeStats",
		"msgChannel.chanNamePrefix.queryNodeStats",
	}
	p.QueryNodeStats = p.initChanNamePrefix(keys)
257 258
}

259 260 261 262 263 264
func (p *commonConfig) initQueryNodeSubName() {
	keys := []string{
		"common.subNamePrefix.queryNodeSubNamePrefix",
		"msgChannel.subNamePrefix.queryNodeSubNamePrefix",
	}
	p.QueryNodeSubName = p.initChanNamePrefix(keys)
265 266
}

267
// --- datacoord ---
268 269 270 271 272 273
func (p *commonConfig) initDataCoordStatistic() {
	keys := []string{
		"common.chanNamePrefix.dataCoordStatistic",
		"msgChannel.chanNamePrefix.dataCoordStatistic",
	}
	p.DataCoordStatistic = p.initChanNamePrefix(keys)
274 275
}

276 277 278 279 280 281
func (p *commonConfig) initDataCoordTimeTick() {
	keys := []string{
		"common.chanNamePrefix.dataCoordTimeTick",
		"msgChannel.chanNamePrefix.dataCoordTimeTick",
	}
	p.DataCoordTimeTick = p.initChanNamePrefix(keys)
282 283
}

284 285 286 287 288 289
func (p *commonConfig) initDataCoordSegmentInfo() {
	keys := []string{
		"common.chanNamePrefix.dataCoordSegmentInfo",
		"msgChannel.chanNamePrefix.dataCoordSegmentInfo",
	}
	p.DataCoordSegmentInfo = p.initChanNamePrefix(keys)
290 291
}

292 293 294 295 296 297
func (p *commonConfig) initDataCoordSubName() {
	keys := []string{
		"common.subNamePrefix.dataCoordSubNamePrefix",
		"msgChannel.subNamePrefix.dataCoordSubNamePrefix",
	}
	p.DataCoordSubName = p.initChanNamePrefix(keys)
298 299
}

300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
func (p *commonConfig) initDataNodeSubName() {
	keys := []string{
		"common.subNamePrefix.dataNodeSubNamePrefix",
		"msgChannel.subNamePrefix.dataNodeSubNamePrefix",
	}
	p.DataNodeSubName = p.initChanNamePrefix(keys)
}

func (p *commonConfig) initDefaultPartitionName() {
	p.DefaultPartitionName = p.Base.LoadWithDefault("common.defaultPartitionName", "_default")
}

func (p *commonConfig) initDefaultIndexName() {
	p.DefaultIndexName = p.Base.LoadWithDefault("common.defaultIndexName", "_default_idx")
}

func (p *commonConfig) initRetentionDuration() {
	p.RetentionDuration = p.Base.ParseInt64WithDefault("common.retentionDuration", DefaultRetentionDuration)
}

func (p *commonConfig) initSimdType() {
	keys := []string{
		"common.simdType",
		"knowhere.simdType",
	}
	p.SimdType = p.Base.LoadWithDefault2(keys, "auto")
326 327
}

328 329 330
///////////////////////////////////////////////////////////////////////////////
// --- rootcoord ---
type rootCoordConfig struct {
331
	Base *BaseTable
332 333 334 335 336 337 338

	Address string
	Port    int

	DmlChannelNum               int64
	MaxPartitionNum             int64
	MinSegmentSizeToEnableIndex int64
339 340 341 342
	ImportTaskExpiration        float64
	ImportTaskRetention         float64
	ImportIndexCheckInterval    float64
	ImportIndexWaitLimit        float64
343

344 345 346
	// --- ETCD Path ---
	ImportTaskSubPath string

347 348 349 350
	CreatedTime time.Time
	UpdatedTime time.Time
}

351 352 353 354 355
func (p *rootCoordConfig) init(base *BaseTable) {
	p.Base = base
	p.DmlChannelNum = p.Base.ParseInt64WithDefault("rootCoord.dmlChannelNum", 256)
	p.MaxPartitionNum = p.Base.ParseInt64WithDefault("rootCoord.maxPartitionNum", 4096)
	p.MinSegmentSizeToEnableIndex = p.Base.ParseInt64WithDefault("rootCoord.minSegmentSizeToEnableIndex", 1024)
356 357 358 359
	p.ImportTaskExpiration = p.Base.ParseFloatWithDefault("rootCoord.importTaskExpiration", 3600)
	p.ImportTaskRetention = p.Base.ParseFloatWithDefault("rootCoord.importTaskRetention", 3600*24)
	p.ImportIndexCheckInterval = p.Base.ParseFloatWithDefault("rootCoord.importIndexCheckInterval", 60*5)
	p.ImportIndexWaitLimit = p.Base.ParseFloatWithDefault("rootCoord.importIndexWaitLimit", 60*20)
360
	p.ImportTaskSubPath = "importtask"
361 362 363 364 365
}

///////////////////////////////////////////////////////////////////////////////
// --- proxy ---
type proxyConfig struct {
366
	Base *BaseTable
367 368

	// NetworkPort & IP are not used
369 370
	NetworkPort    int
	IP             string
371 372 373 374 375 376 377 378 379 380 381 382 383
	NetworkAddress string

	Alias string

	ProxyID                  UniqueID
	TimeTickInterval         time.Duration
	MsgStreamTimeTickBufSize int64
	MaxNameLength            int64
	MaxFieldNum              int64
	MaxShardNum              int32
	MaxDimension             int64
	BufFlagExpireTime        time.Duration
	BufFlagCleanupInterval   time.Duration
384
	GinLogging               bool
385

386
	// required from QueryCoord
387 388 389 390 391 392 393 394 395
	SearchResultChannelNames   []string
	RetrieveResultChannelNames []string

	MaxTaskNum int64

	CreatedTime time.Time
	UpdatedTime time.Time
}

396 397
func (p *proxyConfig) init(base *BaseTable) {
	p.Base = base
398 399 400 401 402 403 404 405 406 407 408 409

	p.initTimeTickInterval()

	p.initMsgStreamTimeTickBufSize()
	p.initMaxNameLength()
	p.initMaxFieldNum()
	p.initMaxShardNum()
	p.initMaxDimension()

	p.initMaxTaskNum()
	p.initBufFlagExpireTime()
	p.initBufFlagCleanupInterval()
410
	p.initGinLogging()
411 412 413 414 415 416 417 418
}

// InitAlias initialize Alias member.
func (p *proxyConfig) InitAlias(alias string) {
	p.Alias = alias
}

func (p *proxyConfig) initTimeTickInterval() {
419
	interval := p.Base.ParseIntWithDefault("proxy.timeTickInterval", 200)
420 421 422 423
	p.TimeTickInterval = time.Duration(interval) * time.Millisecond
}

func (p *proxyConfig) initMsgStreamTimeTickBufSize() {
424
	p.MsgStreamTimeTickBufSize = p.Base.ParseInt64WithDefault("proxy.msgStream.timeTick.bufSize", 512)
425 426 427
}

func (p *proxyConfig) initMaxNameLength() {
428
	str := p.Base.LoadWithDefault("proxy.maxNameLength", "255")
429 430 431 432 433 434 435 436
	maxNameLength, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
	p.MaxNameLength = maxNameLength
}

func (p *proxyConfig) initMaxShardNum() {
437
	str := p.Base.LoadWithDefault("proxy.maxShardNum", "256")
438 439 440 441 442 443 444 445
	maxShardNum, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
	p.MaxShardNum = int32(maxShardNum)
}

func (p *proxyConfig) initMaxFieldNum() {
446
	str := p.Base.LoadWithDefault("proxy.maxFieldNum", "64")
447 448 449 450 451 452 453 454
	maxFieldNum, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
	p.MaxFieldNum = maxFieldNum
}

func (p *proxyConfig) initMaxDimension() {
455
	str := p.Base.LoadWithDefault("proxy.maxDimension", "32768")
456 457 458 459 460 461 462 463
	maxDimension, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
	p.MaxDimension = maxDimension
}

func (p *proxyConfig) initMaxTaskNum() {
464
	p.MaxTaskNum = p.Base.ParseInt64WithDefault("proxy.maxTaskNum", 1024)
465 466 467
}

func (p *proxyConfig) initBufFlagExpireTime() {
468
	expireTime := p.Base.ParseInt64WithDefault("proxy.bufFlagExpireTime", 3600)
469 470 471 472
	p.BufFlagExpireTime = time.Duration(expireTime) * time.Second
}

func (p *proxyConfig) initBufFlagCleanupInterval() {
473
	interval := p.Base.ParseInt64WithDefault("proxy.bufFlagCleanupInterval", 600)
474 475 476
	p.BufFlagCleanupInterval = time.Duration(interval) * time.Second
}

477 478 479 480 481
func (p *proxyConfig) initGinLogging() {
	// Gin logging is on by default.
	p.GinLogging = p.Base.ParseBool("proxy.ginLogging", true)
}

482 483 484
///////////////////////////////////////////////////////////////////////////////
// --- querycoord ---
type queryCoordConfig struct {
485
	Base *BaseTable
486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505

	NodeID uint64

	Address      string
	Port         int
	QueryCoordID UniqueID

	CreatedTime time.Time
	UpdatedTime time.Time

	//---- Handoff ---
	AutoHandoff bool

	//---- Balance ---
	AutoBalance                         bool
	OverloadedMemoryThresholdPercentage float64
	BalanceIntervalSeconds              int64
	MemoryUsageMaxDifferencePercentage  float64
}

506 507
func (p *queryCoordConfig) init(base *BaseTable) {
	p.Base = base
508 509 510 511 512 513 514 515 516 517 518 519

	//---- Handoff ---
	p.initAutoHandoff()

	//---- Balance ---
	p.initAutoBalance()
	p.initOverloadedMemoryThresholdPercentage()
	p.initBalanceIntervalSeconds()
	p.initMemoryUsageMaxDifferencePercentage()
}

func (p *queryCoordConfig) initAutoHandoff() {
520
	handoff, err := p.Base.Load("queryCoord.autoHandoff")
521 522 523 524 525 526 527 528 529 530
	if err != nil {
		panic(err)
	}
	p.AutoHandoff, err = strconv.ParseBool(handoff)
	if err != nil {
		panic(err)
	}
}

func (p *queryCoordConfig) initAutoBalance() {
531
	balanceStr := p.Base.LoadWithDefault("queryCoord.autoBalance", "false")
532 533 534 535 536 537 538 539
	autoBalance, err := strconv.ParseBool(balanceStr)
	if err != nil {
		panic(err)
	}
	p.AutoBalance = autoBalance
}

func (p *queryCoordConfig) initOverloadedMemoryThresholdPercentage() {
540
	overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90")
541 542 543 544 545 546 547 548
	thresholdPercentage, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64)
	if err != nil {
		panic(err)
	}
	p.OverloadedMemoryThresholdPercentage = float64(thresholdPercentage) / 100
}

func (p *queryCoordConfig) initBalanceIntervalSeconds() {
549
	balanceInterval := p.Base.LoadWithDefault("queryCoord.balanceIntervalSeconds", "60")
550 551 552 553 554 555 556 557
	interval, err := strconv.ParseInt(balanceInterval, 10, 64)
	if err != nil {
		panic(err)
	}
	p.BalanceIntervalSeconds = interval
}

func (p *queryCoordConfig) initMemoryUsageMaxDifferencePercentage() {
558
	maxDiff := p.Base.LoadWithDefault("queryCoord.memoryUsageMaxDifferencePercentage", "30")
559 560 561 562 563 564 565 566 567 568
	diffPercentage, err := strconv.ParseInt(maxDiff, 10, 64)
	if err != nil {
		panic(err)
	}
	p.MemoryUsageMaxDifferencePercentage = float64(diffPercentage) / 100
}

///////////////////////////////////////////////////////////////////////////////
// --- querynode ---
type queryNodeConfig struct {
569
	Base *BaseTable
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608

	Alias         string
	QueryNodeIP   string
	QueryNodePort int64
	QueryNodeID   UniqueID
	// TODO: remove cacheSize
	CacheSize int64 // deprecated

	FlowGraphMaxQueueLength int32
	FlowGraphMaxParallelism int32

	// search
	SearchChannelNames         []string
	SearchResultChannelNames   []string
	SearchReceiveBufSize       int64
	SearchPulsarBufSize        int64
	SearchResultReceiveBufSize int64

	// Retrieve
	RetrieveChannelNames         []string
	RetrieveResultChannelNames   []string
	RetrieveReceiveBufSize       int64
	RetrievePulsarBufSize        int64
	RetrieveResultReceiveBufSize int64

	// stats
	StatsPublishInterval int

	GracefulTime int64
	SliceIndex   int

	// segcore
	ChunkRows int64

	CreatedTime time.Time
	UpdatedTime time.Time

	// memory limit
	OverloadedMemoryThresholdPercentage float64
G
godchen 已提交
609 610

	// cache limit
G
godchen 已提交
611 612
	CacheEnabled     bool
	CacheMemoryLimit int64
613 614
}

615 616
func (p *queryNodeConfig) init(base *BaseTable) {
	p.Base = base
617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632

	p.initCacheSize()
	p.initGracefulTime()

	p.initFlowGraphMaxQueueLength()
	p.initFlowGraphMaxParallelism()

	p.initSearchReceiveBufSize()
	p.initSearchPulsarBufSize()
	p.initSearchResultReceiveBufSize()

	p.initStatsPublishInterval()

	p.initSegcoreChunkRows()

	p.initOverloadedMemoryThresholdPercentage()
G
godchen 已提交
633

G
godchen 已提交
634 635
	p.initCacheMemoryLimit()
	p.initCacheEnabled()
636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651
}

// InitAlias initializes an alias for the QueryNode role.
func (p *queryNodeConfig) InitAlias(alias string) {
	p.Alias = alias
}

func (p *queryNodeConfig) initCacheSize() {
	defer log.Debug("init cacheSize", zap.Any("cacheSize (GB)", p.CacheSize))

	const defaultCacheSize = 32 // GB
	p.CacheSize = defaultCacheSize

	var err error
	cacheSize := os.Getenv("CACHE_SIZE")
	if cacheSize == "" {
652
		cacheSize, err = p.Base.Load("queryNode.cacheSize")
653 654 655 656 657 658 659 660 661 662 663 664 665 666
		if err != nil {
			return
		}
	}
	value, err := strconv.ParseInt(cacheSize, 10, 64)
	if err != nil {
		return
	}
	p.CacheSize = value
}

// advanced params
// stats
func (p *queryNodeConfig) initStatsPublishInterval() {
667
	p.StatsPublishInterval = p.Base.ParseIntWithDefault("queryNode.stats.publishInterval", 1000)
668 669 670 671
}

// dataSync:
func (p *queryNodeConfig) initFlowGraphMaxQueueLength() {
672
	p.FlowGraphMaxQueueLength = p.Base.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxQueueLength", 1024)
673 674 675
}

func (p *queryNodeConfig) initFlowGraphMaxParallelism() {
676
	p.FlowGraphMaxParallelism = p.Base.ParseInt32WithDefault("queryNode.dataSync.flowGraph.maxParallelism", 1024)
677 678 679 680
}

// msgStream
func (p *queryNodeConfig) initSearchReceiveBufSize() {
681
	p.SearchReceiveBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.search.recvBufSize", 512)
682 683 684
}

func (p *queryNodeConfig) initSearchPulsarBufSize() {
685
	p.SearchPulsarBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.search.pulsarBufSize", 512)
686 687 688
}

func (p *queryNodeConfig) initSearchResultReceiveBufSize() {
689
	p.SearchResultReceiveBufSize = p.Base.ParseInt64WithDefault("queryNode.msgStream.searchResult.recvBufSize", 64)
690 691 692
}

func (p *queryNodeConfig) initGracefulTime() {
693
	p.GracefulTime = p.Base.ParseInt64("queryNode.gracefulTime")
694 695 696 697
	log.Debug("query node init gracefulTime", zap.Any("gracefulTime", p.GracefulTime))
}

func (p *queryNodeConfig) initSegcoreChunkRows() {
698
	p.ChunkRows = p.Base.ParseInt64WithDefault("queryNode.segcore.chunkRows", 32768)
699 700 701
}

func (p *queryNodeConfig) initOverloadedMemoryThresholdPercentage() {
702
	overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryCoord.overloadedMemoryThresholdPercentage", "90")
703 704 705 706 707 708 709
	thresholdPercentage, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64)
	if err != nil {
		panic(err)
	}
	p.OverloadedMemoryThresholdPercentage = float64(thresholdPercentage) / 100
}

G
godchen 已提交
710 711 712 713 714 715 716 717 718 719 720 721
func (p *queryNodeConfig) initCacheMemoryLimit() {
	overloadedMemoryThresholdPercentage := p.Base.LoadWithDefault("queryNode.cache.memoryLimit", "2147483648")
	cacheMemoryLimit, err := strconv.ParseInt(overloadedMemoryThresholdPercentage, 10, 64)
	if err != nil {
		panic(err)
	}
	p.CacheMemoryLimit = cacheMemoryLimit
}
func (p *queryNodeConfig) initCacheEnabled() {
	var err error
	cacheEnabled := p.Base.LoadWithDefault("queryNode.cache.enabled", "true")
	p.CacheEnabled, err = strconv.ParseBool(cacheEnabled)
G
godchen 已提交
722 723 724 725 726
	if err != nil {
		panic(err)
	}
}

727 728 729
///////////////////////////////////////////////////////////////////////////////
// --- datacoord ---
type dataCoordConfig struct {
730
	Base *BaseTable
731 732 733 734 735 736 737 738

	NodeID int64

	IP      string
	Port    int
	Address string

	// --- ETCD ---
X
XuanYang-cn 已提交
739
	ChannelWatchSubPath string
740 741 742 743 744 745 746 747 748 749

	// --- SEGMENTS ---
	SegmentMaxSize          float64
	SegmentSealProportion   float64
	SegAssignmentExpiration int64

	CreatedTime time.Time
	UpdatedTime time.Time

	EnableCompaction        bool
750
	EnableAutoCompaction    bool
751 752
	EnableGarbageCollection bool

753 754 755
	RetentionDuration          int64
	CompactionEntityExpiration int64

756 757 758 759 760 761
	// Garbage Collection
	GCInterval         time.Duration
	GCMissingTolerance time.Duration
	GCDropTolerance    time.Duration
}

762 763
func (p *dataCoordConfig) init(base *BaseTable) {
	p.Base = base
764 765 766 767 768 769 770 771 772

	p.initChannelWatchPrefix()

	p.initSegmentMaxSize()
	p.initSegmentSealProportion()
	p.initSegAssignmentExpiration()

	p.initEnableCompaction()
	p.initEnableAutoCompaction()
773
	p.initCompactionEntityExpiration()
774 775 776 777 778 779 780 781

	p.initEnableGarbageCollection()
	p.initGCInterval()
	p.initGCMissingTolerance()
	p.initGCDropTolerance()
}

func (p *dataCoordConfig) initSegmentMaxSize() {
782
	p.SegmentMaxSize = p.Base.ParseFloatWithDefault("dataCoord.segment.maxSize", 512.0)
783 784 785
}

func (p *dataCoordConfig) initSegmentSealProportion() {
786
	p.SegmentSealProportion = p.Base.ParseFloatWithDefault("dataCoord.segment.sealProportion", 0.75)
787 788 789
}

func (p *dataCoordConfig) initSegAssignmentExpiration() {
790
	p.SegAssignmentExpiration = p.Base.ParseInt64WithDefault("dataCoord.segment.assignmentExpiration", 2000)
791 792 793 794 795 796 797 798 799
}

func (p *dataCoordConfig) initChannelWatchPrefix() {
	// WARN: this value should not be put to milvus.yaml. It's a default value for channel watch path.
	// This will be removed after we reconstruct our config module.
	p.ChannelWatchSubPath = "channelwatch"
}

func (p *dataCoordConfig) initEnableCompaction() {
800
	p.EnableCompaction = p.Base.ParseBool("dataCoord.enableCompaction", false)
801 802 803 804
}

// -- GC --
func (p *dataCoordConfig) initEnableGarbageCollection() {
805
	p.EnableGarbageCollection = p.Base.ParseBool("dataCoord.enableGarbageCollection", false)
806 807 808
}

func (p *dataCoordConfig) initGCInterval() {
809
	p.GCInterval = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.interval", 60*60)) * time.Second
810 811 812
}

func (p *dataCoordConfig) initGCMissingTolerance() {
813
	p.GCMissingTolerance = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.missingTolerance", 24*60*60)) * time.Second
814 815 816
}

func (p *dataCoordConfig) initGCDropTolerance() {
817
	p.GCDropTolerance = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.dropTolerance", 24*60*60)) * time.Second
818 819 820
}

func (p *dataCoordConfig) initEnableAutoCompaction() {
821
	p.EnableAutoCompaction = p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false)
822 823
}

824
func (p *dataCoordConfig) initCompactionEntityExpiration() {
825
	p.CompactionEntityExpiration = p.Base.ParseInt64WithDefault("dataCoord.compaction.entityExpiration", math.MaxInt64)
826 827 828 829 830 831 832 833
	p.CompactionEntityExpiration = func(x, y int64) int64 {
		if x > y {
			return x
		}
		return y
	}(p.CompactionEntityExpiration, p.RetentionDuration)
}

834 835 836
///////////////////////////////////////////////////////////////////////////////
// --- datanode ---
type dataNodeConfig struct {
837
	Base *BaseTable
838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861

	// ID of the current DataNode
	NodeID UniqueID

	// IP of the current DataNode
	IP string

	// Port of the current DataNode
	Port                    int
	FlowGraphMaxQueueLength int32
	FlowGraphMaxParallelism int32
	FlushInsertBufferSize   int64
	InsertBinlogRootPath    string
	StatsBinlogRootPath     string
	DeleteBinlogRootPath    string
	Alias                   string // Different datanode in one machine

	// etcd
	ChannelWatchSubPath string

	CreatedTime time.Time
	UpdatedTime time.Time
}

862 863
func (p *dataNodeConfig) init(base *BaseTable) {
	p.Base = base
864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880

	p.initFlowGraphMaxQueueLength()
	p.initFlowGraphMaxParallelism()
	p.initFlushInsertBufferSize()
	p.initInsertBinlogRootPath()
	p.initStatsBinlogRootPath()
	p.initDeleteBinlogRootPath()

	p.initChannelWatchPath()
}

// InitAlias init this DataNode alias
func (p *dataNodeConfig) InitAlias(alias string) {
	p.Alias = alias
}

func (p *dataNodeConfig) initFlowGraphMaxQueueLength() {
881
	p.FlowGraphMaxQueueLength = p.Base.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxQueueLength", 1024)
882 883 884
}

func (p *dataNodeConfig) initFlowGraphMaxParallelism() {
885
	p.FlowGraphMaxParallelism = p.Base.ParseInt32WithDefault("dataNode.dataSync.flowGraph.maxParallelism", 1024)
886 887 888
}

func (p *dataNodeConfig) initFlushInsertBufferSize() {
889
	p.FlushInsertBufferSize = p.Base.ParseInt64("_DATANODE_INSERTBUFSIZE")
890 891 892
}

func (p *dataNodeConfig) initInsertBinlogRootPath() {
893
	// GOOSE TODO: rootPath change to TenentID
894
	rootPath, err := p.Base.Load("minio.rootPath")
895 896 897 898 899 900 901
	if err != nil {
		panic(err)
	}
	p.InsertBinlogRootPath = path.Join(rootPath, "insert_log")
}

func (p *dataNodeConfig) initStatsBinlogRootPath() {
902
	rootPath, err := p.Base.Load("minio.rootPath")
903 904 905 906 907 908 909
	if err != nil {
		panic(err)
	}
	p.StatsBinlogRootPath = path.Join(rootPath, "stats_log")
}

func (p *dataNodeConfig) initDeleteBinlogRootPath() {
910
	rootPath, err := p.Base.Load("minio.rootPath")
911 912 913 914 915 916 917 918 919 920 921 922 923
	if err != nil {
		panic(err)
	}
	p.DeleteBinlogRootPath = path.Join(rootPath, "delta_log")
}

func (p *dataNodeConfig) initChannelWatchPath() {
	p.ChannelWatchSubPath = "channelwatch"
}

///////////////////////////////////////////////////////////////////////////////
// --- indexcoord ---
type indexCoordConfig struct {
924
	Base *BaseTable
925 926 927 928 929 930 931 932 933 934

	Address string
	Port    int

	IndexStorageRootPath string

	CreatedTime time.Time
	UpdatedTime time.Time
}

935 936
func (p *indexCoordConfig) init(base *BaseTable) {
	p.Base = base
937 938 939 940 941 942

	p.initIndexStorageRootPath()
}

// initIndexStorageRootPath initializes the root path of index files.
func (p *indexCoordConfig) initIndexStorageRootPath() {
943
	rootPath, err := p.Base.Load("minio.rootPath")
944 945 946 947 948 949 950 951 952
	if err != nil {
		panic(err)
	}
	p.IndexStorageRootPath = path.Join(rootPath, "index_files")
}

///////////////////////////////////////////////////////////////////////////////
// --- indexnode ---
type indexNodeConfig struct {
953
	Base *BaseTable
954 955 956 957 958 959 960 961 962 963 964 965 966 967

	IP      string
	Address string
	Port    int

	NodeID int64
	Alias  string

	IndexStorageRootPath string

	CreatedTime time.Time
	UpdatedTime time.Time
}

968 969
func (p *indexNodeConfig) init(base *BaseTable) {
	p.Base = base
970 971 972 973 974 975 976 977 978 979

	p.initIndexStorageRootPath()
}

// InitAlias initializes an alias for the IndexNode role.
func (p *indexNodeConfig) InitAlias(alias string) {
	p.Alias = alias
}

func (p *indexNodeConfig) initIndexStorageRootPath() {
980
	rootPath, err := p.Base.Load("minio.rootPath")
981 982 983 984 985
	if err != nil {
		panic(err)
	}
	p.IndexStorageRootPath = path.Join(rootPath, "index_files")
}