impl.go 8.0 KB
Newer Older
D
dragondriver 已提交
1 2 3 4
package proxyservice

import (
	"context"
5 6 7 8 9
	"io/ioutil"
	"log"
	"os"
	"path"
	"runtime"
G
godchen 已提交
10
	"strconv"
D
dragondriver 已提交
11 12
	"time"

13 14
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"

D
dragondriver 已提交
15 16 17 18 19 20 21 22 23
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"

	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"

	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
)

// Constants shared by the proxy service implementation.
const (
	// timeoutInterval is the timeout applied to the context created for each
	// scheduled task (register link, register node, invalidate cache).
	timeoutInterval = time.Second * 10

	// StartParamsKey is the prefix of every key placed in the node start
	// parameters; the full key is StartParamsKey + "_" + <config file name>.
	StartParamsKey = "START_PARAMS"

	// Names of the configuration files, relative to the configs directory,
	// whose contents are handed to nodes as start parameters.
	ChannelYamlContent   = "advanced/channel.yaml"
	CommonYamlContent    = "advanced/common.yaml"
	DataNodeYamlContent  = "advanced/data_node.yaml"
	MasterYamlContent    = "advanced/master.yaml"
	ProxyNodeYamlContent = "advanced/proxy_node.yaml"
	QueryNodeYamlContent = "advanced/query_node.yaml"
	MilvusYamlContent    = "milvus.yaml"
)

35 36
func (s *ServiceImpl) fillNodeInitParams() error {
	s.nodeStartParams = make([]*commonpb.KeyValuePair, 0)
37 38 39

	getConfigContentByName := func(fileName string) []byte {
		_, fpath, _, _ := runtime.Caller(0)
X
Xiangyu Wang 已提交
40
		configFile := path.Dir(fpath) + "/../../configs/" + fileName
41
		_, err := os.Stat(configFile)
X
Xiangyu Wang 已提交
42
		log.Printf("configFile = %s", configFile)
43 44 45 46 47 48 49 50 51 52 53
		if os.IsNotExist(err) {
			runPath, err := os.Getwd()
			if err != nil {
				panic(err)
			}
			configFile = runPath + "/configs/" + fileName
		}
		data, err := ioutil.ReadFile(configFile)
		if err != nil {
			panic(err)
		}
54
		return append(data, []byte("\n")...)
55 56
	}

57 58 59 60 61 62 63 64
	channelYamlContent := getConfigContentByName(ChannelYamlContent)
	commonYamlContent := getConfigContentByName(CommonYamlContent)
	dataNodeYamlContent := getConfigContentByName(DataNodeYamlContent)
	masterYamlContent := getConfigContentByName(MasterYamlContent)
	proxyNodeYamlContent := getConfigContentByName(ProxyNodeYamlContent)
	queryNodeYamlContent := getConfigContentByName(QueryNodeYamlContent)
	milvusYamlContent := getConfigContentByName(MilvusYamlContent)

65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
	appendContent := func(key string, content []byte) {
		s.nodeStartParams = append(s.nodeStartParams, &commonpb.KeyValuePair{
			Key:   StartParamsKey + "_" + key,
			Value: string(content),
		})
	}
	appendContent(ChannelYamlContent, channelYamlContent)
	appendContent(CommonYamlContent, commonYamlContent)
	appendContent(DataNodeYamlContent, dataNodeYamlContent)
	appendContent(MasterYamlContent, masterYamlContent)
	appendContent(ProxyNodeYamlContent, proxyNodeYamlContent)
	appendContent(QueryNodeYamlContent, queryNodeYamlContent)
	appendContent(MilvusYamlContent, milvusYamlContent)

	// var allContent []byte
	// allContent = append(allContent, channelYamlContent...)
	// allContent = append(allContent, commonYamlContent...)
	// allContent = append(allContent, dataNodeYamlContent...)
	// allContent = append(allContent, masterYamlContent...)
	// allContent = append(allContent, proxyNodeYamlContent...)
	// allContent = append(allContent, queryNodeYamlContent...)
	// allContent = append(allContent, writeNodeYamlContent...)
	// allContent = append(allContent, milvusYamlContent...)

	// s.nodeStartParams = append(s.nodeStartParams, &commonpb.KeyValuePair{
	// 	Key:   StartParamsKey,
	// 	Value: string(allContent),
	// })
93

94 95 96 97
	return nil
}

func (s *ServiceImpl) Init() error {
Z
zhenshan.cao 已提交
98
	factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
Z
zhenshan.cao 已提交
99

100 101 102 103
	err := s.fillNodeInitParams()
	if err != nil {
		return err
	}
104
	log.Println("fill node init params ...")
105

Z
zhenshan.cao 已提交
106 107
	serviceTimeTickMsgStream, _ := factory.NewTtMsgStream(s.ctx)
	serviceTimeTickMsgStream.AsProducer([]string{Params.ServiceTimeTickChannel})
108
	log.Println("create service time tick producer channel: ", []string{Params.ServiceTimeTickChannel})
109

G
godchen 已提交
110 111 112 113 114
	channels := make([]string, Params.InsertChannelNum)
	var i int64 = 0
	for ; i < Params.InsertChannelNum; i++ {
		channels[i] = Params.InsertChannelPrefixName + strconv.FormatInt(i, 10)
	}
Z
zhenshan.cao 已提交
115 116
	insertTickMsgStream, _ := factory.NewMsgStream(s.ctx)
	insertTickMsgStream.AsProducer(channels)
G
godchen 已提交
117
	log.Println("create insert time tick producer channel: ", channels)
G
godchen 已提交
118

Z
zhenshan.cao 已提交
119 120
	nodeTimeTickMsgStream, _ := factory.NewMsgStream(s.ctx)
	nodeTimeTickMsgStream.AsConsumer(Params.NodeTimeTickChannel,
121
		"proxyservicesub") // TODO: add config
122
	log.Println("create node time tick consumer channel: ", Params.NodeTimeTickChannel)
123 124

	ttBarrier := newSoftTimeTickBarrier(s.ctx, nodeTimeTickMsgStream, []UniqueID{0}, 10)
125
	log.Println("create soft time tick barrier ...")
G
godchen 已提交
126
	s.tick = newTimeTick(s.ctx, ttBarrier, serviceTimeTickMsgStream, insertTickMsgStream)
127
	log.Println("create time tick ...")
128

Z
zhenshan.cao 已提交
129
	s.stateCode = internalpb2.StateCode_HEALTHY
130

D
dragondriver 已提交
131 132 133 134 135
	return nil
}

