event_data.go 12.5 KB
Newer Older
G
godchen 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

S
sunby 已提交
12 13 14 15
package storage

import (
	"encoding/binary"
G
godchen 已提交
16
	"encoding/json"
S
sunby 已提交
17
	"errors"
C
Cai Yudong 已提交
18
	"io"
S
sunby 已提交
19

X
Xiangyu Wang 已提交
20 21
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/util/typeutil"
S
sunby 已提交
22 23 24 25
)

type descriptorEventData struct {
	DescriptorEventDataFixPart
G
godchen 已提交
26 27 28
	ExtraLength       int32
	ExtraBytes        []byte
	Extras            map[string]interface{}
29
	PostHeaderLengths []uint8
S
sunby 已提交
30 31
}

32
// DescriptorEventDataFixPart is a memorty struct saves events' DescriptorEventData.
S
sunby 已提交
33 34 35 36
type DescriptorEventDataFixPart struct {
	CollectionID    int64
	PartitionID     int64
	SegmentID       int64
C
cai.zhang 已提交
37
	FieldID         int64
S
sunby 已提交
38 39 40 41 42
	StartTimestamp  typeutil.Timestamp
	EndTimestamp    typeutil.Timestamp
	PayloadDataType schemapb.DataType
}

43
// SetEventTimeStamp set the timestamp value of DescriptorEventDataFixPart.
C
Cai Yudong 已提交
44 45 46
func (data *descriptorEventData) SetEventTimeStamp(start typeutil.Timestamp, end typeutil.Timestamp) {
	data.StartTimestamp = start
	data.EndTimestamp = end
S
sunby 已提交
47 48
}

G
godchen 已提交
49
// GetEventDataFixPartSize returns the memory size of DescriptorEventDataFixPart.
C
Cai Yudong 已提交
50 51
func (data *descriptorEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data.DescriptorEventDataFixPart))
S
sunby 已提交
52 53
}

G
godchen 已提交
54
// GetMemoryUsageInBytes returns the memory size of DescriptorEventDataFixPart.
S
sunby 已提交
55
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
G
godchen 已提交
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
	return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths)) + int32(binary.Size(data.ExtraLength)) + data.ExtraLength

}

// AddExtra records the extra parameter k with value v on the descriptor event.
// The value must be JSON-encodable for a later FinishExtra to succeed.
func (data *descriptorEventData) AddExtra(k string, v interface{}) {
	// Extras is nil on a zero-value descriptorEventData (and after decoding a
	// JSON null); writing to a nil map would panic, so allocate it lazily.
	if data.Extras == nil {
		data.Extras = make(map[string]interface{})
	}
	data.Extras[k] = v
}

// FinishExtra marshal extras to json format.
// Call before GetMemoryUsageInBytes to get a accurate length of description event.
func (data *descriptorEventData) FinishExtra() error {
	var err error
	data.ExtraBytes, err = json.Marshal(data.Extras)
	if err != nil {
		return err
	}
	data.ExtraLength = int32(len(data.ExtraBytes))
	return nil
S
sunby 已提交
75 76
}

77
// Write transfer DescriptorEventDataFixPart to binary buffer.
S
sunby 已提交
78 79 80 81 82 83 84
func (data *descriptorEventData) Write(buffer io.Writer) error {
	if err := binary.Write(buffer, binary.LittleEndian, data.DescriptorEventDataFixPart); err != nil {
		return err
	}
	if err := binary.Write(buffer, binary.LittleEndian, data.PostHeaderLengths); err != nil {
		return err
	}
G
godchen 已提交
85 86 87 88 89 90 91
	if err := binary.Write(buffer, binary.LittleEndian, data.ExtraLength); err != nil {
		return err
	}
	if err := binary.Write(buffer, binary.LittleEndian, data.ExtraBytes); err != nil {
		return err
	}

S
sunby 已提交
92 93 94 95
	return nil
}

func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
G
godchen 已提交
96
	event := newDescriptorEventData()
S
sunby 已提交
97 98 99 100 101 102
	if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil {
		return nil, err
	}
	if err := binary.Read(buffer, binary.LittleEndian, &event.PostHeaderLengths); err != nil {
		return nil, err
	}
