param.go 7.9 KB
Newer Older
S
sunby 已提交
1 2 3 4 5 6 7 8 9 10
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
S
sunby 已提交
11 12 13
package dataservice

import (
14
	"path"
15
	"strconv"
16
	"sync"
17

X
Xiangyu Wang 已提交
18
	"github.com/milvus-io/milvus/internal/log"
19

X
Xiangyu Wang 已提交
20
	"github.com/milvus-io/milvus/internal/util/paramtable"
S
sunby 已提交
21 22 23 24 25
)

type ParamTable struct {
	paramtable.BaseTable

26
	NodeID int64
S
sunby 已提交
27

G
godchen 已提交
28 29 30
	IP   string
	Port int

31 32 33 34 35 36
	// --- ETCD ---
	EtcdAddress             string
	MetaRootPath            string
	KvRootPath              string
	SegmentBinlogSubPath    string
	CollectionBinlogSubPath string
37 38
	SegmentDmlPosSubPath    string
	SegmentDdlPosSubPath    string
39 40
	DmlChannelPosSubPath    string
	DdlChannelPosSubPath    string
41 42

	// --- Pulsar ---
S
sunby 已提交
43
	PulsarAddress string
S
sunby 已提交
44

45 46 47
	FlushStreamPosSubPath string
	StatsStreamPosSubPath string

S
sunby 已提交
48 49 50 51 52
	// segment
	SegmentSize           float64
	SegmentSizeFactor     float64
	SegIDAssignExpiration int64

53 54 55 56 57 58 59 60
	InsertChannelPrefixName     string
	InsertChannelNum            int64
	StatisticsChannelName       string
	TimeTickChannelName         string
	DataNodeNum                 int
	SegmentInfoChannelName      string
	DataServiceSubscriptionName string
	K2SChannelNames             []string
61
	ProxyTimeTickChannelName    string
S
sunby 已提交
62 63

	SegmentFlushMetaPath string
64
	Log                  log.Config
S
sunby 已提交
65 66 67
}

var Params ParamTable
68
var once sync.Once
S
sunby 已提交
69 70

func (p *ParamTable) Init() {
71 72 73 74 75 76 77 78 79 80 81 82 83 84
	once.Do(func() {
		// load yaml
		p.BaseTable.Init()

		if err := p.LoadYaml("advanced/data_service.yaml"); err != nil {
			panic(err)
		}

		// set members
		p.initNodeID()

		p.initEtcdAddress()
		p.initMetaRootPath()
		p.initKvRootPath()
85 86 87
		p.initSegmentBinlogSubPath()
		p.initCollectionBinlogSubPath()

88 89 90 91 92 93 94 95 96 97 98 99 100 101
		p.initPulsarAddress()

		p.initSegmentSize()
		p.initSegmentSizeFactor()
		p.initSegIDAssignExpiration()
		p.initInsertChannelPrefixName()
		p.initInsertChannelNum()
		p.initStatisticsChannelName()
		p.initTimeTickChannelName()
		p.initDataNodeNum()
		p.initSegmentInfoChannelName()
		p.initDataServiceSubscriptionName()
		p.initK2SChannelNames()
		p.initSegmentFlushMetaPath()
102
		p.initLogCfg()
103
		p.initProxyServiceTimeTickChannelName()
104 105 106

		p.initFlushStreamPosSubPath()
		p.initStatsStreamPosSubPath()
107 108
		p.initSegmentDmlPosSubPath()
		p.initSegmentDdlPosSubPath()
109
	})
S
sunby 已提交
110 111
}

112 113
func (p *ParamTable) initNodeID() {
	p.NodeID = p.ParseInt64("dataservice.nodeID")
S
sunby 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
}

func (p *ParamTable) initEtcdAddress() {
	addr, err := p.Load("_EtcdAddress")
	if err != nil {
		panic(err)
	}
	p.EtcdAddress = addr
}

func (p *ParamTable) initPulsarAddress() {
	addr, err := p.Load("_PulsarAddress")
	if err != nil {
		panic(err)
	}
	p.PulsarAddress = addr
}

func (p *ParamTable) initMetaRootPath() {
	rootPath, err := p.Load("etcd.rootPath")
	if err != nil {
		panic(err)
	}
	subPath, err := p.Load("etcd.metaSubPath")
	if err != nil {
		panic(err)
	}
	p.MetaRootPath = rootPath + "/" + subPath
}

func (p *ParamTable) initKvRootPath() {
	rootPath, err := p.Load("etcd.rootPath")
	if err != nil {
		panic(err)
	}
	subPath, err := p.Load("etcd.kvSubPath")
	if err != nil {
		panic(err)
	}
	p.KvRootPath = rootPath + "/" + subPath
}
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

func (p *ParamTable) initSegmentBinlogSubPath() {
	subPath, err := p.Load("etcd.segmentBinlogSubPath")
	if err != nil {
		panic(err)
	}
	p.SegmentBinlogSubPath = subPath
}

func (p *ParamTable) initCollectionBinlogSubPath() {
	subPath, err := p.Load("etcd.collectionBinlogSubPath")
	if err != nil {
		panic(err)
	}
	p.CollectionBinlogSubPath = subPath
}

S
sunby 已提交
172
func (p *ParamTable) initSegmentSize() {
173
	p.SegmentSize = p.ParseFloat("dataservice.segment.size")
S
sunby 已提交
174 175 176
}

func (p *ParamTable) initSegmentSizeFactor() {
177
	p.SegmentSizeFactor = p.ParseFloat("dataservice.segment.sizeFactor")
S
sunby 已提交
178 179
}

