// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package querycoord

import (
	"context"
	"errors"
	"fmt"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
)

28
func (qc *QueryCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
29
	serviceComponentInfo := &internalpb.ComponentInfo{
30 31
		NodeID:    Params.QueryCoordID,
		StateCode: qc.stateCode.Load().(internalpb.StateCode),
32
	}
33 34 35 36 37 38 39 40 41 42

	//subComponentInfos, err := qs.cluster.GetComponentInfos(ctx)
	//if err != nil {
	//	return &internalpb.ComponentStates{
	//		Status: &commonpb.Status{
	//			ErrorCode: commonpb.ErrorCode_UnexpectedError,
	//			Reason:    err.Error(),
	//		},
	//	}, err
	//}
43 44 45 46
	return &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
47 48
		State: serviceComponentInfo,
		//SubcomponentStates: subComponentInfos,
49 50 51
	}, nil
}

52
func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
53 54 55 56 57 58 59 60 61
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.TimeTickChannelName,
	}, nil
}

62
func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
63 64 65 66 67 68 69 70 71
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
}

72
func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
73
	dbID := req.DbID
74
	log.Debug("show collection start", zap.Int64("dbID", dbID))
75
	collectionIDs := qc.meta.showCollections()
76
	log.Debug("show collection end", zap.Int64s("collections", collectionIDs))
77 78 79 80 81 82 83 84
	return &querypb.ShowCollectionsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		CollectionIDs: collectionIDs,
	}, nil
}

85
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
86
	collectionID := req.CollectionID
87
	//schema := req.Schema
88 89 90
	log.Debug("LoadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
		zap.Stringer("schema", req.Schema))
	status := &commonpb.Status{
91
		ErrorCode: commonpb.ErrorCode_Success,
92 93
	}

94
	hasCollection := qc.meta.hasCollection(collectionID)
95
	if hasCollection {
96 97 98 99 100
		loadCollection, _ := qc.meta.getLoadCollection(collectionID)
		if loadCollection {
			status.Reason = "collection has been loaded"
			return status, nil
		}
101
	}
102

103 104
	loadCollectionTask := &LoadCollectionTask{
		BaseTask: BaseTask{
105 106
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
107
			triggerCondition: querypb.TriggerCondition_grpcRequest,
108 109
		},
		LoadCollectionRequest: req,
110 111 112 113
		rootCoord:             qc.rootCoordClient,
		dataCoord:             qc.dataCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
114
	}
115
	qc.scheduler.Enqueue([]task{loadCollectionTask})
116

117 118
	err := loadCollectionTask.WaitToFinish()
	if err != nil {
119
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
120 121 122
		status.Reason = err.Error()
		return status, err
	}
123 124 125 126 127

	log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	return status, nil
}

128
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
129
	//dbID := req.DbID
130 131 132 133 134
	collectionID := req.CollectionID
	log.Debug("ReleaseCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
135
	hasCollection := qc.meta.hasCollection(collectionID)
136
	if !hasCollection {
137
		log.Warn("release collection end, query coordinator don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID)))
138 139 140 141 142
		return status, nil
	}

	releaseCollectionTask := &ReleaseCollectionTask{
		BaseTask: BaseTask{
143 144
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
145
			triggerCondition: querypb.TriggerCondition_grpcRequest,
146 147
		},
		ReleaseCollectionRequest: req,
148
		cluster:                  qc.cluster,
149
	}
150
	qc.scheduler.Enqueue([]task{releaseCollectionTask})
151 152

	err := releaseCollectionTask.WaitToFinish()
153 154 155 156 157 158 159
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	log.Debug("ReleaseCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
160 161
	qc.meta.printMeta()
	qc.cluster.printMeta()
162 163 164
	return status, nil
}

165
func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
166
	collectionID := req.CollectionID
167
	log.Debug("show partitions start, ", zap.Int64("collectionID", collectionID))
168
	partitionIDs, err := qc.meta.showPartitions(collectionID)
169 170 171 172 173 174 175 176
	if err != nil {
		return &querypb.ShowPartitionsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, err
	}
177

178 179
	log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))

180 181 182 183 184 185 186 187
	return &querypb.ShowPartitionsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		PartitionIDs: partitionIDs,
	}, nil
}

188
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
189 190 191 192
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("LoadPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
193
		ErrorCode: commonpb.ErrorCode_Success,
194
	}