G
godchen 已提交
103 104 105 106 107 108 109 110 111 112 113 114

	if err := binary.Read(buffer, binary.LittleEndian, &event.ExtraLength); err != nil {
		return nil, err
	}
	event.ExtraBytes = make([]byte, event.ExtraLength)
	if err := binary.Read(buffer, binary.LittleEndian, &event.ExtraBytes); err != nil {
		return nil, err
	}
	if err := json.Unmarshal(event.ExtraBytes, &event.Extras); err != nil {
		return nil, err
	}

N
neza2017 已提交
115
	return event, nil
S
sunby 已提交
116 117 118
}

// eventData is the behaviour shared by the per-event-type fixed parts
// declared in this file.
type eventData interface {
	// GetEventDataFixPartSize reports the encoded size, in bytes, of the
	// fixed part.
	GetEventDataFixPartSize() int32

	// WriteEventData serializes the fixed part into buffer.
	WriteEventData(buffer io.Writer) error
}

// insertEventData is the fixed part of an insert event.
//
// For now the fixed part of every event type only carries a start and an end
// timestamp. Different events may gain different fields later, so each event
// type gets its own data struct.
type insertEventData struct {
	StartTimestamp, EndTimestamp typeutil.Timestamp
}

C
Cai Yudong 已提交
130 131 132
func (data *insertEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
	data.StartTimestamp = start
	data.EndTimestamp = end
S
sunby 已提交
133 134
}

N
neza2017 已提交
135 136
func (data *insertEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
137 138 139
}

func (data *insertEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
140 141 142 143 144 145
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
S
sunby 已提交
146 147 148 149 150 151 152 153
	return binary.Write(buffer, binary.LittleEndian, data)
}

// deleteEventData is the fixed part of a delete event: a start and an end
// timestamp, encoded in that order.
type deleteEventData struct {
	StartTimestamp, EndTimestamp typeutil.Timestamp
}

C
Cai Yudong 已提交
154 155 156
func (data *deleteEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
	data.StartTimestamp = start
	data.EndTimestamp = end
S
sunby 已提交
157 158
}

N
neza2017 已提交
159 160
func (data *deleteEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
161 162 163
}

func (data *deleteEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
164 165 166 167 168 169
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
170
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
171 172 173 174 175 176 177
}

// createCollectionEventData is the fixed part of a create-collection event: a
// start and an end timestamp, encoded in that order.
type createCollectionEventData struct {
	StartTimestamp, EndTimestamp typeutil.Timestamp
}

C
Cai Yudong 已提交
178 179 180
func (data *createCollectionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
	data.StartTimestamp = start
	data.EndTimestamp = end
S
sunby 已提交
181 182
}

N
neza2017 已提交
183 184
func (data *createCollectionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
185 186 187
}

func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
188 189 190 191 192 193
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
194
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
195 196 197 198 199 200 201
}

// dropCollectionEventData is the fixed part of a drop-collection event: a
// start and an end timestamp, encoded in that order.
type dropCollectionEventData struct {
	StartTimestamp, EndTimestamp typeutil.Timestamp
}

C
Cai Yudong 已提交
202 203 204
func (data *dropCollectionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
	data.StartTimestamp = start
	data.EndTimestamp = end
S
sunby 已提交
205 206
}

N
neza2017 已提交
207 208
func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
209 210 211
}

func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
212 213 214 215 216 217
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
218
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
219 220 221 222 223 224 225
}

// createPartitionEventData is the fixed part of a create-partition event: a
// start and an end timestamp, encoded in that order.
type createPartitionEventData struct {
	StartTimestamp, EndTimestamp typeutil.Timestamp
}

C
Cai Yudong 已提交
226 227 228
func (data *createPartitionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
	data.StartTimestamp = start
	data.EndTimestamp = end
S
sunby 已提交
229 230
}

N
neza2017 已提交
231 232
func (data *createPartitionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
233 234 235
}

func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
236 237 238 239 240 241
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
242
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
243 244 245 246 247 248 249
}

