paramtable.go 11.2 KB
Newer Older
1
package proxynode
C
cai.zhang 已提交
2 3

import (
Z
zhenshan.cao 已提交
4
	"bytes"
5 6
	"fmt"
	"path"
Z
zhenshan.cao 已提交
7 8
	"strconv"
	"strings"
9
	"sync"
Z
zhenshan.cao 已提交
10 11
	"time"

Z
zhenshan.cao 已提交
12 13
	"github.com/spf13/cast"
	"github.com/spf13/viper"
14
	"go.uber.org/zap"
Z
zhenshan.cao 已提交
15

16
	"github.com/zilliztech/milvus-distributed/internal/log"
G
godchen 已提交
17
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
C
cai.zhang 已提交
18 19 20
	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)

Z
zhenshan.cao 已提交
21
// Keys and defaults used while loading the proxy node configuration.
const (
	// StartParamsKey is the key prefix that marks a start parameter whose
	// value is parsed as YAML configuration in LoadConfigFromInitParams.
	StartParamsKey = "START_PARAMS"

	// PulsarMaxMessageSizeKey is the name of the maximum-message-size entry;
	// in the visible code it is referenced only from commented-out logic in
	// initPulsarMaxMessageSize.
	PulsarMaxMessageSizeKey = "maxMessageSize"

	// SuggestPulsarMaxMessageSizeKey is the fallback maximum message size
	// (5 * 1024 * 1024) used when "pulsar.maxMessageSize" is missing or
	// invalid. NOTE(review): despite the "Key" suffix this is a size value,
	// not a lookup key.
	SuggestPulsarMaxMessageSizeKey = 5 * 1024 * 1024
)

C
cai.zhang 已提交
27 28
type ParamTable struct {
	paramtable.BaseTable
29

Z
zhenshan.cao 已提交
30 31 32 33 34 35 36
	NetworkPort    int
	IP             string
	NetworkAddress string

	MasterAddress string
	PulsarAddress string

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
	QueryNodeNum                       int
	QueryNodeIDList                    []UniqueID
	ProxyID                            UniqueID
	TimeTickInterval                   time.Duration
	InsertChannelNames                 []string
	DeleteChannelNames                 []string
	K2SChannelNames                    []string
	SearchChannelNames                 []string
	SearchResultChannelNames           []string
	ProxySubName                       string
	ProxyTimeTickChannelNames          []string
	DataDefinitionChannelNames         []string
	MsgStreamInsertBufSize             int64
	MsgStreamSearchBufSize             int64
	MsgStreamSearchResultBufSize       int64
	MsgStreamSearchResultPulsarBufSize int64
	MsgStreamTimeTickBufSize           int64
	MaxNameLength                      int64
	MaxFieldNum                        int64
	MaxDimension                       int64
57
	DefaultPartitionName               string
58
	DefaultIndexName                   string
59

60 61
	PulsarMaxMessageSize int
	Log                  log.Config
S
sunby 已提交
62
	RoleName             string
C
cai.zhang 已提交
63 64 65
}

var Params ParamTable
66
var once sync.Once
C
cai.zhang 已提交
67

G
godchen 已提交
68
func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb.InitParams) error {
Z
zhenshan.cao 已提交
69 70 71 72
	pt.ProxyID = initParams.NodeID

	config := viper.New()
	config.SetConfigType("yaml")
73 74 75 76
	save := func() error {
		for _, key := range config.AllKeys() {
			val := config.Get(key)
			str, err := cast.ToStringE(val)
Z
zhenshan.cao 已提交
77
			if err != nil {
78 79 80 81 82 83
				switch val := val.(type) {
				case []interface{}:
					str = str[:0]
					for _, v := range val {
						ss, err := cast.ToStringE(v)
						if err != nil {
84
							log.Warn("proxynode", zap.String("error", err.Error()))
85 86 87 88 89 90 91 92 93
						}
						if len(str) == 0 {
							str = ss
						} else {
							str = str + "," + ss
						}
					}

				default:
94
					log.Debug("proxynode", zap.String("error", "Undefined config type, key="+key))
95 96 97 98 99
				}
			}
			err = pt.Save(key, str)
			if err != nil {
				panic(err)
Z
zhenshan.cao 已提交
100 101
			}
		}
102
		return nil
Z
zhenshan.cao 已提交
103 104
	}

105 106 107 108 109 110 111 112 113
	for _, pair := range initParams.StartParams {
		if strings.HasPrefix(pair.Key, StartParamsKey) {
			err := config.ReadConfig(bytes.NewBuffer([]byte(pair.Value)))
			if err != nil {
				return err
			}
			err = save()
			if err != nil {
				return err
Z
zhenshan.cao 已提交
114 115 116 117 118 119 120 121 122
			}
		}
	}

	pt.initParams()

	return nil
}

