historical.go 4.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package querynode

import (
	"context"
16 17
	"errors"
	"fmt"
18

19
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
20 21 22
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
23 24 25 26 27 28
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
)

type historical struct {
	replica      ReplicaInterface
29
	loader       *segmentLoader
30
	statsService *statsService
31 32 33

	//TODO
	globalSealedSegments []UniqueID
34 35 36 37 38 39
}

func newHistorical(ctx context.Context,
	masterService types.MasterService,
	dataService types.DataService,
	indexService types.IndexService,
40 41 42 43
	factory msgstream.Factory,
	etcdKV *etcdkv.EtcdKV) *historical {
	replica := newCollectionReplica(etcdKV)
	loader := newSegmentLoader(ctx, masterService, indexService, dataService, replica, etcdKV)
44
	ss := newStatsService(ctx, replica, loader.indexLoader.fieldStatsChan, factory)
45 46 47

	return &historical{
		replica:      replica,
48
		loader:       loader,
49 50 51 52 53 54 55 56 57 58 59 60 61 62
		statsService: ss,
	}
}

func (h *historical) start() {
	h.statsService.start()
}

func (h *historical) close() {
	h.statsService.close()

	// free collectionReplica
	h.replica.freeAll()
}
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83

func (h *historical) search(searchReqs []*searchRequest,
	collID UniqueID,
	partIDs []UniqueID,
	plan *Plan,
	searchTs Timestamp) ([]*SearchResult, []*Segment, error) {

	searchResults := make([]*SearchResult, 0)
	segmentResults := make([]*Segment, 0)

	// get historical partition ids
	var searchPartIDs []UniqueID
	if len(partIDs) == 0 {
		hisPartIDs, err := h.replica.getPartitionIDs(collID)
		if len(hisPartIDs) == 0 {
			// no partitions in collection, do empty search
			return nil, nil, nil
		}
		if err != nil {
			return searchResults, segmentResults, err
		}
84 85 86 87
		log.Debug("no partition specified, search all partitions",
			zap.Any("collectionID", collID),
			zap.Any("all partitions", hisPartIDs),
		)
88 89 90 91 92
		searchPartIDs = hisPartIDs
	} else {
		for _, id := range partIDs {
			_, err := h.replica.getPartitionByID(id)
			if err == nil {
93 94 95 96
				log.Debug("append search partition id",
					zap.Any("collectionID", collID),
					zap.Any("partitionID", id),
				)
97 98 99 100 101
				searchPartIDs = append(searchPartIDs, id)
			}
		}
	}

102 103 104 105 106
	col, err := h.replica.getCollectionByID(collID)
	if err != nil {
		return nil, nil, err
	}

107
	// all partitions have been released
108
	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
109 110 111 112 113 114
		return nil, nil, errors.New("partitions have been released , collectionID = " +
			fmt.Sprintln(collID) +
			"target partitionIDs = " +
			fmt.Sprintln(partIDs))
	}

115 116 117 118 119 120 121
	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
		if err = col.checkReleasedPartitions(partIDs); err != nil {
			return nil, nil, err
		}
		return nil, nil, nil
	}

122 123 124 125 126 127
	log.Debug("doing search in historical",
		zap.Any("collectionID", collID),
		zap.Any("reqPartitionIDs", partIDs),
		zap.Any("searchPartitionIDs", searchPartIDs),
	)

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
	for _, partID := range searchPartIDs {
		segIDs, err := h.replica.getSegmentIDs(partID)
		if err != nil {
			return searchResults, segmentResults, err
		}
		for _, segID := range segIDs {
			seg, err := h.replica.getSegmentByID(segID)
			if err != nil {
				return searchResults, segmentResults, err
			}
			if !seg.getOnService() {
				continue
			}
			searchResult, err := seg.segmentSearch(plan, searchReqs, []Timestamp{searchTs})
			if err != nil {
				return searchResults, segmentResults, err
			}
			searchResults = append(searchResults, searchResult)
			segmentResults = append(segmentResults, seg)
		}
	}

	return searchResults, segmentResults, nil
}