paramtable.go 9.3 KB
Newer Older
1
package proxynode
C
cai.zhang 已提交
2 3

import (
	"bytes"
	"fmt"
	"log"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/spf13/cast"
	"github.com/spf13/viper"

	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)

Z
zhenshan.cao 已提交
19 20 21 22
const (
	// StartParamsKey is the key prefix LoadConfigFromInitParams looks for:
	// only start parameters whose key begins with it are read as
	// configuration.
	StartParamsKey = "START_PARAMS"
)

C
cai.zhang 已提交
23 24
type ParamTable struct {
	paramtable.BaseTable
25

Z
zhenshan.cao 已提交
26 27 28 29 30 31 32
	NetworkPort    int
	IP             string
	NetworkAddress string

	MasterAddress string
	PulsarAddress string

33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
	QueryNodeNum                       int
	QueryNodeIDList                    []UniqueID
	ProxyID                            UniqueID
	TimeTickInterval                   time.Duration
	InsertChannelNames                 []string
	DeleteChannelNames                 []string
	K2SChannelNames                    []string
	SearchChannelNames                 []string
	SearchResultChannelNames           []string
	ProxySubName                       string
	ProxyTimeTickChannelNames          []string
	DataDefinitionChannelNames         []string
	MsgStreamInsertBufSize             int64
	MsgStreamSearchBufSize             int64
	MsgStreamSearchResultBufSize       int64
	MsgStreamSearchResultPulsarBufSize int64
	MsgStreamTimeTickBufSize           int64
	MaxNameLength                      int64
	MaxFieldNum                        int64
	MaxDimension                       int64
	DefaultPartitionTag                string
54
	DefaultIndexName                   string
C
cai.zhang 已提交
55 56 57
}

// Params is the package-level ParamTable instance.
var Params ParamTable
58
// once guards the BaseTable initialization performed by (*ParamTable).Init so
// that it runs a single time.
var once sync.Once
C
cai.zhang 已提交
59

Z
zhenshan.cao 已提交
60 61 62 63 64
func (pt *ParamTable) LoadConfigFromInitParams(initParams *internalpb2.InitParams) error {
	pt.ProxyID = initParams.NodeID

	config := viper.New()
	config.SetConfigType("yaml")
65 66 67 68
	save := func() error {
		for _, key := range config.AllKeys() {
			val := config.Get(key)
			str, err := cast.ToStringE(val)
Z
zhenshan.cao 已提交
69
			if err != nil {
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
				switch val := val.(type) {
				case []interface{}:
					str = str[:0]
					for _, v := range val {
						ss, err := cast.ToStringE(v)
						if err != nil {
							log.Panic(err)
						}
						if len(str) == 0 {
							str = ss
						} else {
							str = str + "," + ss
						}
					}

				default:
					log.Panicf("undefine config type, key=%s", key)
				}
			}
			log.Println("key: ", key, ", value: ", str)
			err = pt.Save(key, str)
			if err != nil {
				panic(err)
Z
zhenshan.cao 已提交
93 94
			}
		}
95
		return nil
Z
zhenshan.cao 已提交
96 97
	}

98 99 100 101 102 103 104 105 106
	for _, pair := range initParams.StartParams {
		if strings.HasPrefix(pair.Key, StartParamsKey) {
			err := config.ReadConfig(bytes.NewBuffer([]byte(pair.Value)))
			if err != nil {
				return err
			}
			err = save()
			if err != nil {
				return err
Z
zhenshan.cao 已提交
107 108 109 110 111 112 113 114 115
			}
		}
	}

	pt.initParams()

	return nil
}

Z
zhenshan.cao 已提交
116
func (pt *ParamTable) Init() {
117 118 119 120 121 122 123 124
	once.Do(func() {
		pt.BaseTable.Init()
		// err := pt.LoadYaml("advanced/proxy_node.yaml")
		// if err != nil {
		// 	panic(err)
		// }
		// pt.initParams()
	})
Z
zhenshan.cao 已提交
125
}
X
XuanYang-cn 已提交
126

Z
zhenshan.cao 已提交
127
func (pt *ParamTable) initParams() {
128 129 130
	pt.initPulsarAddress()
	pt.initQueryNodeIDList()
	pt.initQueryNodeNum()
131
	// pt.initProxyID()
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
	pt.initTimeTickInterval()
	pt.initInsertChannelNames()
	pt.initDeleteChannelNames()
	pt.initK2SChannelNames()
	pt.initSearchChannelNames()
	pt.initSearchResultChannelNames()
	pt.initProxySubName()
	pt.initProxyTimeTickChannelNames()
	pt.initDataDefinitionChannelNames()
	pt.initMsgStreamInsertBufSize()
	pt.initMsgStreamSearchBufSize()
	pt.initMsgStreamSearchResultBufSize()
	pt.initMsgStreamSearchResultPulsarBufSize()
	pt.initMsgStreamTimeTickBufSize()
	pt.initMaxNameLength()
	pt.initMaxFieldNum()
	pt.initMaxDimension()
	pt.initDefaultPartitionTag()
150 151
	pt.initDefaultIndexName()

Z
zhenshan.cao 已提交
152 153
}

