param_table.go 7.3 KB
Newer Older
N
neza2017 已提交
1 2 3 4 5
package querynode

import (
	"log"
	"strconv"
N
neza2017 已提交
6
	"strings"
N
neza2017 已提交
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)

// ParamTable is the query node's parameter table. It embeds
// paramtable.BaseTable and layers query-node-specific accessor methods on top
// of the embedded table's key lookup.
type ParamTable struct {
	paramtable.BaseTable
}

// Params is the package-level ParamTable instance.
// NOTE(review): presumably Init must be called on it before any accessor is
// used — confirm against callers.
var Params ParamTable

func (p *ParamTable) Init() {
	p.BaseTable.Init()
	err := p.LoadYaml("advanced/query_node.yaml")
	if err != nil {
		panic(err)
	}

N
neza2017 已提交
24 25 26 27 28 29 30 31
	err = p.LoadYaml("milvus.yaml")
	if err != nil {
		panic(err)
	}

	err = p.LoadYaml("advanced/channel.yaml")
	if err != nil {
		panic(err)
N
neza2017 已提交
32 33 34 35 36 37 38 39 40 41 42
	}
}

// pulsarAddress returns the value stored under the "_PulsarAddress" key.
//
// The error result is always nil: a failed lookup panics with the lookup
// error instead of returning it.
func (p *ParamTable) pulsarAddress() (string, error) {
	addr, loadErr := p.Load("_PulsarAddress")
	if loadErr != nil {
		panic(loadErr)
	}
	return addr, nil
}

N
neza2017 已提交
43 44
func (p *ParamTable) queryNodeID() int {
	queryNodeID, err := p.Load("reader.clientid")
N
neza2017 已提交
45 46 47
	if err != nil {
		panic(err)
	}
Z
zhenshan.cao 已提交
48
	id, err := strconv.Atoi(queryNodeID)
N
neza2017 已提交
49 50 51
	if err != nil {
		panic(err)
	}
N
neza2017 已提交
52
	return id
Z
zhenshan.cao 已提交
53 54 55 56
}

func (p *ParamTable) insertChannelRange() []int {
	insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
N
neza2017 已提交
57 58 59
	if err != nil {
		panic(err)
	}
N
neza2017 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79

	channelRange := strings.Split(insertChannelRange, ",")
	if len(channelRange) != 2 {
		panic("Illegal channel range num")
	}
	channelBegin, err := strconv.Atoi(channelRange[0])
	if err != nil {
		panic(err)
	}
	channelEnd, err := strconv.Atoi(channelRange[1])
	if err != nil {
		panic(err)
	}
	if channelBegin < 0 || channelEnd < 0 {
		panic("Illegal channel range value")
	}
	if channelBegin > channelEnd {
		panic("Illegal channel range value")
	}
	return []int{channelBegin, channelEnd}
N
neza2017 已提交
80 81 82 83 84
}

// advanced params
// stats
func (p *ParamTable) statsPublishInterval() int {
N
neza2017 已提交
85 86 87 88 89 90 91 92 93
	timeInterval, err := p.Load("queryNode.stats.publishInterval")
	if err != nil {
		panic(err)
	}
	interval, err := strconv.Atoi(timeInterval)
	if err != nil {
		panic(err)
	}
	return interval
N
neza2017 已提交
94 95 96 97
}

// dataSync:
func (p *ParamTable) flowGraphMaxQueueLength() int32 {
N
neza2017 已提交
98 99 100 101 102 103 104 105 106
	queueLength, err := p.Load("queryNode.dataSync.flowGraph.maxQueueLength")
	if err != nil {
		panic(err)
	}
	length, err := strconv.Atoi(queueLength)
	if err != nil {
		panic(err)
	}
	return int32(length)
N
neza2017 已提交
107 108 109
}

func (p *ParamTable) flowGraphMaxParallelism() int32 {
N
neza2017 已提交
110 111 112 113 114 115 116 117 118
	maxParallelism, err := p.Load("queryNode.dataSync.flowGraph.maxParallelism")
	if err != nil {
		panic(err)
	}
	maxPara, err := strconv.Atoi(maxParallelism)
	if err != nil {
		panic(err)
	}
	return int32(maxPara)
N
neza2017 已提交
119 120 121 122
}

