event_data.go 10.1 KB
Newer Older
S
sunby 已提交
1 2 3 4
package storage

import (
	"encoding/binary"
S
sunby 已提交
5
	"fmt"
S
sunby 已提交
6 7
	"io"

S
sunby 已提交
8 9
	"errors"

S
sunby 已提交
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)

type descriptorEventData struct {
	DescriptorEventDataFixPart
	PostHeaderLengths []uint8
}

type DescriptorEventDataFixPart struct {
	BinlogVersion   int16
	ServerVersion   int64
	CommitID        int64
	HeaderLength    int8
	CollectionID    int64
	PartitionID     int64
	SegmentID       int64
C
cai.zhang 已提交
27
	FieldID         int64
S
sunby 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41
	StartTimestamp  typeutil.Timestamp
	EndTimestamp    typeutil.Timestamp
	PayloadDataType schemapb.DataType
}

func (data *descriptorEventData) SetStartTimeStamp(ts typeutil.Timestamp) {
	data.StartTimestamp = ts
}

func (data *descriptorEventData) SetEndTimeStamp(ts typeutil.Timestamp) {
	data.EndTimestamp = ts
}

func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
N
neza2017 已提交
42
	return int32(binary.Size(data.DescriptorEventDataFixPart) + binary.Size(data.PostHeaderLengths))
S
sunby 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55 56
}

func (data *descriptorEventData) Write(buffer io.Writer) error {
	if err := binary.Write(buffer, binary.LittleEndian, data.DescriptorEventDataFixPart); err != nil {
		return err
	}

	if err := binary.Write(buffer, binary.LittleEndian, data.PostHeaderLengths); err != nil {
		return err
	}
	return nil
}

func readDescriptorEventData(buffer io.Reader) (*descriptorEventData, error) {
N
neza2017 已提交
57 58 59 60
	event, err := newDescriptorEventData()
	if err != nil {
		return nil, err
	}
S
sunby 已提交
61 62 63 64 65 66 67 68 69

	if err := binary.Read(buffer, binary.LittleEndian, &event.DescriptorEventDataFixPart); err != nil {
		return nil, err
	}

	if err := binary.Read(buffer, binary.LittleEndian, &event.PostHeaderLengths); err != nil {
		return nil, err
	}

N
neza2017 已提交
70
	return event, nil
S
sunby 已提交
71 72 73
}

type eventData interface {
N
neza2017 已提交
74
	GetEventDataFixPartSize() int32
S
sunby 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
	WriteEventData(buffer io.Writer) error
}

// all event types' fixed part only have start Timestamp and end Timestamp yet, but maybe different events will
// have different fields later, so we just create a event data struct per event type.
type insertEventData struct {
	StartTimestamp typeutil.Timestamp
	EndTimestamp   typeutil.Timestamp
}

func (data *insertEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.StartTimestamp = timestamp
}

func (data *insertEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.EndTimestamp = timestamp
}

N
neza2017 已提交
93 94
func (data *insertEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
95 96 97
}

func (data *insertEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
98 99 100 101 102 103
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
S
sunby 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
	return binary.Write(buffer, binary.LittleEndian, data)
}

type deleteEventData struct {
	StartTimestamp typeutil.Timestamp
	EndTimestamp   typeutil.Timestamp
}

func (data *deleteEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.StartTimestamp = timestamp
}

func (data *deleteEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.EndTimestamp = timestamp
}

N
neza2017 已提交
120 121
func (data *deleteEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
122 123 124
}

func (data *deleteEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
125 126 127 128 129 130
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
131
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
}

type createCollectionEventData struct {
	StartTimestamp typeutil.Timestamp
	EndTimestamp   typeutil.Timestamp
}

func (data *createCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.StartTimestamp = timestamp
}

func (data *createCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.EndTimestamp = timestamp
}

N
neza2017 已提交
147 148
func (data *createCollectionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
149 150 151
}

func (data *createCollectionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
152 153 154 155 156 157
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
158
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
}

type dropCollectionEventData struct {
	StartTimestamp typeutil.Timestamp
	EndTimestamp   typeutil.Timestamp
}

func (data *dropCollectionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.StartTimestamp = timestamp
}

func (data *dropCollectionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.EndTimestamp = timestamp
}

N
neza2017 已提交
174 175
func (data *dropCollectionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
176 177 178
}

func (data *dropCollectionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
179 180 181 182 183 184
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
185
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
}

type createPartitionEventData struct {
	StartTimestamp typeutil.Timestamp
	EndTimestamp   typeutil.Timestamp
}

func (data *createPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.StartTimestamp = timestamp
}

func (data *createPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.EndTimestamp = timestamp
}

N
neza2017 已提交
201 202
func (data *createPartitionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
203 204 205
}

func (data *createPartitionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
206 207 208 209 210 211
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
212
	return binary.Write(buffer, binary.LittleEndian, data)
S
sunby 已提交
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
}

type dropPartitionEventData struct {
	StartTimestamp typeutil.Timestamp
	EndTimestamp   typeutil.Timestamp
}

func (data *dropPartitionEventData) SetStartTimestamp(timestamp typeutil.Timestamp) {
	data.StartTimestamp = timestamp
}

func (data *dropPartitionEventData) SetEndTimestamp(timestamp typeutil.Timestamp) {
	data.EndTimestamp = timestamp
}

N
neza2017 已提交
228 229
func (data *dropPartitionEventData) GetEventDataFixPartSize() int32 {
	return int32(binary.Size(data))
S
sunby 已提交
230 231 232
}

