historical.go 4.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package querynode

import (
	"context"
16 17
	"errors"
	"fmt"
18

19
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
20 21 22
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
23 24 25 26 27 28
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
)

type historical struct {
	replica      ReplicaInterface
29
	loader       *segmentLoader
30
	statsService *statsService
31 32 33

	//TODO
	globalSealedSegments []UniqueID
34 35 36
}

func newHistorical(ctx context.Context,
37 38
	rootCoord types.RootCoord,
	indexCoord types.IndexCoord,
39 40 41
	factory msgstream.Factory,
	etcdKV *etcdkv.EtcdKV) *historical {
	replica := newCollectionReplica(etcdKV)
42
	loader := newSegmentLoader(ctx, rootCoord, indexCoord, replica, etcdKV)
43
	ss := newStatsService(ctx, replica, loader.indexLoader.fieldStatsChan, factory)
44 45 46

	return &historical{
		replica:      replica,
47
		loader:       loader,
48 49 50 51 52 53 54 55 56 57 58 59 60 61
		statsService: ss,
	}
}

func (h *historical) start() {
	h.statsService.start()
}

func (h *historical) close() {
	h.statsService.close()

	// free collectionReplica
	h.replica.freeAll()
}
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82

func (h *historical) search(searchReqs []*searchRequest,
	collID UniqueID,
	partIDs []UniqueID,
	plan *Plan,
	searchTs Timestamp) ([]*SearchResult, []*Segment, error) {

	searchResults := make([]*SearchResult, 0)
	segmentResults := make([]*Segment, 0)

	// get historical partition ids
	var searchPartIDs []UniqueID
	if len(partIDs) == 0 {
		hisPartIDs, err := h.replica.getPartitionIDs(collID)
		if len(hisPartIDs) == 0 {
			// no partitions in collection, do empty search
			return nil, nil, nil
		}
		if err != nil {
			return searchResults, segmentResults, err
		}
83 84 85 86
		log.Debug("no partition specified, search all partitions",
			zap.Any("collectionID", collID),
			zap.Any("all partitions", hisPartIDs),
		)
87 88 89 90 91
		searchPartIDs = hisPartIDs
	} else {
		for _, id := range partIDs {
			_, err := h.replica.getPartitionByID(id)
			if err == nil {
92 93 94 95
				log.Debug("append search partition id",
					zap.Any("collectionID", collID),
					zap.Any("partitionID", id),
				)
96 97 98 99 100
				searchPartIDs = append(searchPartIDs, id)
			}
		}
	}

101 102 103 104 105
	col, err := h.replica.getCollectionByID(collID)
	if err != nil {
		return nil, nil, err
	}

106
	// all partitions have been released
107
	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypePartition {
108 109 110 111 112 113
		return nil, nil, errors.New("partitions have been released , collectionID = " +
			fmt.Sprintln(collID) +
			"target partitionIDs = " +
			fmt.Sprintln(partIDs))
	}

114 115 116 117 118 119 120
	if len(searchPartIDs) == 0 && col.getLoadType() == loadTypeCollection {
		if err = col.checkReleasedPartitions(partIDs); err != nil {
			return nil, nil, err
		}
		return nil, nil, nil
	}

121 122 123 124 125 126
	log.Debug("doing search in historical",
		zap.Any("collectionID", collID),
		zap.Any("reqPartitionIDs", partIDs),
		zap.Any("searchPartitionIDs", searchPartIDs),
	)

127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
	for _, partID := range searchPartIDs {
		segIDs, err := h.replica.getSegmentIDs(partID)
		if err != nil {
			return searchResults, segmentResults, err
		}
		for _, segID := range segIDs {
			seg, err := h.replica.getSegmentByID(segID)
			if err != nil {
				return searchResults, segmentResults, err
			}
			if !seg.getOnService() {
				continue
			}
			searchResult, err := seg.segmentSearch(plan, searchReqs, []Timestamp{searchTs})
			if err != nil {
				return searchResults, segmentResults, err
			}
			searchResults = append(searchResults, searchResult)
			segmentResults = append(segmentResults, seg)
		}
	}

	return searchResults, segmentResults, nil
}