195 196

	if len(partitionIDs) == 0 {
197
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
198 199
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
200
		log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
201
		return status, err
202 203
	}

204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
	hasCollection := qc.meta.hasCollection(collectionID)
	if hasCollection {
		partitionIDsToLoad := make([]UniqueID, 0)
		loadCollection, _ := qc.meta.getLoadCollection(collectionID)
		if loadCollection {
			for _, partitionID := range partitionIDs {
				hasReleasePartition := qc.meta.hasReleasePartition(collectionID, partitionID)
				if hasReleasePartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
		} else {
			for _, partitionID := range partitionIDs {
				hasPartition := qc.meta.hasPartition(collectionID, partitionID)
				if !hasPartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
222 223
		}

224 225 226
		if len(partitionIDsToLoad) == 0 {
			log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
			return status, nil
227
		}
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
		req.PartitionIDs = partitionIDsToLoad
	}

	loadPartitionTask := &LoadPartitionTask{
		BaseTask: BaseTask{
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
			triggerCondition: querypb.TriggerCondition_grpcRequest,
		},
		LoadPartitionsRequest: req,
		dataCoord:             qc.dataCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
	}
	qc.scheduler.Enqueue([]task{loadPartitionTask})

	err := loadPartitionTask.WaitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
250 251 252
	}

	log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
253
	return status, nil
254 255
}

256
func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
257
	//dbID := req.DbID
258 259 260 261 262 263
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("ReleasePartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
264 265 266 267
	hasCollection := qc.meta.hasCollection(collectionID)
	if !hasCollection {
		log.Warn("release partitions end, query coordinator don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID)))
		return status, nil
268 269
	}

270 271 272 273 274 275 276
	if len(partitionIDs) == 0 {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
		log.Debug("releasePartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
	}
277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
	releasePartitionTask := &ReleasePartitionTask{
		BaseTask: BaseTask{
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
			triggerCondition: querypb.TriggerCondition_grpcRequest,
		},
		ReleasePartitionsRequest: req,
		cluster:                  qc.cluster,
	}
	qc.scheduler.Enqueue([]task{releasePartitionTask})

	err := releasePartitionTask.WaitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
294 295
	}
	log.Debug("ReleasePartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
296 297
	qc.meta.printMeta()
	qc.cluster.printMeta()
298 299 300
	return status, nil
}

301
func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
302
	collectionID := req.CollectionID
303
	queryChannel, queryResultChannel := qc.meta.GetQueryChannel(collectionID)
304 305 306 307 308 309 310 311 312 313

	return &querypb.CreateQueryChannelResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		RequestChannel: queryChannel,
		ResultChannel:  queryResultChannel,
	}, nil
}

314
func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
315 316 317
	partitionIDs := req.PartitionIDs
	partitionStates := make([]*querypb.PartitionStates, 0)
	for _, partitionID := range partitionIDs {
318
		state, err := qc.meta.getPartitionStateByID(partitionID)
319
		if err != nil {
320
			return &querypb.GetPartitionStatesResponse{
321 322 323 324 325 326
				Status: &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UnexpectedError,
					Reason:    err.Error(),
				},
			}, err
		}
327 328 329 330 331
		partitionState := &querypb.PartitionStates{
			PartitionID: partitionID,
			State:       state,
		}
		partitionStates = append(partitionStates, partitionState)
332 333
	}

334
	return &querypb.GetPartitionStatesResponse{
335 336 337
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
338
		PartitionDescriptions: partitionStates,
339 340 341
	}, nil
}

342
func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
343 344 345 346 347
	totalMemSize := int64(0)
	totalNumRows := int64(0)
	//TODO::get segment infos from meta
	//segmentIDs := req.SegmentIDs
	//segmentInfos, err := qs.meta.getSegmentInfos(segmentIDs)
348
	segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req)
349
	if err != nil {
350
		return &querypb.GetSegmentInfoResponse{
351 352 353 354 355 356
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, err
	}
357 358 359
	for _, info := range segmentInfos {
		totalNumRows += info.NumRows
		totalMemSize += info.MemSize
360
	}
361
	log.Debug("getSegmentInfo", zap.Int64("num rows", totalNumRows), zap.Int64("memory size", totalMemSize))
362 363 364 365 366 367 368
	return &querypb.GetSegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Infos: segmentInfos,
	}, nil
}