提交 a46824c8 编写于 作者: U Ulric Qin

Merge branch 'master' of github.com:didi/nightingale

......@@ -16,8 +16,12 @@ backend:
opentsdb:
enabled: false
address: "127.0.0.1:4242"
kafka:
enabled: false
brokersPeers: "192.168.1.1:9092,192.168.1.2:9092"
topic: "n9e"
logger:
dir: logs/transfer
level: WARNING
keepHours: 2
\ No newline at end of file
keepHours: 2
......@@ -3,6 +3,8 @@ module github.com/didi/nightingale
go 1.12
require (
github.com/Shopify/sarama v1.26.4
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/cespare/xxhash v1.1.0
github.com/codegangsta/negroni v1.0.0
github.com/dgryski/go-tsz v0.0.0-20180227144327-03b7d791f4fe
......@@ -10,6 +12,7 @@ require (
github.com/gin-contrib/pprof v1.2.1
github.com/gin-contrib/sessions v0.0.3
github.com/gin-gonic/gin v1.5.0
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-sql-driver/mysql v1.4.1
github.com/gorilla/mux v1.6.2
github.com/hpcloud/tail v1.0.0
......
此差异已折叠。
......@@ -109,13 +109,29 @@ func (m *MetricValue) CheckValidity(now int64) (err error) {
}
}
if len(m.Metric) > 255 {
if len(m.TagsMap) > 12 {
err = fmt.Errorf("tagkv count is too large > 12")
}
if len(m.Metric) > 128 {
err = fmt.Errorf("len(m.Metric) is too large")
return
}
for k, v := range m.TagsMap {
delete(m.TagsMap, k)
k = filterString(k)
v = filterString(v)
if len(k) == 0 || len(v) == 0 {
err = fmt.Errorf("tag key and value should not be empty")
return
}
m.TagsMap[k] = v
}
m.Tags = SortedTags(m.TagsMap)
if len(m.Tags) > 255 {
if len(m.Tags) > 512 {
err = fmt.Errorf("len(m.Tags) is too large")
return
}
......@@ -174,6 +190,37 @@ func HasReservedWords(str string) bool {
return idx != -1
}
func filterString(str string) string {
if -1 == strings.IndexFunc(str,
func(r rune) bool {
return r == '\t' ||
r == '\r' ||
r == '\n' ||
r == ',' ||
r == ' ' ||
r == ':' ||
r == '='
}) {
return str
}
return strings.Map(func(r rune) rune {
if r == '\t' ||
r == '\r' ||
r == '\n' ||
r == ',' ||
r == ' ' ||
r == ':' ||
r == '=' {
return '_'
}
return r
}, str)
return str
}
func SortedTags(tags map[string]string) string {
if tags == nil {
return ""
......
......@@ -458,10 +458,42 @@ func GetCollectById(collectType string, cid int64) (interface{}, error) {
}
}
func GetCollectByName(collectType string, name string) (interface{}, error) {
var collect interface{}
_, err := DB["mon"].Table(collectType+"_collect").Where("name = ?", name).Get(&collect)
return collect, err
func GetCollectByNameAndNid(collectType string, name string, nid int64) (interface{}, error) {
switch collectType {
case "port":
collect := new(PortCollect)
has, err := DB["mon"].Where("name = ? and nid = ?", name, nid).Get(collect)
if !has {
return nil, err
}
return collect, err
case "proc":
collect := new(ProcCollect)
has, err := DB["mon"].Where("name = ? and nid = ?", name, nid).Get(collect)
if !has {
return nil, err
}
return collect, err
case "log":
collect := new(LogCollect)
has, err := DB["mon"].Where("name = ? and nid = ?", name, nid).Get(collect)
if !has {
return nil, err
}
collect.Decode()
return collect, err
case "plugin":
collect := new(PluginCollect)
has, err := DB["mon"].Where("name = ? and nid = ?", name, nid).Get(collect)
if !has {
return nil, err
}
return collect, err
default:
return nil, fmt.Errorf("采集类型不合法")
}
return nil, nil
}
func DeleteCollectById(collectType, creator string, cid int64) error {
......
package query
import (
"fmt"
"sync"
"time"
"github.com/didi/nightingale/src/toolkits/report"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/logger"
)
var IndexList IndexAddrs
type IndexAddrs struct {
sync.RWMutex
Data []string
}
func (i *IndexAddrs) Set(addrs []string) {
i.Lock()
defer i.Unlock()
i.Data = addrs
}
func (i *IndexAddrs) Get() []string {
i.RLock()
defer i.RUnlock()
return i.Data
}
func GetIndexLoop() {
t1 := time.NewTicker(time.Duration(9) * time.Second)
GetIndex()
for {
<-t1.C
GetIndex()
}
}
func GetIndex() {
instances, err := report.GetAlive("index", "monapi")
if err != nil {
stats.Counter.Set("get.index.err", 1)
logger.Warningf("get index list err:%v", err)
return
}
activeIndexs := []string{}
for _, instance := range instances {
activeIndexs = append(activeIndexs, fmt.Sprintf("%s:%s", instance.Identity, instance.HTTPPort))
}
IndexList.Set(activeIndexs)
return
}
......@@ -28,4 +28,6 @@ func Init(cfg SeriesQuerySection) {
TransferConnPools = pools.NewConnPools(
Config.MaxConn, Config.MaxIdle, Config.ConnTimeout, Config.CallTimeout, address.GetRPCAddresses("transfer"),
)
go GetIndexLoop()
}
......@@ -9,7 +9,6 @@ import (
"github.com/didi/nightingale/src/dataobj"
"github.com/didi/nightingale/src/modules/judge/cache"
"github.com/didi/nightingale/src/toolkits/address"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/didi/nightingale/src/toolkits/str"
......@@ -186,7 +185,7 @@ type IndexResp struct {
// index的xclude 不支持批量查询, 暂时不做
func Xclude(request *IndexReq) ([]IndexData, error) {
addrs := address.GetHTTPAddresses("index")
addrs := IndexList.Get()
if len(addrs) == 0 {
return nil, errors.New("empty index addr")
}
......
package cron
import (
"time"
"github.com/didi/nightingale/src/model"
"github.com/toolkits/pkg/logger"
)
func CleanCollectLoop() {
duration := time.Second * time.Duration(300)
for {
time.Sleep(duration)
CleanCollect()
}
}
//定期清理没有找到nid的采集策略
func CleanCollect() {
var list []interface{}
collects, err := model.GetPortCollects()
if err != nil {
logger.Warningf("get collect err: %v", err)
}
for _, collect := range collects {
list = append(list, collect)
}
procCollects, err := model.GetProcCollects()
if err != nil {
logger.Warningf("get collect err: %v", err)
}
for _, collect := range procCollects {
list = append(list, collect)
}
logCollects, err := model.GetLogCollects()
if err != nil {
logger.Warningf("get collect err: %v", err)
}
for _, collect := range logCollects {
list = append(list, collect)
}
pluginCollects, err := model.GetPluginCollects()
if err != nil {
logger.Warningf("get collect err: %v", err)
}
for _, collect := range pluginCollects {
list = append(list, collect)
}
for _, collect := range list {
var nid, id int64
var collectType string
switch collect.(type) {
case *model.ProcCollect:
nid = collect.(*model.ProcCollect).Nid
id = collect.(*model.ProcCollect).Id
collectType = collect.(*model.ProcCollect).CollectType
case *model.PortCollect:
nid = collect.(*model.PortCollect).Nid
id = collect.(*model.PortCollect).Id
collectType = collect.(*model.PortCollect).CollectType
case *model.LogCollect:
nid = collect.(*model.LogCollect).Nid
id = collect.(*model.LogCollect).Id
collectType = collect.(*model.LogCollect).CollectType
case *model.PluginCollect:
nid = collect.(*model.PluginCollect).Nid
id = collect.(*model.PluginCollect).Id
collectType = collect.(*model.PluginCollect).CollectType
}
node, err := model.NodeGet("id", nid)
if err != nil {
logger.Warningf("get node failed, node id: %d, err: %v", nid, err)
continue
}
if node == nil {
logger.Infof("delete collect: %+v", collect)
if err := model.DeleteCollectById(collectType, "sys", id); err != nil {
logger.Warningf("delete collect %s: %d, err: %v", collectType, id, err)
}
}
}
}
......@@ -48,10 +48,12 @@ func collectPost(c *gin.Context) {
nid := collect.Nid
name := collect.Name
old, _ := model.GetCollectByName(obj.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid {
old, err := model.GetCollectByNameAndNid(obj.Type, name, nid)
errors.Dangerous(err)
if old != nil {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
errors.Dangerous(model.CreateCollect(obj.Type, creator, collect))
case "proc":
......@@ -73,8 +75,9 @@ func collectPost(c *gin.Context) {
nid := collect.Nid
name := collect.Name
old, _ := model.GetCollectByName(obj.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid {
old, err := model.GetCollectByNameAndNid(obj.Type, name, nid)
errors.Dangerous(err)
if old != nil {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
errors.Dangerous(model.CreateCollect(obj.Type, creator, collect))
......@@ -98,10 +101,12 @@ func collectPost(c *gin.Context) {
nid := collect.Nid
name := collect.Name
old, _ := model.GetCollectByName(obj.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid {
old, err := model.GetCollectByNameAndNid(obj.Type, name, nid)
errors.Dangerous(err)
if old != nil {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
errors.Dangerous(model.CreateCollect(obj.Type, creator, collect))
case "plugin":
......@@ -123,10 +128,12 @@ func collectPost(c *gin.Context) {
nid := collect.Nid
name := collect.Name
old, _ := model.GetCollectByName(obj.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid {
old, err := model.GetCollectByNameAndNid(obj.Type, name, nid)
errors.Dangerous(err)
if old != nil {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
errors.Dangerous(model.CreateCollect(obj.Type, creator, collect))
default:
......@@ -208,9 +215,9 @@ func collectPut(c *gin.Context) {
collect.Creator = creator
collect.LastUpdator = creator
old, _ := model.GetCollectByName(recv.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid &&
tmpId != collect.Id {
old, err := model.GetCollectByNameAndNid(recv.Type, name, nid)
errors.Dangerous(err)
if old != nil && old.(*model.PortCollect).Id != tmpId {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
......@@ -247,9 +254,9 @@ func collectPut(c *gin.Context) {
collect.Creator = creator
collect.LastUpdator = creator
old, _ := model.GetCollectByName(recv.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid &&
tmpId != collect.Id {
old, err := model.GetCollectByNameAndNid(recv.Type, name, nid)
errors.Dangerous(err)
if old != nil && old.(*model.ProcCollect).Id != tmpId {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
......@@ -287,9 +294,9 @@ func collectPut(c *gin.Context) {
collect.Creator = creator
collect.LastUpdator = creator
old, _ := model.GetCollectByName(recv.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid &&
tmpId != collect.Id {
old, err := model.GetCollectByNameAndNid(recv.Type, name, nid)
errors.Dangerous(err)
if old != nil && old.(*model.LogCollect).Id != tmpId {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
......@@ -326,9 +333,9 @@ func collectPut(c *gin.Context) {
collect.Creator = creator
collect.LastUpdator = creator
old, _ := model.GetCollectByName(recv.Type, name)
if old != nil && int64(old.(map[string]interface{})["nid"].(float64)) == nid &&
tmpId != collect.Id {
old, err := model.GetCollectByNameAndNid(recv.Type, name, nid)
errors.Dangerous(err)
if old != nil && old.(*model.PluginCollect).Id != tmpId {
errors.Bomb("同节点下策略名称 %s 已存在", name)
}
......
......@@ -88,6 +88,7 @@ func main() {
go cron.SyncMaskconfLoop()
go cron.SyncStraLoop()
go cron.CleanStraLoop()
go cron.CleanCollectLoop()
go cron.EventConsumer()
go cron.CallbackConsumer()
go cron.CleanEventLoop()
......
......@@ -36,6 +36,16 @@ type OpenTsdbSection struct {
Address string `yaml:"address"`
}
type KafkaSection struct {
Enabled bool `yaml:"enabled"`
Topic string `yaml:"topic"`
BrokersPeers string `yaml:"brokersPeers"`
SaslUser string `yaml:"saslUser"`
SaslPasswd string `yaml:"saslPasswd"`
Retry int `yaml:"retry"`
KeepAlive int64 `yaml:"keepAlive"`
}
type BackendSection struct {
Enabled bool `yaml:"enabled"`
Batch int `yaml:"batch"`
......@@ -53,6 +63,7 @@ type BackendSection struct {
ClusterList map[string]*ClusterNode `json:"clusterList"`
Influxdb InfluxdbSection `yaml:"influxdb"`
OpenTsdb OpenTsdbSection `yaml:"opentsdb"`
Kafka KafkaSection `yaml:"kafka"`
}
const DefaultSendQueueMaxSize = 102400 //10.24w
......@@ -71,6 +82,7 @@ var (
JudgeQueues = cache.SafeJudgeQueue{}
InfluxdbQueue *list.SafeListLimited
OpenTsdbQueue *list.SafeListLimited
KafkaQueue = make(chan KafkaData, 10)
// 连接池 node_address -> connection_pool
TsdbConnPools *pools.ConnPools
......
package backend
import (
"encoding/json"
"errors"
"fmt"
"github.com/Shopify/sarama"
"github.com/toolkits/pkg/logger"
"os"
"strings"
"time"
)
type KafkaData map[string]interface{}
type KfClient struct {
producer sarama.AsyncProducer
cfg *sarama.Config
Topic string
BrokersPeers []string
ticker *time.Ticker
}
func NewKfClient(c KafkaSection) (kafkaSender *KfClient, err error) {
topic := c.Topic
if len(topic) == 0 {
err = errors.New("topic is nil")
return
}
brokers := strings.Split(c.BrokersPeers, ",")
if len(brokers) == 0 {
err = errors.New("brokers is nil")
return
}
hostName, _ := os.Hostname()
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true
cfg.Producer.Return.Errors = true
if len(hostName) > 0 {
cfg.ClientID = hostName
}
cfg.Producer.Partitioner = func(topic string) sarama.Partitioner { return sarama.NewRoundRobinPartitioner(topic) }
if len(c.SaslUser) > 0 && len(c.SaslPasswd) > 0 {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = c.SaslUser
cfg.Net.SASL.Password = c.SaslPasswd
}
if c.Retry > 0 {
cfg.Producer.Retry.Max = c.Retry
}
cfg.Net.DialTimeout = time.Duration(connTimeout) * time.Millisecond
if c.KeepAlive > 0 {
cfg.Net.KeepAlive = time.Duration(c.KeepAlive) * time.Millisecond
}
producer, err := sarama.NewAsyncProducer(brokers, cfg)
if err != nil {
return
}
kafkaSender = newSender(brokers, topic, cfg, producer)
return
}
func newSender(brokers []string, topic string, cfg *sarama.Config, producer sarama.AsyncProducer) (kf *KfClient) {
kf = &KfClient{
producer: producer,
Topic: topic,
BrokersPeers: brokers,
ticker: time.NewTicker(time.Millisecond * time.Duration(callTimeout)),
}
go kf.readMessageToErrorChan()
return
}
func (kf *KfClient) readMessageToErrorChan() {
var producer = kf.producer
for {
select {
case <-producer.Successes():
case errMsg := <-producer.Errors():
msg, _ := errMsg.Msg.Value.Encode()
logger.Errorf("ReadMessageToErrorChan err:%v %v", errMsg.Error(), string(msg))
}
}
}
func (kf *KfClient) Send(data KafkaData) error {
var producer = kf.producer
message, err := kf.getEventMessage(data)
if err != nil {
logger.Errorf("Dropping event: %v", err)
return err
}
select {
case producer.Input() <- message:
case <-kf.ticker.C:
return fmt.Errorf("send kafka failed:%v[%v]", kf.Topic, kf.BrokersPeers)
}
return nil
}
func (kf *KfClient) Close() error {
logger.Infof("kafka sender(%s) was closed", kf.Topic, kf.BrokersPeers)
_ = kf.producer.Close()
kf.producer = nil
return nil
}
func (kf *KfClient) getEventMessage(event map[string]interface{}) (pm *sarama.ProducerMessage, err error) {
value, err := json.Marshal(event)
if err != nil {
return
}
pm = &sarama.ProducerMessage{
Topic: kf.Topic,
Value: sarama.StringEncoder(string(value)),
}
return
}
......@@ -74,6 +74,9 @@ func startSendTasks() {
}
if Config.Kafka.Enabled {
go send2KafkaTask()
}
}
func Send2TsdbTask(Q *list.SafeListLimited, node, addr string, concurrent int) {
......@@ -495,3 +498,38 @@ func convert2OpenTsdbItem(d *dataobj.MetricValue) *dataobj.OpenTsdbItem {
t.Value = d.Value
return &t
}
func Push2KafkaSendQueue(items []*dataobj.MetricValue) {
for _, item := range items {
KafkaQueue <- convert2KafkaItem(item)
}
}
func convert2KafkaItem(d *dataobj.MetricValue) KafkaData {
m := make(KafkaData)
m["metric"] = d.Metric
m["value"] = d.Value
m["timestamp"] = d.Timestamp
m["value"] = d.Value
m["step"] = d.Step
m["endpoint"] = d.Endpoint
m["tags"] = d.Tags
return m
}
func send2KafkaTask() {
kf, err := NewKfClient(Config.Kafka)
if err != nil {
logger.Errorf("init kafka client fail: %v", err)
return
}
defer kf.Close()
for {
kafkaItem := <-KafkaQueue
stats.Counter.Set("points.out.kafka", 1)
err = kf.Send(kafkaItem)
if err != nil {
stats.Counter.Set("points.out.kafka.err", 1)
logger.Errorf("send %v to kafka %s fail: %v", kafkaItem, Config.Kafka.BrokersPeers, err)
}
}
}
......@@ -48,6 +48,18 @@ func PushData(c *gin.Context) {
backend.Push2JudgeSendQueue(metricValues)
}
if backend.Config.Influxdb.Enabled {
backend.Push2InfluxdbSendQueue(metricValues)
}
if backend.Config.OpenTsdb.Enabled {
backend.Push2OpenTsdbSendQueue(metricValues)
}
if backend.Config.Kafka.Enabled {
backend.Push2KafkaSendQueue(metricValues)
}
if msg != "" {
render.Message(c, msg)
return
......
......@@ -52,6 +52,9 @@ func (t *Transfer) Push(args []*dataobj.MetricValue, reply *dataobj.TransferResp
backend.Push2OpenTsdbSendQueue(items)
}
if backend.Config.Kafka.Enabled {
backend.Push2KafkaSendQueue(items)
}
if reply.Invalid == 0 {
reply.Msg = "ok"
}
......
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
*.test
# Folders
_obj
_test
.vagrant
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
coverage.txt
profile.out
run:
timeout: 5m
deadline: 10m
linters-settings:
govet:
check-shadowing: false
golint:
min-confidence: 0
gocyclo:
min-complexity: 99
maligned:
suggest-new: true
dupl:
threshold: 100
goconst:
min-len: 2
min-occurrences: 3
misspell:
locale: US
goimports:
local-prefixes: github.com/Shopify/sarama
gocritic:
enabled-tags:
- diagnostic
- experimental
- opinionated
- performance
- style
disabled-checks:
- wrapperFunc
- ifElseChain
funlen:
lines: 300
statements: 300
linters:
disable-all: true
enable:
- bodyclose
- deadcode
- depguard
- dogsled
# - dupl
- errcheck
- funlen
# - gocritic
- gocyclo
- gofmt
- goimports
# - golint
- gosec
# - gosimple
- govet
# - ineffassign
- interfacer
# - misspell
# - nakedret
# - scopelint
# - staticcheck
- structcheck
# - stylecheck
- typecheck
- unconvert
- unused
- varcheck
- whitespace
# - goconst
# - gochecknoinits
issues:
exclude:
- consider giving a name to these results
- include an explanation for nolint directive
此差异已折叠。
Copyright (c) 2013 Shopify
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
default: fmt get update test lint
GO := GO111MODULE=on GOPRIVATE=github.com/linkedin GOSUMDB=off go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := $(GO) test -gcflags='-l' -p 3 -v -race -timeout 6m -coverprofile=profile.out -covermode=atomic
FILES := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -not -name '*_test.go')
TESTS := $(shell find . -name '*.go' -type f -not -name '*.pb.go' -not -name '*_generated.go' -name '*_test.go')
get:
$(GO) get ./...
$(GO) mod verify
$(GO) mod tidy
update:
$(GO) get -u -v all
$(GO) mod verify
$(GO) mod tidy
fmt:
gofmt -s -l -w $(FILES) $(TESTS)
lint:
golangci-lint run
test:
$(GOTEST) ./...
# sarama
[![GoDoc](https://godoc.org/github.com/Shopify/sarama?status.svg)](https://godoc.org/github.com/Shopify/sarama)
[![Build Status](https://travis-ci.org/Shopify/sarama.svg?branch=master)](https://travis-ci.org/Shopify/sarama)
[![Coverage](https://codecov.io/gh/Shopify/sarama/branch/master/graph/badge.svg)](https://codecov.io/gh/Shopify/sarama)
Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apache.org/) version 0.8 (and later).
## Getting started
- API documentation and examples are available via [godoc](https://godoc.org/github.com/Shopify/sarama).
- Mocks for testing are available in the [mocks](./mocks) subpackage.
- The [examples](./examples) directory contains more elaborate example applications.
- The [tools](./tools) directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.
You might also want to look at the [Frequently Asked Questions](https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions).
## Compatibility and API stability
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support
Go 1.12 through 1.14, and Kafka 2.1 through 2.4, although older releases are
still likely to work.
Sarama follows semantic versioning and provides API stability via the gopkg.in service.
You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1.
A changelog is available [here](CHANGELOG.md).
## Contributing
- Get started by checking our [contribution guidelines](https://github.com/Shopify/sarama/blob/master/.github/CONTRIBUTING.md).
- Read the [Sarama wiki](https://github.com/Shopify/sarama/wiki) for more technical and design details.
- The [Kafka Protocol Specification](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) contains a wealth of useful information.
- For more general issues, there is [a google group](https://groups.google.com/forum/#!forum/kafka-clients) for Kafka client developers.
- If you have any questions, just ask!
# We have 5 * 192MB ZK processes and 5 * 320MB Kafka processes => 2560MB
MEMORY = 3072
Vagrant.configure("2") do |config|
config.vm.box = "ubuntu/bionic64"
config.vm.provision :shell, path: "vagrant/provision.sh"
config.vm.network "private_network", ip: "192.168.100.67"
config.vm.provider "virtualbox" do |v|
v.memory = MEMORY
end
end
package sarama
//Resource holds information about acl resource type
type Resource struct {
ResourceType AclResourceType
ResourceName string
ResourcePatternType AclResourcePatternType
}
func (r *Resource) encode(pe packetEncoder, version int16) error {
pe.putInt8(int8(r.ResourceType))
if err := pe.putString(r.ResourceName); err != nil {
return err
}
if version == 1 {
if r.ResourcePatternType == AclPatternUnknown {
Logger.Print("Cannot encode an unknown resource pattern type, using Literal instead")
r.ResourcePatternType = AclPatternLiteral
}
pe.putInt8(int8(r.ResourcePatternType))
}
return nil
}
func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
resourceType, err := pd.getInt8()
if err != nil {
return err
}
r.ResourceType = AclResourceType(resourceType)
if r.ResourceName, err = pd.getString(); err != nil {
return err
}
if version == 1 {
pattern, err := pd.getInt8()
if err != nil {
return err
}
r.ResourcePatternType = AclResourcePatternType(pattern)
}
return nil
}
//Acl holds information about acl type
type Acl struct {
Principal string
Host string
Operation AclOperation
PermissionType AclPermissionType
}
func (a *Acl) encode(pe packetEncoder) error {
if err := pe.putString(a.Principal); err != nil {
return err
}
if err := pe.putString(a.Host); err != nil {
return err
}
pe.putInt8(int8(a.Operation))
pe.putInt8(int8(a.PermissionType))
return nil
}
func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
if a.Principal, err = pd.getString(); err != nil {
return err
}
if a.Host, err = pd.getString(); err != nil {
return err
}
operation, err := pd.getInt8()
if err != nil {
return err
}
a.Operation = AclOperation(operation)
permissionType, err := pd.getInt8()
if err != nil {
return err
}
a.PermissionType = AclPermissionType(permissionType)
return nil
}
//ResourceAcls is an acl resource type
type ResourceAcls struct {
Resource
Acls []*Acl
}
func (r *ResourceAcls) encode(pe packetEncoder, version int16) error {
if err := r.Resource.encode(pe, version); err != nil {
return err
}
if err := pe.putArrayLength(len(r.Acls)); err != nil {
return err
}
for _, acl := range r.Acls {
if err := acl.encode(pe); err != nil {
return err
}
}
return nil
}
func (r *ResourceAcls) decode(pd packetDecoder, version int16) error {
if err := r.Resource.decode(pd, version); err != nil {
return err
}
n, err := pd.getArrayLength()
if err != nil {
return err
}
r.Acls = make([]*Acl, n)
for i := 0; i < n; i++ {
r.Acls[i] = new(Acl)
if err := r.Acls[i].decode(pd, version); err != nil {
return err
}
}
return nil
}
package sarama
//CreateAclsRequest is an acl creation request
type CreateAclsRequest struct {
Version int16
AclCreations []*AclCreation
}
func (c *CreateAclsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(c.AclCreations)); err != nil {
return err
}
for _, aclCreation := range c.AclCreations {
if err := aclCreation.encode(pe, c.Version); err != nil {
return err
}
}
return nil
}
func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error) {
c.Version = version
n, err := pd.getArrayLength()
if err != nil {
return err
}
c.AclCreations = make([]*AclCreation, n)
for i := 0; i < n; i++ {
c.AclCreations[i] = new(AclCreation)
if err := c.AclCreations[i].decode(pd, version); err != nil {
return err
}
}
return nil
}
func (c *CreateAclsRequest) key() int16 {
return 30
}
func (c *CreateAclsRequest) version() int16 {
return c.Version
}
func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}
func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
//AclCreation is a wrapper around Resource and Acl type
type AclCreation struct {
Resource
Acl
}
func (a *AclCreation) encode(pe packetEncoder, version int16) error {
if err := a.Resource.encode(pe, version); err != nil {
return err
}
if err := a.Acl.encode(pe); err != nil {
return err
}
return nil
}
func (a *AclCreation) decode(pd packetDecoder, version int16) (err error) {
if err := a.Resource.decode(pd, version); err != nil {
return err
}
if err := a.Acl.decode(pd, version); err != nil {
return err
}
return nil
}
package sarama
import "time"
//CreateAclsResponse is a an acl response creation type
type CreateAclsResponse struct {
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
}
func (c *CreateAclsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
if err := pe.putArrayLength(len(c.AclCreationResponses)); err != nil {
return err
}
for _, aclCreationResponse := range c.AclCreationResponses {
if err := aclCreationResponse.encode(pe); err != nil {
return err
}
}
return nil
}
func (c *CreateAclsResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
n, err := pd.getArrayLength()
if err != nil {
return err
}
c.AclCreationResponses = make([]*AclCreationResponse, n)
for i := 0; i < n; i++ {
c.AclCreationResponses[i] = new(AclCreationResponse)
if err := c.AclCreationResponses[i].decode(pd, version); err != nil {
return err
}
}
return nil
}
func (c *CreateAclsResponse) key() int16 {
return 30
}
func (c *CreateAclsResponse) version() int16 {
return 0
}
func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}
func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
//AclCreationResponse is an acl creation response type
type AclCreationResponse struct {
Err KError
ErrMsg *string
}
func (a *AclCreationResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(a.Err))
if err := pe.putNullableString(a.ErrMsg); err != nil {
return err
}
return nil
}
func (a *AclCreationResponse) decode(pd packetDecoder, version int16) (err error) {
kerr, err := pd.getInt16()
if err != nil {
return err
}
a.Err = KError(kerr)
if a.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}
return nil
}
package sarama
//DeleteAclsRequest is a delete acl request
type DeleteAclsRequest struct {
Version int
Filters []*AclFilter
}
func (d *DeleteAclsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(d.Filters)); err != nil {
return err
}
for _, filter := range d.Filters {
filter.Version = d.Version
if err := filter.encode(pe); err != nil {
return err
}
}
return nil
}
func (d *DeleteAclsRequest) decode(pd packetDecoder, version int16) (err error) {
d.Version = int(version)
n, err := pd.getArrayLength()
if err != nil {
return err
}
d.Filters = make([]*AclFilter, n)
for i := 0; i < n; i++ {
d.Filters[i] = new(AclFilter)
d.Filters[i].Version = int(version)
if err := d.Filters[i].decode(pd, version); err != nil {
return err
}
}
return nil
}
func (d *DeleteAclsRequest) key() int16 {
return 31
}
func (d *DeleteAclsRequest) version() int16 {
return int16(d.Version)
}
func (c *DeleteAclsRequest) headerVersion() int16 {
return 1
}
func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
package sarama
import "time"
//DeleteAclsResponse is a delete acl response
type DeleteAclsResponse struct {
Version int16
ThrottleTime time.Duration
FilterResponses []*FilterResponse
}
func (d *DeleteAclsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
if err := pe.putArrayLength(len(d.FilterResponses)); err != nil {
return err
}
for _, filterResponse := range d.FilterResponses {
if err := filterResponse.encode(pe, d.Version); err != nil {
return err
}
}
return nil
}
func (d *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
n, err := pd.getArrayLength()
if err != nil {
return err
}
d.FilterResponses = make([]*FilterResponse, n)
for i := 0; i < n; i++ {
d.FilterResponses[i] = new(FilterResponse)
if err := d.FilterResponses[i].decode(pd, version); err != nil {
return err
}
}
return nil
}
func (d *DeleteAclsResponse) key() int16 {
return 31
}
func (d *DeleteAclsResponse) version() int16 {
return d.Version
}
func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}
func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
//FilterResponse is a filter response type
type FilterResponse struct {
Err KError
ErrMsg *string
MatchingAcls []*MatchingAcl
}
func (f *FilterResponse) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(f.Err))
if err := pe.putNullableString(f.ErrMsg); err != nil {
return err
}
if err := pe.putArrayLength(len(f.MatchingAcls)); err != nil {
return err
}
for _, matchingAcl := range f.MatchingAcls {
if err := matchingAcl.encode(pe, version); err != nil {
return err
}
}
return nil
}
func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) {
kerr, err := pd.getInt16()
if err != nil {
return err
}
f.Err = KError(kerr)
if f.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}
n, err := pd.getArrayLength()
if err != nil {
return err
}
f.MatchingAcls = make([]*MatchingAcl, n)
for i := 0; i < n; i++ {
f.MatchingAcls[i] = new(MatchingAcl)
if err := f.MatchingAcls[i].decode(pd, version); err != nil {
return err
}
}
return nil
}
//MatchingAcl is a matching acl type
type MatchingAcl struct {
Err KError
ErrMsg *string
Resource
Acl
}
func (m *MatchingAcl) encode(pe packetEncoder, version int16) error {
pe.putInt16(int16(m.Err))
if err := pe.putNullableString(m.ErrMsg); err != nil {
return err
}
if err := m.Resource.encode(pe, version); err != nil {
return err
}
if err := m.Acl.encode(pe); err != nil {
return err
}
return nil
}
func (m *MatchingAcl) decode(pd packetDecoder, version int16) (err error) {
kerr, err := pd.getInt16()
if err != nil {
return err
}
m.Err = KError(kerr)
if m.ErrMsg, err = pd.getNullableString(); err != nil {
return err
}
if err := m.Resource.decode(pd, version); err != nil {
return err
}
if err := m.Acl.decode(pd, version); err != nil {
return err
}
return nil
}
package sarama
//DescribeAclsRequest is a secribe acl request type
type DescribeAclsRequest struct {
Version int
AclFilter
}
func (d *DescribeAclsRequest) encode(pe packetEncoder) error {
d.AclFilter.Version = d.Version
return d.AclFilter.encode(pe)
}
func (d *DescribeAclsRequest) decode(pd packetDecoder, version int16) (err error) {
d.Version = int(version)
d.AclFilter.Version = int(version)
return d.AclFilter.decode(pd, version)
}
func (d *DescribeAclsRequest) key() int16 {
return 29
}
func (d *DescribeAclsRequest) version() int16 {
return int16(d.Version)
}
func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}
func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
package sarama
import "time"
//DescribeAclsResponse is a describe acl response type
type DescribeAclsResponse struct {
Version int16
ThrottleTime time.Duration
Err KError
ErrMsg *string
ResourceAcls []*ResourceAcls
}
func (d *DescribeAclsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
pe.putInt16(int16(d.Err))
if err := pe.putNullableString(d.ErrMsg); err != nil {
return err
}
if err := pe.putArrayLength(len(d.ResourceAcls)); err != nil {
return err
}
for _, resourceAcl := range d.ResourceAcls {
if err := resourceAcl.encode(pe, d.Version); err != nil {
return err
}
}
return nil
}
func (d *DescribeAclsResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
kerr, err := pd.getInt16()
if err != nil {
return err
}
d.Err = KError(kerr)
errmsg, err := pd.getString()
if err != nil {
return err
}
if errmsg != "" {
d.ErrMsg = &errmsg
}
n, err := pd.getArrayLength()
if err != nil {
return err
}
d.ResourceAcls = make([]*ResourceAcls, n)
for i := 0; i < n; i++ {
d.ResourceAcls[i] = new(ResourceAcls)
if err := d.ResourceAcls[i].decode(pd, version); err != nil {
return err
}
}
return nil
}
func (d *DescribeAclsResponse) key() int16 {
return 29
}
func (d *DescribeAclsResponse) version() int16 {
return d.Version
}
func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}
func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
return V2_0_0_0
default:
return V0_11_0_0
}
}
package sarama
type AclFilter struct {
Version int
ResourceType AclResourceType
ResourceName *string
ResourcePatternTypeFilter AclResourcePatternType
Principal *string
Host *string
Operation AclOperation
PermissionType AclPermissionType
}
func (a *AclFilter) encode(pe packetEncoder) error {
pe.putInt8(int8(a.ResourceType))
if err := pe.putNullableString(a.ResourceName); err != nil {
return err
}
if a.Version == 1 {
pe.putInt8(int8(a.ResourcePatternTypeFilter))
}
if err := pe.putNullableString(a.Principal); err != nil {
return err
}
if err := pe.putNullableString(a.Host); err != nil {
return err
}
pe.putInt8(int8(a.Operation))
pe.putInt8(int8(a.PermissionType))
return nil
}
func (a *AclFilter) decode(pd packetDecoder, version int16) (err error) {
resourceType, err := pd.getInt8()
if err != nil {
return err
}
a.ResourceType = AclResourceType(resourceType)
if a.ResourceName, err = pd.getNullableString(); err != nil {
return err
}
if a.Version == 1 {
pattern, err := pd.getInt8()
if err != nil {
return err
}
a.ResourcePatternTypeFilter = AclResourcePatternType(pattern)
}
if a.Principal, err = pd.getNullableString(); err != nil {
return err
}
if a.Host, err = pd.getNullableString(); err != nil {
return err
}
operation, err := pd.getInt8()
if err != nil {
return err
}
a.Operation = AclOperation(operation)
permissionType, err := pd.getInt8()
if err != nil {
return err
}
a.PermissionType = AclPermissionType(permissionType)
return nil
}
package sarama
type (
AclOperation int
AclPermissionType int
AclResourceType int
AclResourcePatternType int
)
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java
const (
AclOperationUnknown AclOperation = iota
AclOperationAny
AclOperationAll
AclOperationRead
AclOperationWrite
AclOperationCreate
AclOperationDelete
AclOperationAlter
AclOperationDescribe
AclOperationClusterAction
AclOperationDescribeConfigs
AclOperationAlterConfigs
AclOperationIdempotentWrite
)
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java
const (
AclPermissionUnknown AclPermissionType = iota
AclPermissionAny
AclPermissionDeny
AclPermissionAllow
)
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
const (
AclResourceUnknown AclResourceType = iota
AclResourceAny
AclResourceTopic
AclResourceGroup
AclResourceCluster
AclResourceTransactionalID
)
// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
const (
AclPatternUnknown AclResourcePatternType = iota
AclPatternAny
AclPatternMatch
AclPatternLiteral
AclPatternPrefixed
)
package sarama
//AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnRequest struct {
TransactionalID string
ProducerID int64
ProducerEpoch int16
GroupID string
}
func (a *AddOffsetsToTxnRequest) encode(pe packetEncoder) error {
if err := pe.putString(a.TransactionalID); err != nil {
return err
}
pe.putInt64(a.ProducerID)
pe.putInt16(a.ProducerEpoch)
if err := pe.putString(a.GroupID); err != nil {
return err
}
return nil
}
func (a *AddOffsetsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
if a.TransactionalID, err = pd.getString(); err != nil {
return err
}
if a.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if a.ProducerEpoch, err = pd.getInt16(); err != nil {
return err
}
if a.GroupID, err = pd.getString(); err != nil {
return err
}
return nil
}
func (a *AddOffsetsToTxnRequest) key() int16 {
return 25
}
func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
}
func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}
func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
package sarama
import (
"time"
)
//AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddOffsetsToTxnResponse struct {
ThrottleTime time.Duration
Err KError
}
func (a *AddOffsetsToTxnResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
pe.putInt16(int16(a.Err))
return nil
}
func (a *AddOffsetsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
kerr, err := pd.getInt16()
if err != nil {
return err
}
a.Err = KError(kerr)
return nil
}
func (a *AddOffsetsToTxnResponse) key() int16 {
return 25
}
func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
}
func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}
func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
package sarama
//AddPartitionsToTxnRequest is a add paartition request
type AddPartitionsToTxnRequest struct {
TransactionalID string
ProducerID int64
ProducerEpoch int16
TopicPartitions map[string][]int32
}
func (a *AddPartitionsToTxnRequest) encode(pe packetEncoder) error {
if err := pe.putString(a.TransactionalID); err != nil {
return err
}
pe.putInt64(a.ProducerID)
pe.putInt16(a.ProducerEpoch)
if err := pe.putArrayLength(len(a.TopicPartitions)); err != nil {
return err
}
for topic, partitions := range a.TopicPartitions {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putInt32Array(partitions); err != nil {
return err
}
}
return nil
}
func (a *AddPartitionsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
if a.TransactionalID, err = pd.getString(); err != nil {
return err
}
if a.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if a.ProducerEpoch, err = pd.getInt16(); err != nil {
return err
}
n, err := pd.getArrayLength()
if err != nil {
return err
}
a.TopicPartitions = make(map[string][]int32)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitions, err := pd.getInt32Array()
if err != nil {
return err
}
a.TopicPartitions[topic] = partitions
}
return nil
}
func (a *AddPartitionsToTxnRequest) key() int16 {
return 24
}
func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
}
func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}
func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
package sarama
import (
"time"
)
//AddPartitionsToTxnResponse is a partition errors to transaction type
type AddPartitionsToTxnResponse struct {
ThrottleTime time.Duration
Errors map[string][]*PartitionError
}
func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
if err := pe.putArrayLength(len(a.Errors)); err != nil {
return err
}
for topic, e := range a.Errors {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putArrayLength(len(e)); err != nil {
return err
}
for _, partitionError := range e {
if err := partitionError.encode(pe); err != nil {
return err
}
}
}
return nil
}
func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
n, err := pd.getArrayLength()
if err != nil {
return err
}
a.Errors = make(map[string][]*PartitionError)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
m, err := pd.getArrayLength()
if err != nil {
return err
}
a.Errors[topic] = make([]*PartitionError, m)
for j := 0; j < m; j++ {
a.Errors[topic][j] = new(PartitionError)
if err := a.Errors[topic][j].decode(pd, version); err != nil {
return err
}
}
}
return nil
}
func (a *AddPartitionsToTxnResponse) key() int16 {
return 24
}
func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
}
func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}
func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
//PartitionError is a partition error type
type PartitionError struct {
Partition int32
Err KError
}
func (p *PartitionError) encode(pe packetEncoder) error {
pe.putInt32(p.Partition)
pe.putInt16(int16(p.Err))
return nil
}
func (p *PartitionError) decode(pd packetDecoder, version int16) (err error) {
if p.Partition, err = pd.getInt32(); err != nil {
return err
}
kerr, err := pd.getInt16()
if err != nil {
return err
}
p.Err = KError(kerr)
return nil
}
此差异已折叠。
package sarama
//AlterConfigsRequest is an alter config request type
type AlterConfigsRequest struct {
Resources []*AlterConfigsResource
ValidateOnly bool
}
//AlterConfigsResource is an alter config resource type
type AlterConfigsResource struct {
Type ConfigResourceType
Name string
ConfigEntries map[string]*string
}
func (a *AlterConfigsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(a.Resources)); err != nil {
return err
}
for _, r := range a.Resources {
if err := r.encode(pe); err != nil {
return err
}
}
pe.putBool(a.ValidateOnly)
return nil
}
func (a *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
resourceCount, err := pd.getArrayLength()
if err != nil {
return err
}
a.Resources = make([]*AlterConfigsResource, resourceCount)
for i := range a.Resources {
r := &AlterConfigsResource{}
err = r.decode(pd, version)
if err != nil {
return err
}
a.Resources[i] = r
}
validateOnly, err := pd.getBool()
if err != nil {
return err
}
a.ValidateOnly = validateOnly
return nil
}
func (a *AlterConfigsResource) encode(pe packetEncoder) error {
pe.putInt8(int8(a.Type))
if err := pe.putString(a.Name); err != nil {
return err
}
if err := pe.putArrayLength(len(a.ConfigEntries)); err != nil {
return err
}
for configKey, configValue := range a.ConfigEntries {
if err := pe.putString(configKey); err != nil {
return err
}
if err := pe.putNullableString(configValue); err != nil {
return err
}
}
return nil
}
func (a *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
t, err := pd.getInt8()
if err != nil {
return err
}
a.Type = ConfigResourceType(t)
name, err := pd.getString()
if err != nil {
return err
}
a.Name = name
n, err := pd.getArrayLength()
if err != nil {
return err
}
if n > 0 {
a.ConfigEntries = make(map[string]*string, n)
for i := 0; i < n; i++ {
configKey, err := pd.getString()
if err != nil {
return err
}
if a.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
return err
}
}
}
return err
}
func (a *AlterConfigsRequest) key() int16 {
return 33
}
func (a *AlterConfigsRequest) version() int16 {
return 0
}
func (a *AlterConfigsRequest) headerVersion() int16 {
return 1
}
func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
package sarama
import "time"
//AlterConfigsResponse is a response type for alter config
type AlterConfigsResponse struct {
ThrottleTime time.Duration
Resources []*AlterConfigsResourceResponse
}
//AlterConfigsResourceResponse is a response type for alter config resource
type AlterConfigsResourceResponse struct {
ErrorCode int16
ErrorMsg string
Type ConfigResourceType
Name string
}
func (a *AlterConfigsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
if err := pe.putArrayLength(len(a.Resources)); err != nil {
return err
}
for i := range a.Resources {
pe.putInt16(a.Resources[i].ErrorCode)
err := pe.putString(a.Resources[i].ErrorMsg)
if err != nil {
return nil
}
pe.putInt8(int8(a.Resources[i].Type))
err = pe.putString(a.Resources[i].Name)
if err != nil {
return nil
}
}
return nil
}
func (a *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
responseCount, err := pd.getArrayLength()
if err != nil {
return err
}
a.Resources = make([]*AlterConfigsResourceResponse, responseCount)
for i := range a.Resources {
a.Resources[i] = new(AlterConfigsResourceResponse)
errCode, err := pd.getInt16()
if err != nil {
return err
}
a.Resources[i].ErrorCode = errCode
e, err := pd.getString()
if err != nil {
return err
}
a.Resources[i].ErrorMsg = e
t, err := pd.getInt8()
if err != nil {
return err
}
a.Resources[i].Type = ConfigResourceType(t)
name, err := pd.getString()
if err != nil {
return err
}
a.Resources[i].Name = name
}
return nil
}
func (a *AlterConfigsResponse) key() int16 {
return 32
}
func (a *AlterConfigsResponse) version() int16 {
return 0
}
func (a *AlterConfigsResponse) headerVersion() int16 {
return 0
}
func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
package sarama
type alterPartitionReassignmentsBlock struct {
replicas []int32
}
func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder) error {
if err := pe.putNullableCompactInt32Array(b.replicas); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
return nil
}
func (b *alterPartitionReassignmentsBlock) decode(pd packetDecoder) (err error) {
if b.replicas, err = pd.getCompactInt32Array(); err != nil {
return err
}
return nil
}
type AlterPartitionReassignmentsRequest struct {
TimeoutMs int32
blocks map[string]map[int32]*alterPartitionReassignmentsBlock
Version int16
}
func (r *AlterPartitionReassignmentsRequest) encode(pe packetEncoder) error {
pe.putInt32(r.TimeoutMs)
pe.putCompactArrayLength(len(r.blocks))
for topic, partitions := range r.blocks {
if err := pe.putCompactString(topic); err != nil {
return err
}
pe.putCompactArrayLength(len(partitions))
for partition, block := range partitions {
pe.putInt32(partition)
if err := block.encode(pe); err != nil {
return err
}
}
pe.putEmptyTaggedFieldArray()
}
pe.putEmptyTaggedFieldArray()
return nil
}
func (r *AlterPartitionReassignmentsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.TimeoutMs, err = pd.getInt32(); err != nil {
return err
}
topicCount, err := pd.getCompactArrayLength()
if err != nil {
return err
}
if topicCount > 0 {
r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock)
for i := 0; i < topicCount; i++ {
topic, err := pd.getCompactString()
if err != nil {
return err
}
partitionCount, err := pd.getCompactArrayLength()
if err != nil {
return err
}
r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock)
for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
block := &alterPartitionReassignmentsBlock{}
if err := block.decode(pd); err != nil {
return err
}
r.blocks[topic][partition] = block
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return
}
func (r *AlterPartitionReassignmentsRequest) key() int16 {
return 45
}
func (r *AlterPartitionReassignmentsRequest) version() int16 {
return r.Version
}
func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 {
return 2
}
func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
return V2_4_0_0
}
func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock)
}
if r.blocks[topic] == nil {
r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock)
}
r.blocks[topic][partitionID] = &alterPartitionReassignmentsBlock{replicas}
}
package sarama
type alterPartitionReassignmentsErrorBlock struct {
errorCode KError
errorMessage *string
}
func (b *alterPartitionReassignmentsErrorBlock) encode(pe packetEncoder) error {
pe.putInt16(int16(b.errorCode))
if err := pe.putNullableCompactString(b.errorMessage); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
return nil
}
func (b *alterPartitionReassignmentsErrorBlock) decode(pd packetDecoder) (err error) {
errorCode, err := pd.getInt16()
if err != nil {
return err
}
b.errorCode = KError(errorCode)
b.errorMessage, err = pd.getCompactNullableString()
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return err
}
type AlterPartitionReassignmentsResponse struct {
Version int16
ThrottleTimeMs int32
ErrorCode KError
ErrorMessage *string
Errors map[string]map[int32]*alterPartitionReassignmentsErrorBlock
}
func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string) {
if r.Errors == nil {
r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock)
}
partitions := r.Errors[topic]
if partitions == nil {
partitions = make(map[int32]*alterPartitionReassignmentsErrorBlock)
r.Errors[topic] = partitions
}
partitions[partition] = &alterPartitionReassignmentsErrorBlock{errorCode: kerror, errorMessage: message}
}
func (r *AlterPartitionReassignmentsResponse) encode(pe packetEncoder) error {
pe.putInt32(r.ThrottleTimeMs)
pe.putInt16(int16(r.ErrorCode))
if err := pe.putNullableCompactString(r.ErrorMessage); err != nil {
return err
}
pe.putCompactArrayLength(len(r.Errors))
for topic, partitions := range r.Errors {
if err := pe.putCompactString(topic); err != nil {
return err
}
pe.putCompactArrayLength(len(partitions))
for partition, block := range partitions {
pe.putInt32(partition)
if err := block.encode(pe); err != nil {
return err
}
}
pe.putEmptyTaggedFieldArray()
}
pe.putEmptyTaggedFieldArray()
return nil
}
func (r *AlterPartitionReassignmentsResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
return err
}
kerr, err := pd.getInt16()
if err != nil {
return err
}
r.ErrorCode = KError(kerr)
if r.ErrorMessage, err = pd.getCompactNullableString(); err != nil {
return err
}
numTopics, err := pd.getCompactArrayLength()
if err != nil {
return err
}
if numTopics > 0 {
r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock, numTopics)
for i := 0; i < numTopics; i++ {
topic, err := pd.getCompactString()
if err != nil {
return err
}
ongoingPartitionReassignments, err := pd.getCompactArrayLength()
if err != nil {
return err
}
r.Errors[topic] = make(map[int32]*alterPartitionReassignmentsErrorBlock, ongoingPartitionReassignments)
for j := 0; j < ongoingPartitionReassignments; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
block := &alterPartitionReassignmentsErrorBlock{}
if err := block.decode(pd); err != nil {
return err
}
r.Errors[topic][partition] = block
}
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return nil
}
func (r *AlterPartitionReassignmentsResponse) key() int16 {
return 45
}
func (r *AlterPartitionReassignmentsResponse) version() int16 {
return r.Version
}
func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 {
return 1
}
func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
return V2_4_0_0
}
package sarama
//ApiVersionsRequest ...
type ApiVersionsRequest struct {
}
func (a *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}
func (a *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
return nil
}
func (a *ApiVersionsRequest) key() int16 {
return 18
}
func (a *ApiVersionsRequest) version() int16 {
return 0
}
func (a *ApiVersionsRequest) headerVersion() int16 {
return 1
}
func (a *ApiVersionsRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
}
package sarama
//ApiVersionsResponseBlock is an api version response block type
type ApiVersionsResponseBlock struct {
ApiKey int16
MinVersion int16
MaxVersion int16
}
func (b *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
pe.putInt16(b.ApiKey)
pe.putInt16(b.MinVersion)
pe.putInt16(b.MaxVersion)
return nil
}
func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
var err error
if b.ApiKey, err = pd.getInt16(); err != nil {
return err
}
if b.MinVersion, err = pd.getInt16(); err != nil {
return err
}
if b.MaxVersion, err = pd.getInt16(); err != nil {
return err
}
return nil
}
//ApiVersionsResponse is an api version response type
type ApiVersionsResponse struct {
Err KError
ApiVersions []*ApiVersionsResponseBlock
}
func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
if err := pe.putArrayLength(len(r.ApiVersions)); err != nil {
return err
}
for _, apiVersion := range r.ApiVersions {
if err := apiVersion.encode(pe); err != nil {
return err
}
}
return nil
}
func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error {
kerr, err := pd.getInt16()
if err != nil {
return err
}
r.Err = KError(kerr)
numBlocks, err := pd.getArrayLength()
if err != nil {
return err
}
r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks)
for i := 0; i < numBlocks; i++ {
block := new(ApiVersionsResponseBlock)
if err := block.decode(pd); err != nil {
return err
}
r.ApiVersions[i] = block
}
return nil
}
func (r *ApiVersionsResponse) key() int16 {
return 18
}
func (r *ApiVersionsResponse) version() int16 {
return 0
}
func (a *ApiVersionsResponse) headerVersion() int16 {
return 0
}
func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
return V0_10_0_0
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
package sarama
import (
"bytes"
"compress/gzip"
"fmt"
"sync"
snappy "github.com/eapache/go-xerial-snappy"
"github.com/pierrec/lz4"
)
var (
lz4WriterPool = sync.Pool{
New: func() interface{} {
return lz4.NewWriter(nil)
},
}
gzipWriterPool = sync.Pool{
New: func() interface{} {
return gzip.NewWriter(nil)
},
}
gzipWriterPoolForCompressionLevel1 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 1)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel2 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 2)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel3 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 3)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel4 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 4)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel5 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 5)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel6 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 6)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel7 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 7)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel8 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 8)
if err != nil {
panic(err)
}
return gz
},
}
gzipWriterPoolForCompressionLevel9 = sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, 9)
if err != nil {
panic(err)
}
return gz
},
}
)
func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
switch cc {
case CompressionNone:
return data, nil
case CompressionGZIP:
var (
err error
buf bytes.Buffer
writer *gzip.Writer
)
switch level {
case CompressionLevelDefault:
writer = gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(writer)
writer.Reset(&buf)
case 1:
writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel1.Put(writer)
writer.Reset(&buf)
case 2:
writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel2.Put(writer)
writer.Reset(&buf)
case 3:
writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel3.Put(writer)
writer.Reset(&buf)
case 4:
writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel4.Put(writer)
writer.Reset(&buf)
case 5:
writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel5.Put(writer)
writer.Reset(&buf)
case 6:
writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel6.Put(writer)
writer.Reset(&buf)
case 7:
writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel7.Put(writer)
writer.Reset(&buf)
case 8:
writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel8.Put(writer)
writer.Reset(&buf)
case 9:
writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
defer gzipWriterPoolForCompressionLevel9.Put(writer)
writer.Reset(&buf)
default:
writer, err = gzip.NewWriterLevel(&buf, level)
if err != nil {
return nil, err
}
}
if _, err := writer.Write(data); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
case CompressionSnappy:
return snappy.Encode(data), nil
case CompressionLZ4:
writer := lz4WriterPool.Get().(*lz4.Writer)
defer lz4WriterPool.Put(writer)
var buf bytes.Buffer
writer.Reset(&buf)
if _, err := writer.Write(data); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
case CompressionZSTD:
return zstdCompress(nil, data)
default:
return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
}
}
此差异已折叠。
package sarama
// ConfigResourceType is a type for resources that have configs.
type ConfigResourceType int8
// Taken from:
// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55
const (
// UnknownResource constant type
UnknownResource ConfigResourceType = 0
// TopicResource constant type
TopicResource ConfigResourceType = 2
// BrokerResource constant type
BrokerResource ConfigResourceType = 4
// BrokerLoggerResource constant type
BrokerLoggerResource ConfigResourceType = 8
)
此差异已折叠。
此差异已折叠。
package sarama
//ConsumerGroupMemberMetadata holds the metadata for consumer group
type ConsumerGroupMemberMetadata struct {
Version int16
Topics []string
UserData []byte
}
func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
pe.putInt16(m.Version)
if err := pe.putStringArray(m.Topics); err != nil {
return err
}
if err := pe.putBytes(m.UserData); err != nil {
return err
}
return nil
}
func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}
if m.Topics, err = pd.getStringArray(); err != nil {
return
}
if m.UserData, err = pd.getBytes(); err != nil {
return
}
return nil
}
//ConsumerGroupMemberAssignment holds the member assignment for a consume group
type ConsumerGroupMemberAssignment struct {
Version int16
Topics map[string][]int32
UserData []byte
}
func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
pe.putInt16(m.Version)
if err := pe.putArrayLength(len(m.Topics)); err != nil {
return err
}
for topic, partitions := range m.Topics {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putInt32Array(partitions); err != nil {
return err
}
}
if err := pe.putBytes(m.UserData); err != nil {
return err
}
return nil
}
func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
if m.Version, err = pd.getInt16(); err != nil {
return
}
var topicLen int
if topicLen, err = pd.getArrayLength(); err != nil {
return
}
m.Topics = make(map[string][]int32, topicLen)
for i := 0; i < topicLen; i++ {
var topic string
if topic, err = pd.getString(); err != nil {
return
}
if m.Topics[topic], err = pd.getInt32Array(); err != nil {
return
}
}
if m.UserData, err = pd.getBytes(); err != nil {
return
}
return nil
}
package sarama
//ConsumerMetadataRequest is used for metadata requests
type ConsumerMetadataRequest struct {
ConsumerGroup string
}
func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
tmp := new(FindCoordinatorRequest)
tmp.CoordinatorKey = r.ConsumerGroup
tmp.CoordinatorType = CoordinatorGroup
return tmp.encode(pe)
}
func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) {
tmp := new(FindCoordinatorRequest)
if err := tmp.decode(pd, version); err != nil {
return err
}
r.ConsumerGroup = tmp.CoordinatorKey
return nil
}
func (r *ConsumerMetadataRequest) key() int16 {
return 10
}
func (r *ConsumerMetadataRequest) version() int16 {
return 0
}
func (r *ConsumerMetadataRequest) headerVersion() int16 {
return 1
}
func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion {
return V0_8_2_0
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
name: sarama
up:
- go:
version: '1.14'
commands:
test:
run: make test
desc: 'run unit tests'
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册