// dropPartitionEventData is the fixed part of a drop-partition event: a start
// and an end timestamp, encoded in that order.
type dropPartitionEventData struct {
	StartTimestamp, EndTimestamp typeutil.Timestamp
}

C
Cai Yudong 已提交
250 251 252
func (data *dropPartitionEventData) SetEventTimestamp(start typeutil.Timestamp, end typeutil.Timestamp) {
	data.StartTimestamp = start
	data.EndTimestamp = end
S
sunby 已提交
253 254
}

N
neza2017 已提交
255 256
func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
257 258 259
}

func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
260 261 262 263 264 265
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
266 267 268
	return binary.Write(buffer, binary.LittleEndian, data)
}

269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
// indexFileEventData is the fixed part of an index-file event: a start and an
// end timestamp, encoded in that order.
type indexFileEventData struct {
	StartTimestamp, EndTimestamp typeutil.Timestamp
}

// SetEventTimestamp records the start and end timestamps of the index-file
// event.
func (data *indexFileEventData) SetEventTimestamp(start, end typeutil.Timestamp) {
	data.StartTimestamp, data.EndTimestamp = start, end
}

// GetEventDataFixPartSize returns the number of bytes encoding/binary uses to
// encode an indexFileEventData.
func (data *indexFileEventData) GetEventDataFixPartSize() int32 {
	// binary.Size looks through the pointer and measures the struct itself.
	encodedSize := binary.Size(data)
	return int32(encodedSize)
}

// WriteEventData writes the index-file event's fixed part to buffer in
// little-endian order. It returns an error, without writing anything, when
// either timestamp is still zero.
func (data *indexFileEventData) WriteEventData(buffer io.Writer) error {
	switch {
	case data.StartTimestamp == 0:
		return errors.New("hasn't set start time stamp")
	case data.EndTimestamp == 0:
		return errors.New("hasn't set end time stamp")
	}
	return binary.Write(buffer, binary.LittleEndian, data)
}

N
neza2017 已提交
293 294 295
func getEventFixPartSize(code EventTypeCode) int32 {
	switch code {
	case DescriptorEventType:
C
Cai Yudong 已提交
296
		return (&descriptorEventData{}).GetEventDataFixPartSize()
N
neza2017 已提交
297 298 299 300 301 302 303 304 305
	case InsertEventType:
		return (&insertEventData{}).GetEventDataFixPartSize()
	case DeleteEventType:
		return (&deleteEventData{}).GetEventDataFixPartSize()
	case CreateCollectionEventType:
		return (&createCollectionEventData{}).GetEventDataFixPartSize()
	case DropCollectionEventType:
		return (&dropCollectionEventData{}).GetEventDataFixPartSize()
	case CreatePartitionEventType:
G
godchen 已提交
306
		return (&createPartitionEventData{}).GetEventDataFixPartSize()
N
neza2017 已提交
307 308
	case DropPartitionEventType:
		return (&dropPartitionEventData{}).GetEventDataFixPartSize()
309 310
	case IndexFileEventType:
		return (&indexFileEventData{}).GetEventDataFixPartSize()
N
neza2017 已提交
311 312
	default:
		return -1
S
sunby 已提交
313 314 315
	}
}

G
godchen 已提交
316
func newDescriptorEventData() *descriptorEventData {
S
sunby 已提交
317 318 319 320 321
	data := descriptorEventData{
		DescriptorEventDataFixPart: DescriptorEventDataFixPart{
			CollectionID:    -1,
			PartitionID:     -1,
			SegmentID:       -1,
C
cai.zhang 已提交
322
			FieldID:         -1,
S
sunby 已提交
323 324 325 326
			StartTimestamp:  0,
			EndTimestamp:    0,
			PayloadDataType: -1,
		},
N
neza2017 已提交
327
		PostHeaderLengths: []uint8{},
G
godchen 已提交
328
		Extras:            make(map[string]interface{}),
N
neza2017 已提交
329 330 331 332
	}
	for i := DescriptorEventType; i < EventTypeEnd; i++ {
		size := getEventFixPartSize(i)
		data.PostHeaderLengths = append(data.PostHeaderLengths, uint8(size))
S
sunby 已提交
333
	}
G
godchen 已提交
334
	return &data
S
sunby 已提交
335 336
}

