rmq_client.go 2.7 KB
Newer Older
X
Xiangyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
Xiangyu Wang 已提交
12
package mqclient
Y
yukun 已提交
13 14 15 16

import (
	"strconv"

X
xige-16 已提交
17 18
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
19 20
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
Y
yukun 已提交
21 22 23 24 25 26
)

// rmqClient wraps a rocksmq.Client; its methods translate between this
// package's option/ID types and the rocksmq ones.
type rmqClient struct {
	// client is the underlying rocksmq client that every method delegates to.
	client rocksmq.Client
}

27
// NewRmqClient returns a new rmqClient object
Y
yukun 已提交
28 29 30
func NewRmqClient(opts rocksmq.ClientOptions) (*rmqClient, error) {
	c, err := rocksmq.NewClient(opts)
	if err != nil {
31
		log.Error("Failed to set rmq client: ", zap.Error(err))
Y
yukun 已提交
32 33 34 35 36
		return nil, err
	}
	return &rmqClient{client: c}, nil
}

37
// CreateProducer creates a producer for rocksmq client
X
Xiangyu Wang 已提交
38
func (rc *rmqClient) CreateProducer(options ProducerOptions) (Producer, error) {
Y
yukun 已提交
39 40 41 42 43 44 45 46 47
	rmqOpts := rocksmq.ProducerOptions{Topic: options.Topic}
	pp, err := rc.client.CreateProducer(rmqOpts)
	if err != nil {
		return nil, err
	}
	rp := rmqProducer{p: pp}
	return &rp, nil
}

48
// Subscribe subscribes a consumer in rmq client
X
Xiangyu Wang 已提交
49
func (rc *rmqClient) Subscribe(options ConsumerOptions) (Consumer, error) {
Y
yukun 已提交
50 51 52
	receiveChannel := make(chan rocksmq.ConsumerMessage, options.BufSize)

	cli, err := rc.client.Subscribe(rocksmq.ConsumerOptions{
53 54 55 56
		Topic:                       options.Topic,
		SubscriptionName:            options.SubscriptionName,
		MessageChannel:              receiveChannel,
		SubscriptionInitialPosition: rocksmq.SubscriptionInitialPosition(options.SubscriptionInitialPosition),
Y
yukun 已提交
57 58 59 60 61
	})
	if err != nil {
		return nil, err
	}

G
godchen 已提交
62
	rConsumer := &RmqConsumer{c: cli, closeCh: make(chan struct{})}
Y
yukun 已提交
63 64 65 66

	return rConsumer, nil
}

67
// EarliestMessageID returns the earliest message ID for rmq client
X
Xiangyu Wang 已提交
68
func (rc *rmqClient) EarliestMessageID() MessageID {
Y
yukun 已提交
69 70 71 72
	rID := rocksmq.EarliestMessageID()
	return &rmqID{messageID: rID}
}

73
// StringToMsgID converts string id to MessageID
X
Xiangyu Wang 已提交
74
func (rc *rmqClient) StringToMsgID(id string) (MessageID, error) {
Y
yukun 已提交
75 76 77 78 79 80 81
	rID, err := strconv.ParseInt(id, 10, 64)
	if err != nil {
		return nil, err
	}
	return &rmqID{messageID: rID}, nil
}

X
Xiangyu Wang 已提交
82 83
func (rc *rmqClient) BytesToMsgID(id []byte) (MessageID, error) {
	rID, err := DeserializeRmqID(id)
X
xige-16 已提交
84 85 86 87 88 89
	if err != nil {
		return nil, err
	}
	return &rmqID{messageID: rID}, nil
}

Y
yukun 已提交
90
func (rc *rmqClient) Close() {
91 92
	// TODO(yukun): What to do here?
	// rc.client.Close()
Y
yukun 已提交
93
}