rmq_client.go 3.3 KB
Newer Older
X
Xiangyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
Xiangyu Wang 已提交
12
package mqclient
Y
yukun 已提交
13 14

import (
G
godchen 已提交
15
	"errors"
Y
yukun 已提交
16 17
	"strconv"

X
xige-16 已提交
18 19
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
20 21
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
Y
yukun 已提交
22 23 24 25 26 27
)

// rmqClient wraps a rocksmq.Client; the methods defined on it in this file
// delegate to that client.
type rmqClient struct {
	client rocksmq.Client // underlying rocksmq client the methods forward to
}

28
// NewRmqClient returns a new rmqClient object
Y
yukun 已提交
29 30 31
func NewRmqClient(opts rocksmq.ClientOptions) (*rmqClient, error) {
	c, err := rocksmq.NewClient(opts)
	if err != nil {
32
		log.Error("Failed to set rmq client: ", zap.Error(err))
Y
yukun 已提交
33 34 35 36 37
		return nil, err
	}
	return &rmqClient{client: c}, nil
}

38
// CreateProducer creates a producer for rocksmq client
X
Xiangyu Wang 已提交
39
func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) {
Y
yukun 已提交
40 41 42 43 44 45 46 47 48
	rmqOpts := rocksmq.ProducerOptions{Topic: options.Topic}
	pp, err := rc.client.CreateProducer(rmqOpts)
	if err != nil {
		return nil, err
	}
	rp := rmqProducer{p: pp}
	return &rp, nil
}

G
godchen 已提交
49 50
//TODO fishpenguin: implementation
func (rc *rmqClient) CreateReader(options ReaderOptions) (Reader, error) {
G
godchen 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64
	opts := rocksmq.ReaderOptions{
		Topic:                   options.Topic,
		StartMessageID:          options.StartMessageID.(*rmqID).messageID,
		StartMessageIDInclusive: options.StartMessageIDInclusive,
	}
	pr, err := rc.client.CreateReader(opts)
	if err != nil {
		return nil, err
	}
	if pr == nil {
		return nil, errors.New("pulsar is not ready, producer is nil")
	}
	reader := &rmqReader{r: pr}
	return reader, nil
G
godchen 已提交
65 66
}

67
// Subscribe subscribes a consumer in rmq client
X
Xiangyu Wang 已提交
68
func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
G
godchen 已提交
69
	receiveChannel := make(chan rocksmq.Message, options.BufSize)
Y
yukun 已提交
70 71

	cli, err := rc.client.Subscribe(rocksmq.ConsumerOptions{
72 73 74 75
		Topic:                       options.Topic,
		SubscriptionName:            options.SubscriptionName,
		MessageChannel:              receiveChannel,
		SubscriptionInitialPosition: rocksmq.SubscriptionInitialPosition(options.SubscriptionInitialPosition),
Y
yukun 已提交
76 77 78 79 80
	})
	if err != nil {
		return nil, err
	}

G
godchen 已提交
81
	rConsumer := &RmqConsumer{c: cli, closeCh: make(chan struct{})}
Y
yukun 已提交
82 83 84 85

	return rConsumer, nil
}

86
// EarliestMessageID returns the earliest message ID for rmq client
X
Xiangyu Wang 已提交
87
func (rc *rmqClient) EarliestMessageID() MessageID {
Y
yukun 已提交
88 89 90 91
	rID := rocksmq.EarliestMessageID()
	return &rmqID{messageID: rID}
}

92
// StringToMsgID converts string id to MessageID
X
Xiangyu Wang 已提交
93
func (rc *rmqClient) StringToMsgID(id string) (MessageID, error) {
Y
yukun 已提交
94 95 96 97 98 99 100
	rID, err := strconv.ParseInt(id, 10, 64)
	if err != nil {
		return nil, err
	}
	return &rmqID{messageID: rID}, nil
}

Y
yukun 已提交
101
// BytesToMsgID converts a byte array to messageID
X
Xiangyu Wang 已提交
102 103
func (rc *rmqClient) BytesToMsgID(id []byte) (MessageID, error) {
	rID, err := DeserializeRmqID(id)
X
xige-16 已提交
104 105 106 107 108 109
	if err != nil {
		return nil, err
	}
	return &rmqID{messageID: rID}, nil
}

Y
yukun 已提交
110
func (rc *rmqClient) Close() {
111 112
	// TODO(yukun): What to do here?
	// rc.client.Close()
Y
yukun 已提交
113
}