param_table.go 7.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
package master

import (
	"log"
	"strconv"
	"strings"

	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)

type ParamTable struct {
	paramtable.BaseTable

	Address string
	Port    int

	EtcdAddress   string
C
cai.zhang 已提交
20 21
	MetaRootPath  string
	KvRootPath    string
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
	PulsarAddress string

	// nodeID
	ProxyIDList     []typeutil.UniqueID
	WriteNodeIDList []typeutil.UniqueID

	TopicNum                    int
	QueryNodeNum                int
	SoftTimeTickBarrierInterval typeutil.Timestamp

	// segment
	SegmentSize           float64
	SegmentSizeFactor     float64
	DefaultRecordSize     int64
	MinSegIDAssignCnt     int64
	MaxSegIDAssignCnt     int64
	SegIDAssignExpiration int64

	// msgChannel
	ProxyTimeTickChannelNames     []string
	WriteNodeTimeTickChannelNames []string
Z
zhenshan.cao 已提交
43
	DDChannelNames                []string
44 45 46 47
	InsertChannelNames            []string
	K2SChannelNames               []string
	QueryNodeStatsChannelName     string
	MsgChannelSubName             string
N
neza2017 已提交
48 49 50

	MaxPartitionNum     int64
	DefaultPartitionTag string
51 52 53 54 55 56 57
}

// Params is the package-level ParamTable instance.
var Params ParamTable

func (p *ParamTable) Init() {
	// load yaml
	p.BaseTable.Init()
X
XuanYang-cn 已提交
58 59

	err := p.LoadYaml("advanced/master.yaml")
N
neza2017 已提交
60 61 62
	if err != nil {
		panic(err)
	}
63 64 65 66 67 68

	// set members
	p.initAddress()
	p.initPort()

	p.initEtcdAddress()
C
cai.zhang 已提交
69 70
	p.initMetaRootPath()
	p.initKvRootPath()
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
	p.initPulsarAddress()

	p.initProxyIDList()
	p.initWriteNodeIDList()

	p.initTopicNum()
	p.initQueryNodeNum()
	p.initSoftTimeTickBarrierInterval()

	p.initSegmentSize()
	p.initSegmentSizeFactor()
	p.initDefaultRecordSize()
	p.initMinSegIDAssignCnt()
	p.initMaxSegIDAssignCnt()
	p.initSegIDAssignExpiration()

	p.initProxyTimeTickChannelNames()
	p.initWriteNodeTimeTickChannelNames()
	p.initInsertChannelNames()
Z
zhenshan.cao 已提交
90
	p.initDDChannelNames()
91 92 93
	p.initK2SChannelNames()
	p.initQueryNodeStatsChannelName()
	p.initMsgChannelSubName()
N
neza2017 已提交
94 95
	p.initMaxPartitionNum()
	p.initDefaultPartitionTag()
96 97 98 99 100 101 102 103 104 105 106
}

// initAddress stores the value loaded for "master.address" in p.Address.
// It panics if the key cannot be loaded.
func (p *ParamTable) initAddress() {
	addr, loadErr := p.Load("master.address")
	if loadErr != nil {
		panic(loadErr)
	}
	p.Address = addr
}

func (p *ParamTable) initPort() {
X
XuanYang-cn 已提交
107
	p.Port = p.ParseInt("master.port")
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
}

// initEtcdAddress stores the value loaded for "_EtcdAddress" in
// p.EtcdAddress. It panics if the key cannot be loaded.
func (p *ParamTable) initEtcdAddress() {
	etcdAddr, loadErr := p.Load("_EtcdAddress")
	if loadErr != nil {
		panic(loadErr)
	}
	p.EtcdAddress = etcdAddr
}

// initPulsarAddress stores the value loaded for "_PulsarAddress" in
// p.PulsarAddress. It panics if the key cannot be loaded.
func (p *ParamTable) initPulsarAddress() {
	pulsarAddr, loadErr := p.Load("_PulsarAddress")
	if loadErr != nil {
		panic(loadErr)
	}
	p.PulsarAddress = pulsarAddr
}

C
cai.zhang 已提交
126 127
func (p *ParamTable) initMetaRootPath() {
	rootPath, err := p.Load("etcd.rootPath")
128 129 130
	if err != nil {
		panic(err)
	}
C
cai.zhang 已提交
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
	subPath, err := p.Load("etcd.metaSubPath")
	if err != nil {
		panic(err)
	}
	p.MetaRootPath = rootPath + "/" + subPath
}

func (p *ParamTable) initKvRootPath() {
	rootPath, err := p.Load("etcd.rootPath")
	if err != nil {
		panic(err)
	}
	subPath, err := p.Load("etcd.kvSubPath")
	if err != nil {
		panic(err)
	}
	p.KvRootPath = rootPath + "/" + subPath
148 149 150
}