Z
zhenshan.cao 已提交
123
func (pt *ParamTable) Init() {
124 125
	once.Do(func() {
		pt.BaseTable.Init()
126
		pt.initLogCfg()
127 128 129 130 131 132
		// err := pt.LoadYaml("advanced/proxy_node.yaml")
		// if err != nil {
		// 	panic(err)
		// }
		// pt.initParams()
	})
Z
zhenshan.cao 已提交
133
}
X
XuanYang-cn 已提交
134

Z
zhenshan.cao 已提交
135
func (pt *ParamTable) initParams() {
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
	pt.initPulsarAddress()
	pt.initQueryNodeIDList()
	pt.initQueryNodeNum()
	pt.initTimeTickInterval()
	pt.initInsertChannelNames()
	pt.initDeleteChannelNames()
	pt.initK2SChannelNames()
	pt.initSearchChannelNames()
	pt.initSearchResultChannelNames()
	pt.initProxySubName()
	pt.initProxyTimeTickChannelNames()
	pt.initDataDefinitionChannelNames()
	pt.initMsgStreamInsertBufSize()
	pt.initMsgStreamSearchBufSize()
	pt.initMsgStreamSearchResultBufSize()
	pt.initMsgStreamSearchResultPulsarBufSize()
	pt.initMsgStreamTimeTickBufSize()
	pt.initMaxNameLength()
	pt.initMaxFieldNum()
	pt.initMaxDimension()
156
	pt.initDefaultPartitionName()
157 158
	pt.initDefaultIndexName()

159
	pt.initPulsarMaxMessageSize()
S
sunby 已提交
160
	pt.initRoleName()
Z
zhenshan.cao 已提交
161 162
}

163
func (pt *ParamTable) initPulsarAddress() {
Z
zhenshan.cao 已提交
164 165 166 167
	ret, err := pt.Load("_PulsarAddress")
	if err != nil {
		panic(err)
	}
168
	pt.PulsarAddress = ret
Z
zhenshan.cao 已提交
169 170
}

171 172
func (pt *ParamTable) initQueryNodeNum() {
	pt.QueryNodeNum = len(pt.QueryNodeIDList)
N
neza2017 已提交
173 174
}

175
func (pt *ParamTable) initQueryNodeIDList() []UniqueID {
N
neza2017 已提交
176 177 178 179 180 181 182 183 184
	queryNodeIDStr, err := pt.Load("nodeID.queryNodeIDList")
	if err != nil {
		panic(err)
	}
	var ret []UniqueID
	queryNodeIDs := strings.Split(queryNodeIDStr, ",")
	for _, i := range queryNodeIDs {
		v, err := strconv.Atoi(i)
		if err != nil {
185
			log.Error("proxynode", zap.String("load proxynode id list error", err.Error()))
N
neza2017 已提交
186 187 188
		}
		ret = append(ret, UniqueID(v))
	}
189
	pt.QueryNodeIDList = ret
N
neza2017 已提交
190 191 192
	return ret
}

193
func (pt *ParamTable) initTimeTickInterval() {
194
	intervalStr, err := pt.Load("proxyNode.timeTickInterval")
Z
zhenshan.cao 已提交
195 196 197
	if err != nil {
		panic(err)
	}
198
	interval, err := strconv.Atoi(intervalStr)
Z
zhenshan.cao 已提交
199 200 201
	if err != nil {
		panic(err)
	}
202
	pt.TimeTickInterval = time.Duration(interval) * time.Millisecond
C
cai.zhang 已提交
203
}
C
cai.zhang 已提交
204

205
func (pt *ParamTable) initInsertChannelNames() {
Z
zhenshan.cao 已提交
206 207 208 209 210 211 212 213 214
	prefix, err := pt.Load("msgChannel.chanNamePrefix.insert")
	if err != nil {
		panic(err)
	}
	prefix += "-"
	iRangeStr, err := pt.Load("msgChannel.channelRange.insert")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
215
	channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
Z
zhenshan.cao 已提交
216 217 218 219 220
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}

221
	pt.InsertChannelNames = ret
Z
zhenshan.cao 已提交
222 223
}

224
func (pt *ParamTable) initDeleteChannelNames() {
Z
zhenshan.cao 已提交
225
	prefix, err := pt.Load("msgChannel.chanNamePrefix.delete")
C
cai.zhang 已提交
226 227 228
	if err != nil {
		panic(err)
	}
Z
zhenshan.cao 已提交
229 230 231 232 233
	prefix += "-"
	dRangeStr, err := pt.Load("msgChannel.channelRange.delete")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
234
	channelIDs := paramtable.ConvertRangeToIntSlice(dRangeStr, ",")
Z
zhenshan.cao 已提交
235 236 237 238
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
239
	pt.DeleteChannelNames = ret
Z
zhenshan.cao 已提交
240 241
}