S
sunby 已提交
180
func (p *ParamTable) initSegIDAssignExpiration() {
181
	p.SegIDAssignExpiration = p.ParseInt64("dataservice.segment.IDAssignExpiration") //ms
S
sunby 已提交
182 183 184
}

func (p *ParamTable) initInsertChannelPrefixName() {
185 186 187 188 189
	var err error
	p.InsertChannelPrefixName, err = p.Load("msgChannel.chanNamePrefix.dataServiceInsertChannel")
	if err != nil {
		panic(err)
	}
S
sunby 已提交
190 191
}

192 193
func (p *ParamTable) initInsertChannelNum() {
	p.InsertChannelNum = p.ParseInt64("dataservice.insertChannelNum")
S
sunby 已提交
194 195 196
}

func (p *ParamTable) initStatisticsChannelName() {
197 198 199 200 201
	var err error
	p.StatisticsChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceStatistic")
	if err != nil {
		panic(err)
	}
S
sunby 已提交
202 203 204
}

func (p *ParamTable) initTimeTickChannelName() {
205 206 207 208 209
	var err error
	p.TimeTickChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceTimeTick")
	if err != nil {
		panic(err)
	}
S
sunby 已提交
210 211 212
}

func (p *ParamTable) initDataNodeNum() {
213
	p.DataNodeNum = p.ParseInt("dataservice.dataNodeNum")
S
sunby 已提交
214
}
N
neza2017 已提交
215 216

func (p *ParamTable) initSegmentInfoChannelName() {
217 218 219 220 221
	var err error
	p.SegmentInfoChannelName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSegmentInfo")
	if err != nil {
		panic(err)
	}
N
neza2017 已提交
222 223 224
}

func (p *ParamTable) initDataServiceSubscriptionName() {
225
	var err error
226
	p.DataServiceSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataServiceSubNamePrefix")
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247
	if err != nil {
		panic(err)
	}
}

func (p *ParamTable) initK2SChannelNames() {
	prefix, err := p.Load("msgChannel.chanNamePrefix.k2s")
	if err != nil {
		panic(err)
	}
	prefix += "-"
	iRangeStr, err := p.Load("msgChannel.channelRange.k2s")
	if err != nil {
		panic(err)
	}
	channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
	}
	p.K2SChannelNames = ret
N
neza2017 已提交
248
}
S
sunby 已提交
249 250 251 252 253 254 255 256

func (p *ParamTable) initSegmentFlushMetaPath() {
	subPath, err := p.Load("etcd.segFlushMetaSubPath")
	if err != nil {
		panic(err)
	}
	p.SegmentFlushMetaPath = subPath
}
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285

func (p *ParamTable) initLogCfg() {
	p.Log = log.Config{}
	format, err := p.Load("log.format")
	if err != nil {
		panic(err)
	}
	p.Log.Format = format
	level, err := p.Load("log.level")
	if err != nil {
		panic(err)
	}
	p.Log.Level = level
	devStr, err := p.Load("log.dev")
	if err != nil {
		panic(err)
	}
	dev, err := strconv.ParseBool(devStr)
	if err != nil {
		panic(err)
	}
	p.Log.Development = dev
	p.Log.File.MaxSize = p.ParseInt("log.file.maxSize")
	p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups")
	p.Log.File.MaxDays = p.ParseInt("log.file.maxAge")
	rootPath, err := p.Load("log.file.rootPath")
	if err != nil {
		panic(err)
	}
S
sunby 已提交
286 287 288 289 290
	if len(rootPath) != 0 {
		p.Log.File.Filename = path.Join(rootPath, "dataservice-"+strconv.FormatInt(p.NodeID, 10)+".log")
	} else {
		p.Log.File.Filename = ""
	}
291
}
292 293 294 295 296 297 298 299

func (p *ParamTable) initProxyServiceTimeTickChannelName() {
	ch, err := p.Load("msgChannel.chanNamePrefix.proxyServiceTimeTick")
	if err != nil {
		panic(err)
	}
	p.ProxyTimeTickChannelName = ch
}
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315

func (p *ParamTable) initFlushStreamPosSubPath() {
	subPath, err := p.Load("etcd.flushStreamPosSubPath")
	if err != nil {
		panic(err)
	}
	p.FlushStreamPosSubPath = subPath
}

func (p *ParamTable) initStatsStreamPosSubPath() {
	subPath, err := p.Load("etcd.statsStreamPosSubPath")
	if err != nil {
		panic(err)
	}
	p.StatsStreamPosSubPath = subPath
}
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331

func (p *ParamTable) initSegmentDmlPosSubPath() {
	subPath, err := p.Load("etcd.segmentDmlPosSubPath")
	if err != nil {
		panic(err)
	}
	p.SegmentDmlPosSubPath = subPath
}

func (p *ParamTable) initSegmentDdlPosSubPath() {
	subPath, err := p.Load("etcd.segmentDdlPosSubPath")
	if err != nil {
		panic(err)
	}
	p.SegmentDdlPosSubPath = subPath
}
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347

func (p *ParamTable) initDmlChannelPosSubPath() {
	subPath, err := p.Load("etcd.dmlChanPosSubPath")
	if err != nil {
		panic(err)
	}
	p.DmlChannelPosSubPath = subPath
}

func (p *ParamTable) initDdlChannelPosSubPath() {
	subPath, err := p.Load("etcd.ddlChanPosSubPath")
	if err != nil {
		panic(err)
	}
	p.DdlChannelPosSubPath = subPath
}