func (p *ParamTable) initTopicNum() {
X
XuanYang-cn 已提交
151
	iRangeStr, err := p.Load("msgChannel.channelRange.insert")
152 153 154
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
155 156
	rangeSlice := paramtable.ConvertRangeToIntRange(iRangeStr, ",")
	p.TopicNum = rangeSlice[1] - rangeSlice[0]
157 158 159
}

func (p *ParamTable) initSegmentSize() {
X
XuanYang-cn 已提交
160
	p.SegmentSize = p.ParseFloat("master.segment.size")
161 162 163
}

func (p *ParamTable) initSegmentSizeFactor() {
X
XuanYang-cn 已提交
164
	p.SegmentSizeFactor = p.ParseFloat("master.segment.sizeFactor")
165 166 167
}

func (p *ParamTable) initDefaultRecordSize() {
X
XuanYang-cn 已提交
168
	p.DefaultRecordSize = p.ParseInt64("master.segment.defaultSizePerRecord")
169 170 171
}

func (p *ParamTable) initMinSegIDAssignCnt() {
X
XuanYang-cn 已提交
172
	p.MinSegIDAssignCnt = p.ParseInt64("master.segment.minIDAssignCnt")
173 174 175
}

func (p *ParamTable) initMaxSegIDAssignCnt() {
X
XuanYang-cn 已提交
176
	p.MaxSegIDAssignCnt = p.ParseInt64("master.segment.maxIDAssignCnt")
177 178 179
}

func (p *ParamTable) initSegIDAssignExpiration() {
X
XuanYang-cn 已提交
180
	p.SegIDAssignExpiration = p.ParseInt64("master.segment.IDAssignExpiration")
181 182 183
}

func (p *ParamTable) initQueryNodeNum() {
X
XuanYang-cn 已提交
184
	p.QueryNodeNum = len(p.QueryNodeIDList())
185 186 187 188 189 190 191 192 193 194 195
}

// initQueryNodeStatsChannelName stores the value loaded for
// "msgChannel.chanNamePrefix.queryNodeStats" in p.QueryNodeStatsChannelName.
// It panics if the key cannot be loaded.
func (p *ParamTable) initQueryNodeStatsChannelName() {
	statsChannel, loadErr := p.Load("msgChannel.chanNamePrefix.queryNodeStats")
	if loadErr != nil {
		panic(loadErr)
	}
	p.QueryNodeStatsChannelName = statsChannel
}

func (p *ParamTable) initProxyIDList() {
X
XuanYang-cn 已提交
196
	p.ProxyIDList = p.BaseTable.ProxyIDList()
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
}

// initProxyTimeTickChannelNames builds one channel name per proxy ID in the
// form "<prefix>-<id>", where the prefix comes from
// "msgChannel.chanNamePrefix.proxyTimeTick" and the comma-separated IDs from
// "nodeID.proxyIDList". It panics (through the log package) if either key
// cannot be loaded or an ID is not a base-10 integer.
func (p *ParamTable) initProxyTimeTickChannelNames() {
	prefix, err := p.Load("msgChannel.chanNamePrefix.proxyTimeTick")
	if err != nil {
		log.Panic(err)
	}
	rawIDs, err := p.Load("nodeID.proxyIDList")
	if err != nil {
		log.Panicf("load proxy id list error, %s", err.Error())
	}
	idStrs := strings.Split(rawIDs, ",")
	names := make([]string, 0, len(idStrs))
	for _, idStr := range idStrs {
		// The parsed number is discarded: parsing only validates the ID, and
		// the original text is what ends up in the channel name.
		if _, parseErr := strconv.ParseInt(idStr, 10, 64); parseErr != nil {
			log.Panicf("load proxy id list error, %s", parseErr.Error())
		}
		names = append(names, prefix+"-"+idStr)
	}
	p.ProxyTimeTickChannelNames = names
}

// initMsgChannelSubName stores the value loaded for
// "msgChannel.subNamePrefix.masterSubNamePrefix" in p.MsgChannelSubName.
// It panics (through the log package) if the key cannot be loaded.
func (p *ParamTable) initMsgChannelSubName() {
	subName, loadErr := p.Load("msgChannel.subNamePrefix.masterSubNamePrefix")
	if loadErr != nil {
		log.Panic(loadErr)
	}
	p.MsgChannelSubName = subName
}

// initSoftTimeTickBarrierInterval loads
// "master.timeSync.softTimeTickBarrierInterval", parses it as a base-10
// int64 and stores tsoutil.ComposeTS(value, 0) in
// p.SoftTimeTickBarrierInterval. It panics (through the log package) if the
// key cannot be loaded or the value is not an integer.
func (p *ParamTable) initSoftTimeTickBarrierInterval() {
	raw, err := p.Load("master.timeSync.softTimeTickBarrierInterval")
	if err != nil {
		log.Panic(err)
	}
	interval, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		log.Panic(err)
	}
	p.SoftTimeTickBarrierInterval = tsoutil.ComposeTS(interval, 0)
}