154
func (pt *ParamTable) initPulsarAddress() {
Z
zhenshan.cao 已提交
155 156 157 158
	ret, err := pt.Load("_PulsarAddress")
	if err != nil {
		panic(err)
	}
159
	pt.PulsarAddress = ret
Z
zhenshan.cao 已提交
160 161
}

162 163
func (pt *ParamTable) initQueryNodeNum() {
	pt.QueryNodeNum = len(pt.QueryNodeIDList)
N
neza2017 已提交
164 165
}

166
func (pt *ParamTable) initQueryNodeIDList() []UniqueID {
N
neza2017 已提交
167 168 169 170 171 172 173 174 175
	queryNodeIDStr, err := pt.Load("nodeID.queryNodeIDList")
	if err != nil {
		panic(err)
	}
	var ret []UniqueID
	queryNodeIDs := strings.Split(queryNodeIDStr, ",")
	for _, i := range queryNodeIDs {
		v, err := strconv.Atoi(i)
		if err != nil {
176
			log.Panicf("load proxynode id list error, %s", err.Error())
N
neza2017 已提交
177 178 179 180 181 182
		}
		ret = append(ret, UniqueID(v))
	}
	return ret
}

183
func (pt *ParamTable) initProxyID() {
Z
zhenshan.cao 已提交
184 185 186 187 188 189 190 191
	proxyID, err := pt.Load("_proxyID")
	if err != nil {
		panic(err)
	}
	ID, err := strconv.Atoi(proxyID)
	if err != nil {
		panic(err)
	}
192
	pt.ProxyID = UniqueID(ID)
Z
zhenshan.cao 已提交
193 194
}

195
func (pt *ParamTable) initTimeTickInterval() {
196
	intervalStr, err := pt.Load("proxyNode.timeTickInterval")
Z
zhenshan.cao 已提交
197 198 199
	if err != nil {
		panic(err)
	}
200
	interval, err := strconv.Atoi(intervalStr)
Z
zhenshan.cao 已提交
201 202 203
	if err != nil {
		panic(err)
	}
204
	pt.TimeTickInterval = time.Duration(interval) * time.Millisecond
C
cai.zhang 已提交
205
}
C
cai.zhang 已提交
206

207
func (pt *ParamTable) initInsertChannelNames() {
Z
zhenshan.cao 已提交
208 209 210 211 212 213 214 215 216
	prefix, err := pt.Load("msgChannel.chanNamePrefix.insert")
	if err != nil {
		panic(err)
	}
	prefix += "-"
	iRangeStr, err := pt.Load("msgChannel.channelRange.insert")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
217
	channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
Z
zhenshan.cao 已提交
218 219 220 221 222
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}

223
	pt.InsertChannelNames = ret
Z
zhenshan.cao 已提交
224 225
}

226
func (pt *ParamTable) initDeleteChannelNames() {
Z
zhenshan.cao 已提交
227
	prefix, err := pt.Load("msgChannel.chanNamePrefix.delete")
C
cai.zhang 已提交
228 229 230
	if err != nil {
		panic(err)
	}
Z
zhenshan.cao 已提交
231 232 233 234 235
	prefix += "-"
	dRangeStr, err := pt.Load("msgChannel.channelRange.delete")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
236
	channelIDs := paramtable.ConvertRangeToIntSlice(dRangeStr, ",")
Z
zhenshan.cao 已提交
237 238 239 240
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
241
	pt.DeleteChannelNames = ret
Z
zhenshan.cao 已提交
242 243
}

244
func (pt *ParamTable) initK2SChannelNames() {
Z
zhenshan.cao 已提交
245 246 247 248 249 250 251 252 253
	prefix, err := pt.Load("msgChannel.chanNamePrefix.k2s")
	if err != nil {
		panic(err)
	}
	prefix += "-"
	k2sRangeStr, err := pt.Load("msgChannel.channelRange.k2s")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
254
	channelIDs := paramtable.ConvertRangeToIntSlice(k2sRangeStr, ",")
Z
zhenshan.cao 已提交
255 256 257 258
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
259
	pt.K2SChannelNames = ret
Z
zhenshan.cao 已提交
260 261
}

262
func (pt *ParamTable) initSearchChannelNames() {
Z
zhenshan.cao 已提交
263 264 265 266
	prefix, err := pt.Load("msgChannel.chanNamePrefix.search")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
267 268 269 270 271 272 273 274 275 276
	prefix += "-"
	sRangeStr, err := pt.Load("msgChannel.channelRange.search")
	if err != nil {
		panic(err)
	}
	channelIDs := paramtable.ConvertRangeToIntSlice(sRangeStr, ",")
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
277
	pt.SearchChannelNames = ret
Z
zhenshan.cao 已提交
278 279
}