G
godchen 已提交
337
func newInsertEventData() *insertEventData {
N
neza2017 已提交
338
	return &insertEventData{
S
sunby 已提交
339 340
		StartTimestamp: 0,
		EndTimestamp:   0,
G
godchen 已提交
341
	}
S
sunby 已提交
342
}
G
godchen 已提交
343
func newDeleteEventData() *deleteEventData {
N
neza2017 已提交
344
	return &deleteEventData{
S
sunby 已提交
345 346
		StartTimestamp: 0,
		EndTimestamp:   0,
G
godchen 已提交
347
	}
S
sunby 已提交
348
}
G
godchen 已提交
349
func newCreateCollectionEventData() *createCollectionEventData {
N
neza2017 已提交
350
	return &createCollectionEventData{
S
sunby 已提交
351 352
		StartTimestamp: 0,
		EndTimestamp:   0,
G
godchen 已提交
353
	}
S
sunby 已提交
354
}
G
godchen 已提交
355
func newDropCollectionEventData() *dropCollectionEventData {
N
neza2017 已提交
356
	return &dropCollectionEventData{
S
sunby 已提交
357 358
		StartTimestamp: 0,
		EndTimestamp:   0,
G
godchen 已提交
359
	}
S
sunby 已提交
360
}
G
godchen 已提交
361
func newCreatePartitionEventData() *createPartitionEventData {
N
neza2017 已提交
362
	return &createPartitionEventData{
S
sunby 已提交
363 364
		StartTimestamp: 0,
		EndTimestamp:   0,
G
godchen 已提交
365
	}
S
sunby 已提交
366
}
G
godchen 已提交
367
func newDropPartitionEventData() *dropPartitionEventData {
N
neza2017 已提交
368
	return &dropPartitionEventData{
S
sunby 已提交
369 370
		StartTimestamp: 0,
		EndTimestamp:   0,
G
godchen 已提交
371
	}
S
sunby 已提交
372
}
373 374 375 376 377 378
// newIndexFileEventData returns an indexFileEventData whose timestamps are
// both zero, i.e. not yet set as far as WriteEventData is concerned.
func newIndexFileEventData() *indexFileEventData {
	return &indexFileEventData{}
}
S
sunby 已提交
379

N
neza2017 已提交
380
func readInsertEventDataFixPart(buffer io.Reader) (*insertEventData, error) {
S
sunby 已提交
381 382 383 384 385 386 387
	data := &insertEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
388
func readDeleteEventDataFixPart(buffer io.Reader) (*deleteEventData, error) {
S
sunby 已提交
389 390 391 392 393 394 395
	data := &deleteEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
396
func readCreateCollectionEventDataFixPart(buffer io.Reader) (*createCollectionEventData, error) {
S
sunby 已提交
397 398 399 400 401 402 403
	data := &createCollectionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
404
func readDropCollectionEventDataFixPart(buffer io.Reader) (*dropCollectionEventData, error) {
S
sunby 已提交
405 406 407 408 409 410 411
	data := &dropCollectionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
412
func readCreatePartitionEventDataFixPart(buffer io.Reader) (*createPartitionEventData, error) {
S
sunby 已提交
413 414 415 416 417 418 419
	data := &createPartitionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
420
func readDropPartitionEventDataFixPart(buffer io.Reader) (*dropPartitionEventData, error) {
S
sunby 已提交
421 422 423 424 425 426
	data := &dropPartitionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}
427 428 429 430 431 432 433 434

// readIndexFileEventDataFixPart decodes the little-endian fixed part of an
// index-file event from buffer. On a read error it returns nil and that error.
func readIndexFileEventDataFixPart(buffer io.Reader) (*indexFileEventData, error) {
	var fixPart indexFileEventData
	err := binary.Read(buffer, binary.LittleEndian, &fixPart)
	if err != nil {
		return nil, err
	}
	return &fixPart, nil
}