pulsar.go 705 字节
Newer Older
1 2 3 4 5 6 7
package informer

import (
	"log"
	"strconv"
	"time"

8
	"github.com/zilliztech/milvus-distributed/internal/conf"
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

	"github.com/apache/pulsar-client-go/pulsar"
)

// NewPulsarClient creates a PulsarClient configured from the global settings.
//
// The service URL has the form "pulsar://<address>:<port>", where the address
// and port are read from conf.Config.Pulsar. The operation and connection
// timeouts are both set to 30 seconds. If pulsar.NewClient reports an error,
// it is logged with log.Fatalf, which terminates the process, so no failure
// is ever returned to the caller.
func NewPulsarClient() PulsarClient {
	const timeout = 30 * time.Second

	port := strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
	serviceURL := "pulsar://" + conf.Config.Pulsar.Address + ":" + port

	opts := pulsar.ClientOptions{
		URL:               serviceURL,
		OperationTimeout:  timeout,
		ConnectionTimeout: timeout,
	}

	c, err := pulsar.NewClient(opts)
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	return PulsarClient{Client: c}
}

// PulsarClient is a thin wrapper holding a pulsar.Client handle.
type PulsarClient struct {
	// Client is the wrapped Pulsar client; NewPulsarClient populates it with
	// the value returned by pulsar.NewClient.
	Client pulsar.Client
}