vector_chunk_manager.go 4.6 KB
Newer Older
G
godchen 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package storage

import (
G
godchen 已提交
15
	"bytes"
G
godchen 已提交
16
	"encoding/binary"
G
godchen 已提交
17
	"errors"
18
	"io"
G
godchen 已提交
19

20
	"github.com/milvus-io/milvus/internal/common"
G
godchen 已提交
21 22 23
	"github.com/milvus-io/milvus/internal/proto/etcdpb"
)

24
// VectorChunkManager is responsible for read and write vector data.
G
godchen 已提交
25 26 27
type VectorChunkManager struct {
	localChunkManager  ChunkManager
	remoteChunkManager ChunkManager
28 29 30 31

	schema *etcdpb.CollectionMeta

	localCacheEnable bool
G
godchen 已提交
32 33
}

34
// NewVectorChunkManager create a new vector manager object.
35
func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager, schema *etcdpb.CollectionMeta, localCacheEnable bool) *VectorChunkManager {
G
godchen 已提交
36 37 38
	return &VectorChunkManager{
		localChunkManager:  localChunkManager,
		remoteChunkManager: remoteChunkManager,
39 40 41

		schema:           schema,
		localCacheEnable: localCacheEnable,
G
godchen 已提交
42 43 44
	}
}

45 46 47
// For vector data, we will download vector file from storage. And we will
// deserialize the file for it has binlog style. At last we store pure vector
// data to local storage as cache.
48
func (vcm *VectorChunkManager) downloadVectorFile(key string) ([]byte, error) {
G
godchen 已提交
49
	if vcm.localChunkManager.Exist(key) {
50
		return vcm.localChunkManager.Read(key)
G
godchen 已提交
51
	}
52
	insertCodec := NewInsertCodec(vcm.schema)
G
godchen 已提交
53
	content, err := vcm.remoteChunkManager.Read(key)
G
godchen 已提交
54
	if err != nil {
55
		return nil, err
G
godchen 已提交
56 57 58 59 60 61
	}
	blob := &Blob{
		Key:   key,
		Value: content,
	}

G
godchen 已提交
62
	_, _, data, err := insertCodec.Deserialize([]*Blob{blob})
G
godchen 已提交
63
	if err != nil {
64
		return nil, err
G
godchen 已提交
65
	}
66
	defer insertCodec.Close()
G
godchen 已提交
67

68
	var results []byte
G
godchen 已提交
69 70 71
	for _, singleData := range data.Data {
		binaryVector, ok := singleData.(*BinaryVectorFieldData)
		if ok {
72
			results = binaryVector.Data
G
godchen 已提交
73 74 75
		}
		floatVector, ok := singleData.(*FloatVectorFieldData)
		if ok {
G
godchen 已提交
76
			buf := new(bytes.Buffer)
77
			err := binary.Write(buf, common.Endian, floatVector.Data)
G
godchen 已提交
78
			if err != nil {
79
				return nil, err
G
godchen 已提交
80
			}
81
			results = buf.Bytes()
G
godchen 已提交
82 83
		}
	}
84
	return results, nil
G
godchen 已提交
85 86
}

87 88
// GetPath returns the path of vector data. If cached, return local path.
// If not cached return remote path.
G
godchen 已提交
89
func (vcm *VectorChunkManager) GetPath(key string) (string, error) {
90
	if vcm.localChunkManager.Exist(key) && vcm.localCacheEnable {
G
godchen 已提交
91 92
		return vcm.localChunkManager.GetPath(key)
	}
93
	return vcm.remoteChunkManager.GetPath(key)
G
godchen 已提交
94 95
}

96
// Write writes the vector data to local cache if cache enabled.
G
godchen 已提交
97
func (vcm *VectorChunkManager) Write(key string, content []byte) error {
98 99 100
	if !vcm.localCacheEnable {
		return errors.New("Cannot write local file for local cache is not allowed")
	}
G
godchen 已提交
101 102 103
	return vcm.localChunkManager.Write(key, content)
}

104
// Exist checks whether vector data is saved to local cache.
G
godchen 已提交
105 106 107 108
func (vcm *VectorChunkManager) Exist(key string) bool {
	return vcm.localChunkManager.Exist(key)
}

109
// Read reads the pure vector data. If cached, it reads from local.
G
godchen 已提交
110
func (vcm *VectorChunkManager) Read(key string) ([]byte, error) {
111 112 113 114 115 116 117 118 119 120 121 122
	if vcm.localCacheEnable {
		if vcm.localChunkManager.Exist(key) {
			return vcm.localChunkManager.Read(key)
		}
		bytes, err := vcm.downloadVectorFile(key)
		if err != nil {
			return nil, err
		}
		err = vcm.localChunkManager.Write(key, bytes)
		if err != nil {
			return nil, err
		}
G
godchen 已提交
123
		return vcm.localChunkManager.Read(key)
G
godchen 已提交
124
	}
125
	return vcm.downloadVectorFile(key)
G
godchen 已提交
126 127
}

128
// ReadAt reads specific position data of vector. If cached, it reads from local.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
func (vcm *VectorChunkManager) ReadAt(key string, p []byte, off int64) (int, error) {
	if vcm.localCacheEnable {
		if vcm.localChunkManager.Exist(key) {
			return vcm.localChunkManager.ReadAt(key, p, off)
		}
		bytes, err := vcm.downloadVectorFile(key)
		if err != nil {
			return -1, err
		}
		err = vcm.localChunkManager.Write(key, bytes)
		if err != nil {
			return -1, err
		}
		return vcm.localChunkManager.ReadAt(key, p, off)
	}
	bytes, err := vcm.downloadVectorFile(key)
	if err != nil {
		return -1, err
	}

	if bytes == nil {
		return 0, errors.New("vectorChunkManager: data downloaded is nil")
	}

	if off < 0 || int64(len(bytes)) < off {
		return 0, errors.New("vectorChunkManager: invalid offset")
	}
	n := copy(p, bytes[off:])
	if n < len(p) {
		return n, io.EOF
	}

	return n, nil
G
godchen 已提交
162
}