242
func (pt *ParamTable) initK2SChannelNames() {
Z
zhenshan.cao 已提交
243 244 245 246 247 248 249 250 251
	prefix, err := pt.Load("msgChannel.chanNamePrefix.k2s")
	if err != nil {
		panic(err)
	}
	prefix += "-"
	k2sRangeStr, err := pt.Load("msgChannel.channelRange.k2s")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
252
	channelIDs := paramtable.ConvertRangeToIntSlice(k2sRangeStr, ",")
Z
zhenshan.cao 已提交
253 254 255 256
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
257
	pt.K2SChannelNames = ret
Z
zhenshan.cao 已提交
258 259
}

260
func (pt *ParamTable) initSearchChannelNames() {
Z
zhenshan.cao 已提交
261 262 263 264
	prefix, err := pt.Load("msgChannel.chanNamePrefix.search")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
265 266 267 268 269 270 271 272 273 274
	prefix += "-"
	sRangeStr, err := pt.Load("msgChannel.channelRange.search")
	if err != nil {
		panic(err)
	}
	channelIDs := paramtable.ConvertRangeToIntSlice(sRangeStr, ",")
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
275
	pt.SearchChannelNames = ret
Z
zhenshan.cao 已提交
276 277
}

278
func (pt *ParamTable) initSearchResultChannelNames() {
Z
zhenshan.cao 已提交
279 280 281 282 283 284 285 286 287
	prefix, err := pt.Load("msgChannel.chanNamePrefix.searchResult")
	if err != nil {
		panic(err)
	}
	prefix += "-"
	sRangeStr, err := pt.Load("msgChannel.channelRange.searchResult")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
288
	channelIDs := paramtable.ConvertRangeToIntSlice(sRangeStr, ",")
Z
zhenshan.cao 已提交
289 290 291 292
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
293
	pt.SearchResultChannelNames = ret
Z
zhenshan.cao 已提交
294 295
}

296
func (pt *ParamTable) initProxySubName() {
Z
zhenshan.cao 已提交
297 298 299 300
	prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix")
	if err != nil {
		panic(err)
	}
301
	pt.ProxySubName = prefix + "-" + strconv.Itoa(int(pt.ProxyID))
Z
zhenshan.cao 已提交
302 303
}

304
func (pt *ParamTable) initProxyTimeTickChannelNames() {
Z
zhenshan.cao 已提交
305 306 307 308 309
	prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
	if err != nil {
		panic(err)
	}
	prefix += "-0"
310
	pt.ProxyTimeTickChannelNames = []string{prefix}
Z
zhenshan.cao 已提交
311 312
}

313
func (pt *ParamTable) initDataDefinitionChannelNames() {
Z
zhenshan.cao 已提交
314 315 316 317 318
	prefix, err := pt.Load("msgChannel.chanNamePrefix.dataDefinition")
	if err != nil {
		panic(err)
	}
	prefix += "-0"
319
	pt.DataDefinitionChannelNames = []string{prefix}
Z
zhenshan.cao 已提交
320 321
}

322 323
func (pt *ParamTable) initMsgStreamInsertBufSize() {
	pt.MsgStreamInsertBufSize = pt.ParseInt64("proxyNode.msgStream.insert.bufSize")
Z
zhenshan.cao 已提交
324 325
}

326 327
func (pt *ParamTable) initMsgStreamSearchBufSize() {
	pt.MsgStreamSearchBufSize = pt.ParseInt64("proxyNode.msgStream.search.bufSize")
Z
zhenshan.cao 已提交
328 329
}

330 331
func (pt *ParamTable) initMsgStreamSearchResultBufSize() {
	pt.MsgStreamSearchResultBufSize = pt.ParseInt64("proxyNode.msgStream.searchResult.recvBufSize")
Z
zhenshan.cao 已提交
332 333
}

334 335
func (pt *ParamTable) initMsgStreamSearchResultPulsarBufSize() {
	pt.MsgStreamSearchResultPulsarBufSize = pt.ParseInt64("proxyNode.msgStream.searchResult.pulsarBufSize")
Z
zhenshan.cao 已提交
336 337
}

338 339
func (pt *ParamTable) initMsgStreamTimeTickBufSize() {
	pt.MsgStreamTimeTickBufSize = pt.ParseInt64("proxyNode.msgStream.timeTick.bufSize")
N
neza2017 已提交
340 341
}

342
func (pt *ParamTable) initMaxNameLength() {
343
	str, err := pt.Load("proxyNode.maxNameLength")
N
neza2017 已提交
344 345 346 347 348 349 350
	if err != nil {
		panic(err)
	}
	maxNameLength, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
351
	pt.MaxNameLength = maxNameLength
N
neza2017 已提交
352 353
}