280
func (pt *ParamTable) initSearchResultChannelNames() {
Z
zhenshan.cao 已提交
281 282 283 284 285 286 287 288 289
	prefix, err := pt.Load("msgChannel.chanNamePrefix.searchResult")
	if err != nil {
		panic(err)
	}
	prefix += "-"
	sRangeStr, err := pt.Load("msgChannel.channelRange.searchResult")
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
290
	channelIDs := paramtable.ConvertRangeToIntSlice(sRangeStr, ",")
Z
zhenshan.cao 已提交
291 292 293 294
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
295
	pt.SearchResultChannelNames = ret
Z
zhenshan.cao 已提交
296 297
}

298
func (pt *ParamTable) initProxySubName() {
Z
zhenshan.cao 已提交
299 300 301 302
	prefix, err := pt.Load("msgChannel.subNamePrefix.proxySubNamePrefix")
	if err != nil {
		panic(err)
	}
303 304 305 306 307 308
	pt.ProxySubName = prefix
	// proxyIDStr, err := pt.Load("_proxyID")
	// if err != nil {
	// 	panic(err)
	// }
	pt.ProxySubName = prefix + "-" + strconv.Itoa(int(pt.ProxyID))
Z
zhenshan.cao 已提交
309 310
}

311
func (pt *ParamTable) initProxyTimeTickChannelNames() {
Z
zhenshan.cao 已提交
312 313 314 315 316
	prefix, err := pt.Load("msgChannel.chanNamePrefix.proxyTimeTick")
	if err != nil {
		panic(err)
	}
	prefix += "-0"
317
	pt.ProxyTimeTickChannelNames = []string{prefix}
Z
zhenshan.cao 已提交
318 319
}

320
func (pt *ParamTable) initDataDefinitionChannelNames() {
Z
zhenshan.cao 已提交
321 322 323 324 325
	prefix, err := pt.Load("msgChannel.chanNamePrefix.dataDefinition")
	if err != nil {
		panic(err)
	}
	prefix += "-0"
326
	pt.DataDefinitionChannelNames = []string{prefix}
Z
zhenshan.cao 已提交
327 328
}

329 330
func (pt *ParamTable) initMsgStreamInsertBufSize() {
	pt.MsgStreamInsertBufSize = pt.ParseInt64("proxyNode.msgStream.insert.bufSize")
Z
zhenshan.cao 已提交
331 332
}

333 334
func (pt *ParamTable) initMsgStreamSearchBufSize() {
	pt.MsgStreamSearchBufSize = pt.ParseInt64("proxyNode.msgStream.search.bufSize")
Z
zhenshan.cao 已提交
335 336
}

337 338
func (pt *ParamTable) initMsgStreamSearchResultBufSize() {
	pt.MsgStreamSearchResultBufSize = pt.ParseInt64("proxyNode.msgStream.searchResult.recvBufSize")
Z
zhenshan.cao 已提交
339 340
}

341 342
func (pt *ParamTable) initMsgStreamSearchResultPulsarBufSize() {
	pt.MsgStreamSearchResultPulsarBufSize = pt.ParseInt64("proxyNode.msgStream.searchResult.pulsarBufSize")
Z
zhenshan.cao 已提交
343 344
}

345 346
func (pt *ParamTable) initMsgStreamTimeTickBufSize() {
	pt.MsgStreamTimeTickBufSize = pt.ParseInt64("proxyNode.msgStream.timeTick.bufSize")
N
neza2017 已提交
347 348
}

349
func (pt *ParamTable) initMaxNameLength() {
350
	str, err := pt.Load("proxyNode.maxNameLength")
N
neza2017 已提交
351 352 353 354 355 356 357
	if err != nil {
		panic(err)
	}
	maxNameLength, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
358
	pt.MaxNameLength = maxNameLength
N
neza2017 已提交
359 360
}

361
func (pt *ParamTable) initMaxFieldNum() {
362
	str, err := pt.Load("proxyNode.maxFieldNum")
N
neza2017 已提交
363 364 365 366 367 368 369
	if err != nil {
		panic(err)
	}
	maxFieldNum, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
370
	pt.MaxFieldNum = maxFieldNum
N
neza2017 已提交
371 372
}

373
func (pt *ParamTable) initMaxDimension() {
374
	str, err := pt.Load("proxyNode.maxDimension")
N
neza2017 已提交
375 376 377 378 379 380 381
	if err != nil {
		panic(err)
	}
	maxDimension, err := strconv.ParseInt(str, 10, 64)
	if err != nil {
		panic(err)
	}
382
	pt.MaxDimension = maxDimension
N
neza2017 已提交
383 384
}

385
func (pt *ParamTable) initDefaultPartitionTag() {
N
neza2017 已提交
386 387 388 389
	tag, err := pt.Load("common.defaultPartitionTag")
	if err != nil {
		panic(err)
	}
390
	pt.DefaultPartitionTag = tag
N
neza2017 已提交
391
}
392 393 394 395 396 397 398 399

// initDefaultIndexName copies the "common.defaultIndexName" entry into
// pt.DefaultIndexName and panics if the entry cannot be loaded.
func (pt *ParamTable) initDefaultIndexName() {
	indexName, loadErr := pt.Load("common.defaultIndexName")
	if loadErr != nil {
		panic(loadErr)
	}
	pt.DefaultIndexName = indexName
}