func (p *ParamTable) initWriteNodeIDList() {
X
XuanYang-cn 已提交
241
	p.WriteNodeIDList = p.BaseTable.WriteNodeIDList()
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
}

// initWriteNodeTimeTickChannelNames builds one channel name per write node
// ID in the form "<prefix>-<id>", where the prefix comes from
// "msgChannel.chanNamePrefix.writeNodeTimeTick" and the comma-separated IDs
// from "nodeID.writeNodeIDList". It panics (through the log package) if
// either key cannot be loaded or an ID is not a base-10 integer.
func (p *ParamTable) initWriteNodeTimeTickChannelNames() {
	ch, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick")
	if err != nil {
		// Panic rather than log.Fatal: Fatal exits the process immediately,
		// skipping deferred calls, whereas every other failure in this
		// function and in the proxy counterpart panics.
		log.Panic(err)
	}
	id, err := p.Load("nodeID.writeNodeIDList")
	if err != nil {
		log.Panicf("load write node id list error, %s", err.Error())
	}
	ids := strings.Split(id, ",")
	channels := make([]string, 0, len(ids))
	for _, i := range ids {
		// The parsed number is discarded: parsing only validates the ID, and
		// the original text is what ends up in the channel name.
		if _, err := strconv.ParseInt(i, 10, 64); err != nil {
			log.Panicf("load write node id list error, %s", err.Error())
		}
		channels = append(channels, ch+"-"+i)
	}
	p.WriteNodeTimeTickChannelNames = channels
}

Z
zhenshan.cao 已提交
265
func (p *ParamTable) initDDChannelNames() {
X
XuanYang-cn 已提交
266
	prefix, err := p.Load("msgChannel.chanNamePrefix.dataDefinition")
Z
zhenshan.cao 已提交
267
	if err != nil {
X
XuanYang-cn 已提交
268
		panic(err)
Z
zhenshan.cao 已提交
269
	}
X
XuanYang-cn 已提交
270 271
	prefix += "-"
	iRangeStr, err := p.Load("msgChannel.channelRange.dataDefinition")
Z
zhenshan.cao 已提交
272
	if err != nil {
X
XuanYang-cn 已提交
273
		panic(err)
Z
zhenshan.cao 已提交
274
	}
X
XuanYang-cn 已提交
275 276 277 278
	channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
Z
zhenshan.cao 已提交
279
	}
X
XuanYang-cn 已提交
280
	p.DDChannelNames = ret
Z
zhenshan.cao 已提交
281 282
}

283
func (p *ParamTable) initInsertChannelNames() {
X
XuanYang-cn 已提交
284
	prefix, err := p.Load("msgChannel.chanNamePrefix.insert")
N
neza2017 已提交
285 286 287
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
288 289
	prefix += "-"
	iRangeStr, err := p.Load("msgChannel.channelRange.insert")
N
neza2017 已提交
290 291 292
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
293 294 295 296
	channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
297
	}
X
XuanYang-cn 已提交
298
	p.InsertChannelNames = ret
299 300 301
}

func (p *ParamTable) initK2SChannelNames() {
X
XuanYang-cn 已提交
302
	prefix, err := p.Load("msgChannel.chanNamePrefix.k2s")
303
	if err != nil {
X
XuanYang-cn 已提交
304
		panic(err)
305
	}
X
XuanYang-cn 已提交
306 307
	prefix += "-"
	iRangeStr, err := p.Load("msgChannel.channelRange.k2s")
308
	if err != nil {
X
XuanYang-cn 已提交
309
		panic(err)
310
	}
X
XuanYang-cn 已提交
311 312 313 314
	channelIDs := paramtable.ConvertRangeToIntSlice(iRangeStr, ",")
	var ret []string
	for _, ID := range channelIDs {
		ret = append(ret, prefix+strconv.Itoa(ID))
315
	}
X
XuanYang-cn 已提交
316
	p.K2SChannelNames = ret
317
}
N
neza2017 已提交
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338

// initMaxPartitionNum loads "master.maxPartitionNum", parses it as a base-10
// int64 and stores it in p.MaxPartitionNum. It panics if the key cannot be
// loaded or the value is not an integer.
func (p *ParamTable) initMaxPartitionNum() {
	raw, loadErr := p.Load("master.maxPartitionNum")
	if loadErr != nil {
		panic(loadErr)
	}
	limit, parseErr := strconv.ParseInt(raw, 10, 64)
	if parseErr != nil {
		panic(parseErr)
	}
	p.MaxPartitionNum = limit
}

// initDefaultPartitionTag stores the value loaded for
// "common.defaultPartitionTag" in p.DefaultPartitionTag. It panics if the
// key cannot be loaded.
func (p *ParamTable) initDefaultPartitionTag() {
	tag, loadErr := p.Load("common.defaultPartitionTag")
	if loadErr != nil {
		panic(loadErr)
	}
	p.DefaultPartitionTag = tag
}