354
func (pt *ParamTable) initMaxFieldNum() {
355
	str, err := pt.Load("proxyNode.maxFieldNum")
N
neza2017 已提交
356 357 358 359 360 361 362
	if err != nil {
		panic(err)
	}
	maxFieldNum, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
363
	pt.MaxFieldNum = maxFieldNum
N
neza2017 已提交
364 365
}

366
func (pt *ParamTable) initMaxDimension() {
367
	str, err := pt.Load("proxyNode.maxDimension")
N
neza2017 已提交
368 369 370 371 372 373 374
	if err != nil {
		panic(err)
	}
	maxDimension, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
375
	pt.MaxDimension = maxDimension
N
neza2017 已提交
376 377
}

378 379
func (pt *ParamTable) initDefaultPartitionName() {
	name, err := pt.Load("common.defaultPartitionName")
N
neza2017 已提交
380 381 382
	if err != nil {
		panic(err)
	}
383
	pt.DefaultPartitionName = name
N
neza2017 已提交
384
}
385 386 387 388 389 390 391 392

// initDefaultIndexName reads "common.defaultIndexName" into DefaultIndexName.
// It panics when the entry cannot be loaded.
func (pt *ParamTable) initDefaultIndexName() {
	indexName, loadErr := pt.Load("common.defaultIndexName")
	if loadErr != nil {
		panic(loadErr)
	}
	pt.DefaultIndexName = indexName
}
393

394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
// initPulsarMaxMessageSize sets PulsarMaxMessageSize from the
// "pulsar.maxMessageSize" entry.
//
// When the entry is absent the default SuggestPulsarMaxMessageSizeKey is
// used. When the entry is present but not an integer, the default is used as
// well and a warning is logged so the misconfiguration is visible. This
// function never panics on a bad value.
func (pt *ParamTable) initPulsarMaxMessageSize() {
	// Disabled alternative: query the value from the Pulsar broker's runtime
	// configuration instead of the local configuration.
	//
	// pulsarHost, err := pt.Load("pulsar.address")
	// if err != nil {
	// 	panic(err)
	// }

	// pulsarRestPort, err := pt.Load("pulsar.rest-port")
	// if err != nil {
	// 	panic(err)
	// }

	// protocol := "http"
	// url := "/admin/v2/brokers/configuration/runtime"
	// runtimeConfig, err := GetPulsarConfig(protocol, pulsarHost, pulsarRestPort, url)
	// if err != nil {
	// 	panic(err)
	// }
	// maxMessageSizeStr := fmt.Sprintf("%v", runtimeConfig[PulsarMaxMessageSizeKey])
	// pt.PulsarMaxMessageSize, err = strconv.Atoi(maxMessageSizeStr)
	// if err != nil {
	// 	panic(err)
	// }

	maxMessageSizeStr, err := pt.Load("pulsar.maxMessageSize")
	if err != nil {
		// Not configured: fall back to the suggested default.
		pt.PulsarMaxMessageSize = SuggestPulsarMaxMessageSizeKey
		return
	}

	maxMessageSize, err := strconv.Atoi(maxMessageSizeStr)
	if err != nil {
		// Configured but malformed: keep running with the default, but do not
		// hide the problem.
		log.Warn("proxynode", zap.String("invalid pulsar.maxMessageSize, using default", err.Error()))
		pt.PulsarMaxMessageSize = SuggestPulsarMaxMessageSizeKey
		return
	}
	pt.PulsarMaxMessageSize = maxMessageSize
}

430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457
func (pt *ParamTable) initLogCfg() {
	pt.Log = log.Config{}
	format, err := pt.Load("log.format")
	if err != nil {
		panic(err)
	}
	pt.Log.Format = format
	level, err := pt.Load("log.level")
	if err != nil {
		panic(err)
	}
	pt.Log.Level = level
	devStr, err := pt.Load("log.dev")
	if err != nil {
		panic(err)
	}
	dev, err := strconv.ParseBool(devStr)
	if err != nil {
		panic(err)
	}
	pt.Log.Development = dev
	pt.Log.File.MaxSize = pt.ParseInt("log.file.maxSize")
	pt.Log.File.MaxBackups = pt.ParseInt("log.file.maxBackups")
	pt.Log.File.MaxDays = pt.ParseInt("log.file.maxAge")
	rootPath, err := pt.Load("log.file.rootPath")
	if err != nil {
		panic(err)
	}
458 459 460 461 462
	if len(rootPath) != 0 {
		pt.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("proxynode-%d.log", pt.ProxyID))
	} else {
		pt.Log.File.Filename = ""
	}
463
}
S
sunby 已提交
464 465 466 467

// initRoleName sets RoleName to "ProxyNode-<ProxyID>".
func (pt *ParamTable) initRoleName() {
	const role = "ProxyNode"
	roleName := fmt.Sprintf("%s-%d", role, pt.ProxyID)
	pt.RoleName = roleName
}