func (data *dropPartitionEventData) WriteEventData(buffer io.Writer) error {
C
cai.zhang 已提交
233 234 235 236 237 238
	if data.StartTimestamp == 0 {
		return errors.New("hasn't set start time stamp")
	}
	if data.EndTimestamp == 0 {
		return errors.New("hasn't set end time stamp")
	}
N
neza2017 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
	return binary.Write(buffer, binary.LittleEndian, data)
}

func getEventFixPartSize(code EventTypeCode) int32 {
	switch code {
	case DescriptorEventType:
		return int32(binary.Size(descriptorEventData{}.DescriptorEventDataFixPart))
	case InsertEventType:
		return (&insertEventData{}).GetEventDataFixPartSize()
	case DeleteEventType:
		return (&deleteEventData{}).GetEventDataFixPartSize()
	case CreateCollectionEventType:
		return (&createCollectionEventData{}).GetEventDataFixPartSize()
	case DropCollectionEventType:
		return (&dropCollectionEventData{}).GetEventDataFixPartSize()
	case CreatePartitionEventType:
		return (&createCollectionEventData{}).GetEventDataFixPartSize()
	case DropPartitionEventType:
		return (&dropPartitionEventData{}).GetEventDataFixPartSize()
	default:
		return -1
S
sunby 已提交
260 261 262
	}
}

N
neza2017 已提交
263
func newDescriptorEventData() (*descriptorEventData, error) {
S
sunby 已提交
264 265 266 267 268 269 270 271
	data := descriptorEventData{
		DescriptorEventDataFixPart: DescriptorEventDataFixPart{
			BinlogVersion:   BinlogVersion,
			ServerVersion:   ServerVersion,
			CommitID:        CommitID,
			CollectionID:    -1,
			PartitionID:     -1,
			SegmentID:       -1,
C
cai.zhang 已提交
272
			FieldID:         -1,
S
sunby 已提交
273 274 275 276
			StartTimestamp:  0,
			EndTimestamp:    0,
			PayloadDataType: -1,
		},
N
neza2017 已提交
277 278 279 280 281
		PostHeaderLengths: []uint8{},
	}
	for i := DescriptorEventType; i < EventTypeEnd; i++ {
		size := getEventFixPartSize(i)
		if size == -1 {
S
sunby 已提交
282
			return nil, fmt.Errorf("undefined event type %d", i)
N
neza2017 已提交
283 284
		}
		data.PostHeaderLengths = append(data.PostHeaderLengths, uint8(size))
S
sunby 已提交
285
	}
N
neza2017 已提交
286
	return &data, nil
S
sunby 已提交
287 288
}

N
neza2017 已提交
289 290
func newInsertEventData() (*insertEventData, error) {
	return &insertEventData{
S
sunby 已提交
291 292
		StartTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
293
	}, nil
S
sunby 已提交
294
}
N
neza2017 已提交
295 296
func newDeleteEventData() (*deleteEventData, error) {
	return &deleteEventData{
S
sunby 已提交
297 298
		StartTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
299
	}, nil
S
sunby 已提交
300
}
N
neza2017 已提交
301 302
func newCreateCollectionEventData() (*createCollectionEventData, error) {
	return &createCollectionEventData{
S
sunby 已提交
303 304
		StartTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
305
	}, nil
S
sunby 已提交
306
}
N
neza2017 已提交
307 308
func newDropCollectionEventData() (*dropCollectionEventData, error) {
	return &dropCollectionEventData{
S
sunby 已提交
309 310
		StartTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
311
	}, nil
S
sunby 已提交
312
}
N
neza2017 已提交
313 314
func newCreatePartitionEventData() (*createPartitionEventData, error) {
	return &createPartitionEventData{
S
sunby 已提交
315 316
		StartTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
317
	}, nil
S
sunby 已提交
318
}
N
neza2017 已提交
319 320
func newDropPartitionEventData() (*dropPartitionEventData, error) {
	return &dropPartitionEventData{
S
sunby 已提交
321 322
		StartTimestamp: 0,
		EndTimestamp:   0,
N
neza2017 已提交
323
	}, nil
S
sunby 已提交
324 325
}

N
neza2017 已提交
326
func readInsertEventDataFixPart(buffer io.Reader) (*insertEventData, error) {
S
sunby 已提交
327 328 329 330 331 332 333
	data := &insertEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
334
func readDeleteEventDataFixPart(buffer io.Reader) (*deleteEventData, error) {
S
sunby 已提交
335 336 337 338 339 340 341
	data := &deleteEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
342
func readCreateCollectionEventDataFixPart(buffer io.Reader) (*createCollectionEventData, error) {
S
sunby 已提交
343 344 345 346 347 348 349
	data := &createCollectionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
350
func readDropCollectionEventDataFixPart(buffer io.Reader) (*dropCollectionEventData, error) {
S
sunby 已提交
351 352 353 354 355 356 357
	data := &dropCollectionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
358
func readCreatePartitionEventDataFixPart(buffer io.Reader) (*createPartitionEventData, error) {
S
sunby 已提交
359 360 361 362 363 364 365
	data := &createPartitionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}

N
neza2017 已提交
366
func readDropPartitionEventDataFixPart(buffer io.Reader) (*dropPartitionEventData, error) {
S
sunby 已提交
367 368 369 370 371 372
	data := &dropPartitionEventData{}
	if err := binary.Read(buffer, binary.LittleEndian, data); err != nil {
		return nil, err
	}
	return data, nil
}