// msgStream
func (p *ParamTable) insertReceiveBufSize() int64 {
N
neza2017 已提交
123 124 125 126 127 128 129 130 131
	revBufSize, err := p.Load("queryNode.msgStream.insert.recvBufSize")
	if err != nil {
		panic(err)
	}
	bufSize, err := strconv.Atoi(revBufSize)
	if err != nil {
		panic(err)
	}
	return int64(bufSize)
N
neza2017 已提交
132 133 134
}

func (p *ParamTable) insertPulsarBufSize() int64 {
N
neza2017 已提交
135 136 137 138 139 140 141 142 143
	pulsarBufSize, err := p.Load("queryNode.msgStream.insert.pulsarBufSize")
	if err != nil {
		panic(err)
	}
	bufSize, err := strconv.Atoi(pulsarBufSize)
	if err != nil {
		panic(err)
	}
	return int64(bufSize)
N
neza2017 已提交
144 145 146
}

func (p *ParamTable) searchReceiveBufSize() int64 {
N
neza2017 已提交
147 148 149 150 151 152 153 154 155
	revBufSize, err := p.Load("queryNode.msgStream.search.recvBufSize")
	if err != nil {
		panic(err)
	}
	bufSize, err := strconv.Atoi(revBufSize)
	if err != nil {
		panic(err)
	}
	return int64(bufSize)
N
neza2017 已提交
156 157 158
}

func (p *ParamTable) searchPulsarBufSize() int64 {
N
neza2017 已提交
159 160 161 162 163 164 165 166 167
	pulsarBufSize, err := p.Load("queryNode.msgStream.search.pulsarBufSize")
	if err != nil {
		panic(err)
	}
	bufSize, err := strconv.Atoi(pulsarBufSize)
	if err != nil {
		panic(err)
	}
	return int64(bufSize)
N
neza2017 已提交
168 169 170
}

func (p *ParamTable) searchResultReceiveBufSize() int64 {
N
neza2017 已提交
171 172 173 174 175 176 177 178 179
	revBufSize, err := p.Load("queryNode.msgStream.searchResult.recvBufSize")
	if err != nil {
		panic(err)
	}
	bufSize, err := strconv.Atoi(revBufSize)
	if err != nil {
		panic(err)
	}
	return int64(bufSize)
N
neza2017 已提交
180 181 182
}

func (p *ParamTable) statsReceiveBufSize() int64 {
N
neza2017 已提交
183 184 185 186 187 188 189 190 191
	revBufSize, err := p.Load("queryNode.msgStream.stats.recvBufSize")
	if err != nil {
		panic(err)
	}
	bufSize, err := strconv.Atoi(revBufSize)
	if err != nil {
		panic(err)
	}
	return int64(bufSize)
N
neza2017 已提交
192 193 194 195 196 197 198 199 200 201
}

// etcdAddress returns the value stored under the "_EtcdAddress" key and
// panics with the lookup error if the key cannot be loaded.
func (p *ParamTable) etcdAddress() string {
	addr, loadErr := p.Load("_EtcdAddress")
	if loadErr != nil {
		panic(loadErr)
	}
	return addr
}

C
cai.zhang 已提交
202 203
func (p *ParamTable) metaRootPath() string {
	rootPath, err := p.Load("etcd.rootPath")
N
neza2017 已提交
204 205 206
	if err != nil {
		panic(err)
	}
C
cai.zhang 已提交
207 208 209 210 211
	subPath, err := p.Load("etcd.metaSubPath")
	if err != nil {
		panic(err)
	}
	return rootPath + "/" + subPath
N
neza2017 已提交
212 213 214
}

func (p *ParamTable) gracefulTime() int64 {
N
neza2017 已提交
215 216 217 218 219 220 221 222 223
	gracefulTime, err := p.Load("queryNode.gracefulTime")
	if err != nil {
		panic(err)
	}
	time, err := strconv.Atoi(gracefulTime)
	if err != nil {
		panic(err)
	}
	return int64(time)
N
neza2017 已提交
224 225 226
}