func (s *ServiceImpl) Start() error {
	s.sched.Start()
136
	log.Println("start scheduler ...")
137
	return s.tick.Start()
D
dragondriver 已提交
138 139 140 141
}

func (s *ServiceImpl) Stop() error {
	s.sched.Close()
142
	log.Println("close scheduler ...")
143
	s.tick.Close()
144 145
	log.Println("close time tick")

146
	err := s.nodeInfos.ReleaseAllClients()
147
	if err != nil {
148
		panic(err)
149 150 151 152 153
	}
	log.Println("stop all node clients ...")

	s.cancel()

D
dragondriver 已提交
154 155 156 157
	return nil
}

func (s *ServiceImpl) GetComponentStates() (*internalpb2.ComponentStates, error) {
Z
zhenshan.cao 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
	stateInfo := &internalpb2.ComponentInfo{
		NodeID:    UniqueID(0),
		Role:      "ProxyService",
		StateCode: s.stateCode,
	}

	ret := &internalpb2.ComponentStates{
		State:              stateInfo,
		SubcomponentStates: nil, // todo add subcomponents states
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
	}
	return ret, nil
}

func (s *ServiceImpl) UpdateStateCode(code internalpb2.StateCode) {
	s.stateCode = code
D
dragondriver 已提交
176 177
}

G
godchen 已提交
178 179 180 181 182 183 184
func (s *ServiceImpl) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Value: Params.ServiceTimeTickChannel,
	}, nil
D
dragondriver 已提交
185 186
}

G
godchen 已提交
187
func (s *ServiceImpl) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
D
dragondriver 已提交
188 189 190 191
	panic("implement me")
}

func (s *ServiceImpl) RegisterLink() (*milvuspb.RegisterLinkResponse, error) {
192
	log.Println("register link")
D
dragondriver 已提交
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
	ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
	defer cancel()

	t := &RegisterLinkTask{
		Condition: NewTaskCondition(ctx),
		nodeInfos: s.nodeInfos,
	}

	var err error

	err = s.sched.RegisterLinkTaskQueue.Enqueue(t)
	if err != nil {
		return &milvuspb.RegisterLinkResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
			Address: nil,
		}, nil
	}

	err = t.WaitToFinish()
	if err != nil {
		return &milvuspb.RegisterLinkResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
			Address: nil,
		}, nil
	}

	return t.response, nil
}

func (s *ServiceImpl) RegisterNode(request *proxypb.RegisterNodeRequest) (*proxypb.RegisterNodeResponse, error) {
229
	log.Println("RegisterNode: ", request)
D
dragondriver 已提交
230 231 232 233
	ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
	defer cancel()

	t := &RegisterNodeTask{
234 235 236 237 238
		request:     request,
		startParams: s.nodeStartParams,
		Condition:   NewTaskCondition(ctx),
		allocator:   s.allocator,
		nodeInfos:   s.nodeInfos,
D
dragondriver 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
	}

	var err error

	err = s.sched.RegisterNodeTaskQueue.Enqueue(t)
	if err != nil {
		return &proxypb.RegisterNodeResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
			InitParams: nil,
		}, nil
	}

	err = t.WaitToFinish()
	if err != nil {
		return &proxypb.RegisterNodeResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
			InitParams: nil,
		}, nil
	}

	return t.response, nil
}

G
godchen 已提交
268
func (s *ServiceImpl) InvalidateCollectionMetaCache(request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
269
	log.Println("InvalidateCollectionMetaCache")
D
dragondriver 已提交
270 271 272 273 274 275
	ctx, cancel := context.WithTimeout(s.ctx, timeoutInterval)
	defer cancel()

	t := &InvalidateCollectionMetaCacheTask{
		request:   request,
		Condition: NewTaskCondition(ctx),
G
godchen 已提交
276
		nodeInfos: s.nodeInfos,
D
dragondriver 已提交
277 278 279 280
	}

	var err error

281
	err = s.sched.InvalidateCollectionMetaCacheTaskQueue.Enqueue(t)
D
dragondriver 已提交
282
	if err != nil {
G
godchen 已提交
283
		return nil, err
D
dragondriver 已提交
284 285 286 287
	}

	err = t.WaitToFinish()
	if err != nil {
G
godchen 已提交
288
		return nil, err
D
dragondriver 已提交
289 290
	}

G
godchen 已提交
291
	return nil, nil
D
dragondriver 已提交
292
}