func (p *ParamTable) insertChannelNames() []string {
N
neza2017 已提交
227
	ch, err := p.Load("msgChannel.chanNamePrefix.insert")
N
neza2017 已提交
228 229 230 231 232 233 234 235
	if err != nil {
		log.Fatal(err)
	}
	channelRange, err := p.Load("msgChannel.channelRange.insert")
	if err != nil {
		panic(err)
	}

N
neza2017 已提交
236 237 238 239 240 241 242
	chanRange := strings.Split(channelRange, ",")
	if len(chanRange) != 2 {
		panic("Illegal channel range num")
	}
	channelBegin, err := strconv.Atoi(chanRange[0])
	if err != nil {
		panic(err)
N
neza2017 已提交
243
	}
N
neza2017 已提交
244 245 246 247 248 249 250 251 252
	channelEnd, err := strconv.Atoi(chanRange[1])
	if err != nil {
		panic(err)
	}
	if channelBegin < 0 || channelEnd < 0 {
		panic("Illegal channel range value")
	}
	if channelBegin > channelEnd {
		panic("Illegal channel range value")
N
neza2017 已提交
253
	}
N
neza2017 已提交
254 255 256 257 258 259

	channels := make([]string, channelEnd-channelBegin)
	for i := 0; i < channelEnd-channelBegin; i++ {
		channels[i] = ch + "-" + strconv.Itoa(channelBegin+i)
	}
	return channels
N
neza2017 已提交
260 261 262
}

func (p *ParamTable) searchChannelNames() []string {
N
neza2017 已提交
263
	ch, err := p.Load("msgChannel.chanNamePrefix.search")
N
neza2017 已提交
264 265 266 267 268 269 270 271
	if err != nil {
		log.Fatal(err)
	}
	channelRange, err := p.Load("msgChannel.channelRange.search")
	if err != nil {
		panic(err)
	}

N
neza2017 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
	chanRange := strings.Split(channelRange, ",")
	if len(chanRange) != 2 {
		panic("Illegal channel range num")
	}
	channelBegin, err := strconv.Atoi(chanRange[0])
	if err != nil {
		panic(err)
	}
	channelEnd, err := strconv.Atoi(chanRange[1])
	if err != nil {
		panic(err)
	}
	if channelBegin < 0 || channelEnd < 0 {
		panic("Illegal channel range value")
	}
	if channelBegin > channelEnd {
		panic("Illegal channel range value")
	}
N
neza2017 已提交
290

N
neza2017 已提交
291 292 293
	channels := make([]string, channelEnd-channelBegin)
	for i := 0; i < channelEnd-channelBegin; i++ {
		channels[i] = ch + "-" + strconv.Itoa(channelBegin+i)
N
neza2017 已提交
294
	}
N
neza2017 已提交
295
	return channels
N
neza2017 已提交
296 297 298
}

func (p *ParamTable) searchResultChannelNames() []string {
N
neza2017 已提交
299
	ch, err := p.Load("msgChannel.chanNamePrefix.searchResult")
N
neza2017 已提交
300 301 302 303 304 305 306 307
	if err != nil {
		log.Fatal(err)
	}
	channelRange, err := p.Load("msgChannel.channelRange.searchResult")
	if err != nil {
		panic(err)
	}

N
neza2017 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
	chanRange := strings.Split(channelRange, ",")
	if len(chanRange) != 2 {
		panic("Illegal channel range num")
	}
	channelBegin, err := strconv.Atoi(chanRange[0])
	if err != nil {
		panic(err)
	}
	channelEnd, err := strconv.Atoi(chanRange[1])
	if err != nil {
		panic(err)
	}
	if channelBegin < 0 || channelEnd < 0 {
		panic("Illegal channel range value")
	}
	if channelBegin > channelEnd {
		panic("Illegal channel range value")
	}
N
neza2017 已提交
326

N
neza2017 已提交
327 328 329
	channels := make([]string, channelEnd-channelBegin)
	for i := 0; i < channelEnd-channelBegin; i++ {
		channels[i] = ch + "-" + strconv.Itoa(channelBegin+i)
N
neza2017 已提交
330
	}
N
neza2017 已提交
331
	return channels
N
neza2017 已提交
332 333 334 335 336 337 338 339
}

func (p *ParamTable) msgChannelSubName() string {
	// TODO: subName = namePrefix + "-" + queryNodeID, queryNodeID is assigned by master
	name, err := p.Load("msgChannel.subNamePrefix.queryNodeSubNamePrefix")
	if err != nil {
		log.Panic(err)
	}
N
neza2017 已提交
340
	return name
N
neza2017 已提交
341 342 343 344 345 346 347 348 349
}

// statsChannelName returns the value stored under the
// "msgChannel.chanNamePrefix.queryNodeStats" key and panics with the lookup
// error if the key cannot be loaded.
func (p *ParamTable) statsChannelName() string {
	name, loadErr := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
	if loadErr != nil {
		panic(loadErr)
	}
	return name
}