rocksmq_impl.go 31.3 KB
Newer Older
X
Xiangyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

J
jaime 已提交
12
package server
Y
yukun 已提交
13 14

import (
N
neza2017 已提交
15
	"errors"
Y
yukun 已提交
16
	"fmt"
17
	"path"
X
Xiaofan 已提交
18
	"runtime"
Y
yukun 已提交
19
	"strconv"
X
Xiaofan 已提交
20
	"strings"
Y
yukun 已提交
21
	"sync"
Y
yukun 已提交
22
	"sync/atomic"
Y
yukun 已提交
23
	"time"
Y
yukun 已提交
24

25 26 27
	"github.com/tecbot/gorocksdb"
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/kv"
30
	rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/log"
32
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
33
	"github.com/milvus-io/milvus/internal/util/hardware"
34
	"github.com/milvus-io/milvus/internal/util/paramtable"
Y
yukun 已提交
35
	"github.com/milvus-io/milvus/internal/util/retry"
X
Xiangyu Wang 已提交
36
	"github.com/milvus-io/milvus/internal/util/typeutil"
Y
yukun 已提交
37 38
)

Y
yukun 已提交
39
// UniqueID is the type of message ID
Y
yukun 已提交
40 41
type UniqueID = typeutil.UniqueID

42
// RmqState Rocksmq state
Y
yukun 已提交
43 44
type RmqState = int64

45
var (
	// RocksmqPageSize is the size threshold of a message page, default 64MB: once the
	// accumulated message size of the current page exceeds it, the page is closed and a new
	// page is started.
	RocksmqPageSize int64 = 64 << 20

	// RocksDB block cache capacity bounds (TODO config it). The capacity derived from the
	// machine memory is clamped to [RocksDBLRUCacheMinCapacity, RocksDBLRUCacheMaxCapacity].
	RocksDBLRUCacheMinCapacity = uint64(1 << 29)
	RocksDBLRUCacheMaxCapacity = uint64(4 << 30)
)

Y
yukun 已提交
52
// Const variable that will be used in rocksmqs
Y
yukun 已提交
53
const (
54
	DefaultMessageID UniqueID = -1
X
Xiaofan 已提交
55

Y
yukun 已提交
56 57
	kvSuffix = "_meta_kv"

X
Xiaofan 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
	//  topic_begin_id/topicName
	// topic begin id record a topic is valid, create when topic is created, cleaned up on destroy topic
	TopicIDTitle = "topic_id/"

	// message_size/topicName record the current page message size, once current message size > RocksMq size, reset this value and open a new page
	// TODO should be cached
	MessageSizeTitle = "message_size/"

	// page_message_size/topicName/pageId record the endId of each page, it will be purged either in retention or the destroy of topic
	PageMsgSizeTitle = "page_message_size/"

	// page_ts/topicName/pageId, record the page last ts, used for TTL functionality
	PageTsTitle = "page_ts/"

	// acked_ts/topicName/pageId, record the latest ack ts of each page, will be purged on retention or destroy of the topic
	AckedTsTitle = "acked_ts/"

	RmqNotServingErrMsg = "Rocksmq is not serving"
Y
yukun 已提交
76 77 78 79 80 81 82
)

const (
	// RmqStateStopped state stands for just created or stopped `Rocksmq` instance
	RmqStateStopped RmqState = 0
	// RmqStateHealthy state stands for healthy `Rocksmq` instance
	RmqStateHealthy RmqState = 1
Y
yukun 已提交
83 84
)

Y
yukun 已提交
85 86 87 88
// constructCurrentID builds the key under which a consumer group's position is stored:
// "<groupName>/<topicName>".
func constructCurrentID(topicName, groupName string) string {
	return strings.Join([]string{groupName, topicName}, "/")
}

Y
yukun 已提交
92
// constructKey prefixes topic with metaName. No separator is inserted, so metaName is
// expected to already end with "/" (as the *Title constants do).
func constructKey(metaName, topic string) string {
	return strings.Join([]string{metaName, topic}, "")
}
Y
yukun 已提交
99

X
Xiaofan 已提交
100 101 102 103
// parsePageID extracts the page ID from a "<title>/<topic>/<pageID>" key. Keys that do not
// split into exactly three "/"-separated parts are rejected.
func parsePageID(key string) (int64, error) {
	parts := strings.Split(key, "/")
	if len(parts) == 3 {
		return strconv.ParseInt(parts[2], 10, 64)
	}
	return 0, fmt.Errorf("Invalid page id %s ", key)
}

108
func checkRetention() bool {
X
Xiaofan 已提交
109
	return RocksmqRetentionTimeInSecs != -1 || RocksmqRetentionSizeInMB != -1
110 111
}

112
// topicMu maps a topic name to the *sync.Mutex that serializes operations on that topic.
var topicMu sync.Map
Y
yukun 已提交
113

Y
yukun 已提交
114 115
type rocksmq struct {
	store       *gorocksdb.DB
G
godchen 已提交
116
	kv          kv.BaseKV
117
	idAllocator allocator.Interface
118
	storeMu     *sync.Mutex
Y
yukun 已提交
119
	consumers   sync.Map
X
Xiaofan 已提交
120
	consumersID sync.Map
Y
yukun 已提交
121

Y
yukun 已提交
122
	retentionInfo *retentionInfo
123
	readers       sync.Map
Y
yukun 已提交
124
	state         RmqState
Y
yukun 已提交
125 126
}

Y
yukun 已提交
127
// NewRocksMQ step:
Y
yukun 已提交
128 129 130
// 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname
// 2. Init retention info, load retention info to memory
// 3. Start retention goroutine
131
func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.Interface) (*rocksmq, error) {
X
Xiaofan 已提交
132
	// TODO we should use same rocksdb instance with different cfs
X
Xiaofan 已提交
133 134 135 136 137 138 139
	maxProcs := runtime.GOMAXPROCS(0)
	parallelism := 1
	if maxProcs > 32 {
		parallelism = 4
	} else if maxProcs > 8 {
		parallelism = 2
	}
140
	memoryCount := hardware.GetMemoryCount()
141 142 143 144 145 146 147 148 149 150 151 152 153
	// default rocks db cache is set with memory
	rocksDBLRUCacheCapacity := RocksDBLRUCacheMinCapacity
	if memoryCount > 0 {
		ratio := params.ParseFloatWithDefault("rocksmq.lrucacheratio", 0.06)
		calculatedCapacity := uint64(float64(memoryCount) * ratio)
		if calculatedCapacity < RocksDBLRUCacheMinCapacity {
			rocksDBLRUCacheCapacity = RocksDBLRUCacheMinCapacity
		} else if calculatedCapacity > RocksDBLRUCacheMaxCapacity {
			rocksDBLRUCacheCapacity = RocksDBLRUCacheMaxCapacity
		} else {
			rocksDBLRUCacheCapacity = calculatedCapacity
		}
	}
154
	log.Info("Start rocksmq ", zap.Int("max proc", maxProcs),
155
		zap.Int("parallism", parallelism), zap.Uint64("lru cache", rocksDBLRUCacheCapacity))
Y
yukun 已提交
156
	bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
A
aoiasd 已提交
157
	bbto.SetBlockSize(64 << 10)
158
	bbto.SetBlockCache(gorocksdb.NewLRUCache(rocksDBLRUCacheCapacity))
A
aoiasd 已提交
159

X
Xiaofan 已提交
160
	optsKV := gorocksdb.NewDefaultOptions()
161
	optsKV.SetCompressionPerLevel([]gorocksdb.CompressionType{0, 0, 7, 7, 7, 7, 7})
X
Xiaofan 已提交
162 163 164 165
	optsKV.SetBlockBasedTableFactory(bbto)
	optsKV.SetCreateIfMissing(true)
	// by default there are only 1 thread for flush compaction, which may block each other.
	// increase to a reasonable thread numbers
X
Xiaofan 已提交
166
	optsKV.IncreaseParallelism(parallelism)
X
Xiaofan 已提交
167 168 169 170 171 172
	// enable back ground flush
	optsKV.SetMaxBackgroundFlushes(1)

	// finish rocks KV
	kvName := name + kvSuffix
	kv, err := rocksdbkv.NewRocksdbKVWithOpts(kvName, optsKV)
Y
yukun 已提交
173 174 175 176
	if err != nil {
		return nil, err
	}

X
Xiaofan 已提交
177 178 179
	// finish rocks mq store initialization, rocks mq store has to set the prefix extractor
	optsStore := gorocksdb.NewDefaultOptions()
	// share block cache with kv
180
	optsStore.SetCompressionPerLevel([]gorocksdb.CompressionType{0, 0, 7, 7, 7, 7, 7})
X
Xiaofan 已提交
181 182 183 184
	optsStore.SetBlockBasedTableFactory(bbto)
	optsStore.SetCreateIfMissing(true)
	// by default there are only 1 thread for flush compaction, which may block each other.
	// increase to a reasonable thread numbers
X
Xiaofan 已提交
185
	optsStore.IncreaseParallelism(parallelism)
X
Xiaofan 已提交
186 187 188 189
	// enable back ground flush
	optsStore.SetMaxBackgroundFlushes(1)

	db, err := gorocksdb.OpenDb(optsStore, name)
Y
yukun 已提交
190 191 192
	if err != nil {
		return nil, err
	}
Y
yukun 已提交
193

194
	var mqIDAllocator allocator.Interface
X
Xiaofan 已提交
195 196 197 198 199 200 201 202 203 204 205 206
	// if user didn't specify id allocator, init one with kv
	if idAllocator == nil {
		allocator := allocator.NewGlobalIDAllocator("rmq_id", kv)
		err = allocator.Initialize()
		if err != nil {
			return nil, err
		}
		mqIDAllocator = allocator
	} else {
		mqIDAllocator = idAllocator
	}

Y
yukun 已提交
207 208
	rmq := &rocksmq{
		store:       db,
Y
yukun 已提交
209
		kv:          kv,
X
Xiaofan 已提交
210
		idAllocator: mqIDAllocator,
211
		storeMu:     &sync.Mutex{},
Y
yukun 已提交
212
		consumers:   sync.Map{},
213
		readers:     sync.Map{},
Y
yukun 已提交
214
	}
Y
yukun 已提交
215

216
	ri, err := initRetentionInfo(params, kv, db)
Y
yukun 已提交
217 218 219 220 221
	if err != nil {
		return nil, err
	}
	rmq.retentionInfo = ri

222 223 224
	if checkRetention() {
		rmq.retentionInfo.startRetentionInfo()
	}
Y
yukun 已提交
225
	atomic.StoreInt64(&rmq.state, RmqStateHealthy)
X
Xiaofan 已提交
226 227 228
	// TODO add this to monitor metrics
	go func() {
		for {
229 230
			time.Sleep(10 * time.Minute)

X
Xiaofan 已提交
231 232 233
			log.Info("Rocksmq stats",
				zap.String("cache", kv.DB.GetProperty("rocksdb.block-cache-usage")),
				zap.String("rockskv memtable ", kv.DB.GetProperty("rocksdb.size-all-mem-tables")),
X
Xiaofan 已提交
234 235
				zap.String("rockskv table readers", kv.DB.GetProperty("rocksdb.estimate-table-readers-mem")),
				zap.String("rockskv pinned", kv.DB.GetProperty("rocksdb.block-cache-pinned-usage")),
X
Xiaofan 已提交
236
				zap.String("store memtable ", db.GetProperty("rocksdb.size-all-mem-tables")),
X
Xiaofan 已提交
237 238
				zap.String("store table readers", db.GetProperty("rocksdb.estimate-table-readers-mem")),
				zap.String("store pinned", db.GetProperty("rocksdb.block-cache-pinned-usage")),
X
Xiaofan 已提交
239 240 241 242 243
				zap.String("store l0 file num", db.GetProperty("rocksdb.num-files-at-level0")),
				zap.String("store l1 file num", db.GetProperty("rocksdb.num-files-at-level1")),
				zap.String("store l2 file num", db.GetProperty("rocksdb.num-files-at-level2")),
				zap.String("store l3 file num", db.GetProperty("rocksdb.num-files-at-level3")),
				zap.String("store l4 file num", db.GetProperty("rocksdb.num-files-at-level4")),
X
Xiaofan 已提交
244
			)
245
			rmq.Info()
X
Xiaofan 已提交
246 247 248
		}
	}()

Y
yukun 已提交
249 250 251
	return rmq, nil
}

Y
yukun 已提交
252 253 254 255
// isClosed reports whether the instance is in any state other than RmqStateHealthy.
func (rmq *rocksmq) isClosed() bool {
	state := atomic.LoadInt64(&rmq.state)
	return state != RmqStateHealthy
}

256 257 258 259
// Close step:
// 1. Stop retention
// 2. Destroy all consumer groups and topics
// 3. Close rocksdb instance
260
func (rmq *rocksmq) Close() {
Y
yukun 已提交
261
	atomic.StoreInt64(&rmq.state, RmqStateStopped)
262 263
	rmq.stopRetention()
	rmq.consumers.Range(func(k, v interface{}) bool {
X
Xiaofan 已提交
264 265
		// TODO what happened if the server crashed? who handled the destroy consumer group? should we just handled it when rocksmq created?
		// or we should not even make consumer info persistent?
266
		for _, consumer := range v.([]*Consumer) {
X
Xiaofan 已提交
267
			err := rmq.destroyConsumerGroupInternal(consumer.Topic, consumer.GroupName)
268
			if err != nil {
269
				log.Warn("Failed to destroy consumer group in rocksmq!", zap.Any("topic", consumer.Topic), zap.Any("groupName", consumer.GroupName), zap.Any("error", err))
270 271 272 273
			}
		}
		return true
	})
Y
yukun 已提交
274 275
	rmq.storeMu.Lock()
	defer rmq.storeMu.Unlock()
X
Xiaofan 已提交
276
	rmq.kv.Close()
277
	rmq.store.Close()
X
Xiaofan 已提交
278
	log.Info("Successfully close rocksmq")
279 280
}

281
// print rmq consumer Info
282 283
func (rmq *rocksmq) Info() bool {
	rtn := true
284 285
	rmq.consumers.Range(func(key, vals interface{}) bool {
		topic, _ := key.(string)
286
		consumerList, _ := vals.([]*Consumer)
287

288 289 290 291 292
		minConsumerPosition := UniqueID(-1)
		minConsumerGroupName := ""
		for _, consumer := range consumerList {
			consumerKey := constructCurrentID(consumer.Topic, consumer.GroupName)
			consumerPosition, ok := rmq.consumersID.Load(consumerKey)
293 294 295 296
			if !ok {
				log.Error("some group not regist", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName))
				continue
			}
297 298 299 300 301 302 303 304 305 306 307 308
			if minConsumerPosition == UniqueID(-1) || consumerPosition.(UniqueID) < minConsumerPosition {
				minConsumerPosition = consumerPosition.(UniqueID)
				minConsumerGroupName = consumer.GroupName
			}
		}

		pageTsSizeKey := constructKey(PageTsTitle, topic)
		pages, _, err := rmq.kv.LoadWithPrefix(pageTsSizeKey)
		if err != nil {
			log.Error("Rocksmq get page num failed", zap.String("topic", topic))
			rtn = false
			return false
309 310
		}

311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
		msgSizeKey := MessageSizeTitle + topic
		msgSizeVal, err := rmq.kv.Load(msgSizeKey)
		if err != nil {
			log.Error("Rocksmq get last page size failed", zap.String("topic", topic))
			rtn = false
			return false
		}

		log.Info("Rocksmq Info",
			zap.String("topic", topic),
			zap.Int("consumer num", len(consumerList)),
			zap.String("min position group names", minConsumerGroupName),
			zap.Int64("min positions", minConsumerPosition),
			zap.Int("page sum", len(pages)),
			zap.String("last page size", msgSizeVal),
		)
327 328
		return true
	})
329
	return rtn
330 331
}

Y
yukun 已提交
332
func (rmq *rocksmq) stopRetention() {
333
	if rmq.retentionInfo != nil {
C
congqixia 已提交
334
		rmq.retentionInfo.Stop()
335
	}
Y
yukun 已提交
336 337
}

338
// CreateTopic writes initialized messages for topic in rocksdb
Y
yukun 已提交
339
func (rmq *rocksmq) CreateTopic(topicName string) error {
Y
yukun 已提交
340 341 342
	if rmq.isClosed() {
		return errors.New(RmqNotServingErrMsg)
	}
X
Xiaofan 已提交
343
	start := time.Now()
Y
yukun 已提交
344

Y
yukun 已提交
345 346
	// Check if topicName contains "/"
	if strings.Contains(topicName, "/") {
X
Xiaofan 已提交
347
		log.Warn("rocksmq failed to create topic for topic name contains \"/\"", zap.String("topic", topicName))
Y
yukun 已提交
348 349 350
		return retry.Unrecoverable(fmt.Errorf("topic name = %s contains \"/\"", topicName))
	}

351
	// topicIDKey is the only identifier of a topic
X
Xiaofan 已提交
352 353
	topicIDKey := TopicIDTitle + topicName
	val, err := rmq.kv.Load(topicIDKey)
Y
yukun 已提交
354 355 356
	if err != nil {
		return err
	}
X
Xiaofan 已提交
357
	if val != "" {
X
Xiaofan 已提交
358
		log.Warn("rocksmq topic already exists ", zap.String("topic", topicName))
X
Xiaofan 已提交
359
		return nil
Y
yukun 已提交
360
	}
X
Xiaofan 已提交
361

Y
yukun 已提交
362 363 364 365
	if _, ok := topicMu.Load(topicName); !ok {
		topicMu.Store(topicName, new(sync.Mutex))
	}

366 367
	// msgSizeKey -> msgSize
	// topicIDKey -> topic creating time
Y
yukun 已提交
368
	kvs := make(map[string]string)
369

X
Xiaofan 已提交
370 371
	// Initialize topic message size to 0
	msgSizeKey := MessageSizeTitle + topicName
Y
yukun 已提交
372
	kvs[msgSizeKey] = "0"
Y
yukun 已提交
373

374
	// Initialize topic id to its creating time, we don't really use it for now
X
Xiaofan 已提交
375
	nowTs := strconv.FormatInt(time.Now().Unix(), 10)
Y
yukun 已提交
376
	kvs[topicIDKey] = nowTs
377 378 379
	if err = rmq.kv.MultiSave(kvs); err != nil {
		return retry.Unrecoverable(err)
	}
Y
yukun 已提交
380

381 382
	rmq.retentionInfo.mutex.Lock()
	defer rmq.retentionInfo.mutex.Unlock()
X
Xiaofan 已提交
383
	rmq.retentionInfo.topicRetetionTime.Store(topicName, time.Now().Unix())
384
	log.Info("Rocksmq create topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
Y
yukun 已提交
385 386 387
	return nil
}

388
// DestroyTopic removes messages for topic in rocksmq
Y
yukun 已提交
389
func (rmq *rocksmq) DestroyTopic(topicName string) error {
X
Xiaofan 已提交
390
	start := time.Now()
Y
yukun 已提交
391 392 393
	ll, ok := topicMu.Load(topicName)
	if !ok {
		return fmt.Errorf("topic name = %s not exist", topicName)
Y
yukun 已提交
394
	}
Y
yukun 已提交
395 396 397
	lock, ok := ll.(*sync.Mutex)
	if !ok {
		return fmt.Errorf("get mutex failed, topic name = %s", topicName)
Y
yukun 已提交
398
	}
Y
yukun 已提交
399 400
	lock.Lock()
	defer lock.Unlock()
Y
yukun 已提交
401

Y
yukun 已提交
402
	rmq.consumers.Delete(topicName)
Y
yukun 已提交
403

X
Xiaofan 已提交
404
	// clean the topic data it self
Y
yukun 已提交
405 406
	fixTopicName := topicName + "/"
	err := rmq.kv.RemoveWithPrefix(fixTopicName)
X
Xiaofan 已提交
407 408 409 410 411 412 413 414 415 416
	if err != nil {
		return err
	}

	// clean page size info
	pageMsgSizeKey := constructKey(PageMsgSizeTitle, topicName)
	err = rmq.kv.RemoveWithPrefix(pageMsgSizeKey)
	if err != nil {
		return err
	}
417

X
Xiaofan 已提交
418 419 420 421 422 423
	// clean page ts info
	pageMsgTsKey := constructKey(PageTsTitle, topicName)
	err = rmq.kv.RemoveWithPrefix(pageMsgTsKey)
	if err != nil {
		return err
	}
Y
yukun 已提交
424

X
Xiaofan 已提交
425 426 427 428 429 430 431 432 433 434 435 436 437
	// cleaned acked ts info
	ackedTsKey := constructKey(AckedTsTitle, topicName)
	err = rmq.kv.RemoveWithPrefix(ackedTsKey)
	if err != nil {
		return err
	}

	// topic info
	topicIDKey := TopicIDTitle + topicName
	// message size of this topic
	msgSizeKey := MessageSizeTitle + topicName
	var removedKeys []string
	removedKeys = append(removedKeys, topicIDKey, msgSizeKey)
Y
yukun 已提交
438
	// Batch remove, atomic operation
X
Xiaofan 已提交
439
	err = rmq.kv.MultiRemove(removedKeys)
Y
yukun 已提交
440 441 442 443
	if err != nil {
		return err
	}

444
	// clean up retention info
Y
yukun 已提交
445
	topicMu.Delete(topicName)
X
Xiaofan 已提交
446
	rmq.retentionInfo.topicRetetionTime.Delete(topicName)
447

448
	log.Info("Rocksmq destroy topic successfully ", zap.String("topic", topicName), zap.Int64("elapsed", time.Since(start).Milliseconds()))
Y
yukun 已提交
449 450 451
	return nil
}

452
// ExistConsumerGroup check if a consumer exists and return the existed consumer
X
Xiaofan 已提交
453
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer, error) {
Y
yukun 已提交
454
	key := constructCurrentID(topicName, groupName)
X
Xiaofan 已提交
455 456
	_, ok := rmq.consumersID.Load(key)
	if ok {
Y
yukun 已提交
457 458 459
		if vals, ok := rmq.consumers.Load(topicName); ok {
			for _, v := range vals.([]*Consumer) {
				if v.GroupName == groupName {
X
Xiaofan 已提交
460
					return true, v, nil
Y
yukun 已提交
461
				}
Y
yukun 已提交
462 463 464
			}
		}
	}
X
Xiaofan 已提交
465
	return false, nil, nil
Y
yukun 已提交
466 467
}

468
// CreateConsumerGroup creates an nonexistent consumer group for topic
Y
yukun 已提交
469
func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
Y
yukun 已提交
470 471 472
	if rmq.isClosed() {
		return errors.New(RmqNotServingErrMsg)
	}
X
Xiaofan 已提交
473
	start := time.Now()
Y
yukun 已提交
474
	key := constructCurrentID(topicName, groupName)
X
Xiaofan 已提交
475 476 477
	_, ok := rmq.consumersID.Load(key)
	if ok {
		return fmt.Errorf("RMQ CreateConsumerGroup key already exists, key = %s", key)
Y
yukun 已提交
478
	}
X
Xiaofan 已提交
479
	rmq.consumersID.Store(key, DefaultMessageID)
480
	log.Info("Rocksmq create consumer group successfully ", zap.String("topic", topicName),
X
Xiaofan 已提交
481 482
		zap.String("group", groupName),
		zap.Int64("elapsed", time.Since(start).Milliseconds()))
Y
yukun 已提交
483 484 485
	return nil
}

486
// RegisterConsumer registers a consumer in rocksmq consumers
X
Xiaofan 已提交
487
func (rmq *rocksmq) RegisterConsumer(consumer *Consumer) error {
Y
yukun 已提交
488
	if rmq.isClosed() {
X
Xiaofan 已提交
489
		return errors.New(RmqNotServingErrMsg)
Y
yukun 已提交
490
	}
X
Xiaofan 已提交
491
	start := time.Now()
Y
yukun 已提交
492 493 494
	if vals, ok := rmq.consumers.Load(consumer.Topic); ok {
		for _, v := range vals.([]*Consumer) {
			if v.GroupName == consumer.GroupName {
X
Xiaofan 已提交
495
				return nil
Y
yukun 已提交
496
			}
Y
yukun 已提交
497
		}
Y
yukun 已提交
498 499 500 501 502 503 504
		consumers := vals.([]*Consumer)
		consumers = append(consumers, consumer)
		rmq.consumers.Store(consumer.Topic, consumers)
	} else {
		consumers := make([]*Consumer, 1)
		consumers[0] = consumer
		rmq.consumers.Store(consumer.Topic, consumers)
Y
yukun 已提交
505
	}
506
	log.Info("Rocksmq register consumer successfully ", zap.String("topic", consumer.Topic), zap.Int64("elapsed", time.Since(start).Milliseconds()))
X
Xiaofan 已提交
507
	return nil
Y
yukun 已提交
508 509
}

510 511 512 513 514 515 516 517 518 519 520 521
// GetLatestMsg returns the message ID reported by getLatestMsg for topicName. On failure, or
// when the instance is not healthy, it returns DefaultMessageID together with the error.
func (rmq *rocksmq) GetLatestMsg(topicName string) (int64, error) {
	if rmq.isClosed() {
		return DefaultMessageID, errors.New(RmqNotServingErrMsg)
	}
	latest, err := rmq.getLatestMsg(topicName)
	if err == nil {
		return latest, nil
	}
	return DefaultMessageID, err
}

522
// DestroyConsumerGroup removes a consumer group from rocksdb_kv
Y
yukun 已提交
523
func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
X
Xiaofan 已提交
524 525 526 527 528 529 530 531
	if rmq.isClosed() {
		return errors.New(RmqNotServingErrMsg)
	}
	return rmq.destroyConsumerGroupInternal(topicName, groupName)
}

// DestroyConsumerGroup removes a consumer group from rocksdb_kv
func (rmq *rocksmq) destroyConsumerGroupInternal(topicName, groupName string) error {
X
Xiaofan 已提交
532
	start := time.Now()
533 534 535 536 537 538 539 540 541 542
	ll, ok := topicMu.Load(topicName)
	if !ok {
		return fmt.Errorf("topic name = %s not exist", topicName)
	}
	lock, ok := ll.(*sync.Mutex)
	if !ok {
		return fmt.Errorf("get mutex failed, topic name = %s", topicName)
	}
	lock.Lock()
	defer lock.Unlock()
Y
yukun 已提交
543
	key := constructCurrentID(topicName, groupName)
X
Xiaofan 已提交
544
	rmq.consumersID.Delete(key)
Y
yukun 已提交
545 546 547 548 549 550 551 552 553
	if vals, ok := rmq.consumers.Load(topicName); ok {
		consumers := vals.([]*Consumer)
		for index, v := range consumers {
			if v.GroupName == groupName {
				close(v.MsgMutex)
				consumers = append(consumers[:index], consumers[index+1:]...)
				rmq.consumers.Store(topicName, consumers)
				break
			}
Y
yukun 已提交
554 555
		}
	}
556
	log.Info("Rocksmq destroy consumer group successfully ", zap.String("topic", topicName),
X
Xiaofan 已提交
557 558
		zap.String("group", groupName),
		zap.Int64("elapsed", time.Since(start).Milliseconds()))
Y
yukun 已提交
559 560 561
	return nil
}

562
// Produce produces messages for topic and updates page infos for retention
563
func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) {
Y
yukun 已提交
564 565 566
	if rmq.isClosed() {
		return nil, errors.New(RmqNotServingErrMsg)
	}
Y
yukun 已提交
567
	start := time.Now()
Y
yukun 已提交
568
	ll, ok := topicMu.Load(topicName)
N
neza2017 已提交
569
	if !ok {
570
		return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName)
N
neza2017 已提交
571 572 573
	}
	lock, ok := ll.(*sync.Mutex)
	if !ok {
574
		return []UniqueID{}, fmt.Errorf("get mutex failed, topic name = %s", topicName)
N
neza2017 已提交
575 576 577 578
	}
	lock.Lock()
	defer lock.Unlock()

Y
yukun 已提交
579 580
	getLockTime := time.Since(start).Milliseconds()

Y
yukun 已提交
581 582 583 584
	msgLen := len(messages)
	idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))

	if err != nil {
585
		return []UniqueID{}, err
Y
yukun 已提交
586
	}
X
Xiaofan 已提交
587
	allocTime := time.Since(start).Milliseconds()
Y
yukun 已提交
588
	if UniqueID(msgLen) != idEnd-idStart {
589
		return []UniqueID{}, errors.New("Obtained id length is not equal that of message")
Y
yukun 已提交
590 591
	}

X
Xiaofan 已提交
592
	// Insert data to store system
Y
yukun 已提交
593
	batch := gorocksdb.NewWriteBatch()
Y
yukun 已提交
594
	defer batch.Destroy()
Y
yukun 已提交
595
	msgSizes := make(map[UniqueID]int64)
Y
yukun 已提交
596
	msgIDs := make([]UniqueID, msgLen)
Y
yukun 已提交
597
	for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
Y
yukun 已提交
598
		msgID := idStart + UniqueID(i)
Y
yukun 已提交
599
		key := path.Join(topicName, strconv.FormatInt(msgID, 10))
Y
yukun 已提交
600
		batch.Put([]byte(key), messages[i].Payload)
Y
yukun 已提交
601 602
		msgIDs[i] = msgID
		msgSizes[msgID] = int64(len(messages[i].Payload))
Y
yukun 已提交
603 604
	}

Y
yukun 已提交
605 606 607
	opts := gorocksdb.NewDefaultWriteOptions()
	defer opts.Destroy()
	err = rmq.store.Write(opts, batch)
Y
yukun 已提交
608
	if err != nil {
609
		return []UniqueID{}, err
Y
yukun 已提交
610
	}
X
Xiaofan 已提交
611
	writeTime := time.Since(start).Milliseconds()
Y
yukun 已提交
612 613 614 615 616 617 618 619
	if vals, ok := rmq.consumers.Load(topicName); ok {
		for _, v := range vals.([]*Consumer) {
			select {
			case v.MsgMutex <- struct{}{}:
				continue
			default:
				continue
			}
Y
yukun 已提交
620 621
		}
	}
Y
yukun 已提交
622 623

	// Update message page info
X
Xiaofan 已提交
624
	err = rmq.updatePageInfo(topicName, msgIDs, msgSizes)
Y
yukun 已提交
625
	if err != nil {
626
		return []UniqueID{}, err
Y
yukun 已提交
627
	}
628

X
Xiaofan 已提交
629
	// TODO add this to monitor metrics
630
	getProduceTime := time.Since(start).Milliseconds()
X
Xiaofan 已提交
631
	if getProduceTime > 200 {
632
		log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
X
Xiaofan 已提交
633
			zap.Int64("get lock elapse", getLockTime),
634 635 636 637 638
			zap.Int64("alloc elapse", allocTime-getLockTime),
			zap.Int64("write elapse", writeTime-allocTime),
			zap.Int64("updatePage elapse", getProduceTime-writeTime),
			zap.Int64("produce total elapse", getProduceTime),
		)
639
	}
640
	return msgIDs, nil
Y
yukun 已提交
641 642
}

X
Xiaofan 已提交
643
func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes map[UniqueID]int64) error {
Y
yukun 已提交
644 645 646 647 648 649 650 651 652
	msgSizeKey := MessageSizeTitle + topicName
	msgSizeVal, err := rmq.kv.Load(msgSizeKey)
	if err != nil {
		return err
	}
	curMsgSize, err := strconv.ParseInt(msgSizeVal, 10, 64)
	if err != nil {
		return err
	}
X
Xiaofan 已提交
653 654 655 656
	fixedPageSizeKey := constructKey(PageMsgSizeTitle, topicName)
	fixedPageTsKey := constructKey(PageTsTitle, topicName)
	nowTs := strconv.FormatInt(time.Now().Unix(), 10)
	mutateBuffer := make(map[string]string)
Y
yukun 已提交
657 658 659
	for _, id := range msgIDs {
		msgSize := msgSizes[id]
		if curMsgSize+msgSize > RocksmqPageSize {
Y
yukun 已提交
660
			// Current page is full
Y
yukun 已提交
661 662
			newPageSize := curMsgSize + msgSize
			pageEndID := id
Y
yukun 已提交
663 664
			// Update page message size for current page. key is page end ID
			pageMsgSizeKey := fixedPageSizeKey + "/" + strconv.FormatInt(pageEndID, 10)
X
Xiaofan 已提交
665 666 667
			mutateBuffer[pageMsgSizeKey] = strconv.FormatInt(newPageSize, 10)
			pageTsKey := fixedPageTsKey + "/" + strconv.FormatInt(pageEndID, 10)
			mutateBuffer[pageTsKey] = nowTs
Y
yukun 已提交
668 669
			curMsgSize = 0
		} else {
Y
yukun 已提交
670
			curMsgSize += msgSize
Y
yukun 已提交
671 672
		}
	}
X
Xiaofan 已提交
673 674
	mutateBuffer[msgSizeKey] = strconv.FormatInt(curMsgSize, 10)
	err = rmq.kv.MultiSave(mutateBuffer)
675
	return err
Y
yukun 已提交
676 677
}

678 679 680 681
// Consume steps:
// 1. Consume n messages from rocksdb
// 2. Update current_id to the last consumed message
// 3. Update ack informations in rocksdb
Y
yukun 已提交
682
func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
Y
yukun 已提交
683 684 685
	if rmq.isClosed() {
		return nil, errors.New(RmqNotServingErrMsg)
	}
Y
yukun 已提交
686
	start := time.Now()
Y
yukun 已提交
687
	ll, ok := topicMu.Load(topicName)
N
neza2017 已提交
688 689 690 691 692 693 694 695 696
	if !ok {
		return nil, fmt.Errorf("topic name = %s not exist", topicName)
	}
	lock, ok := ll.(*sync.Mutex)
	if !ok {
		return nil, fmt.Errorf("get mutex failed, topic name = %s", topicName)
	}
	lock.Lock()
	defer lock.Unlock()
Y
yukun 已提交
697
	getLockTime := time.Since(start).Milliseconds()
N
neza2017 已提交
698

Y
yukun 已提交
699
	metaKey := constructCurrentID(topicName, groupName)
X
Xiaofan 已提交
700 701
	currentID, ok := rmq.consumersID.Load(metaKey)
	if !ok {
702 703
		return nil, fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", topicName, groupName)
	}
Y
yukun 已提交
704 705

	readOpts := gorocksdb.NewDefaultReadOptions()
Y
yukun 已提交
706
	defer readOpts.Destroy()
X
Xiaofan 已提交
707
	prefix := topicName + "/"
X
Xiaofan 已提交
708
	iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.store, typeutil.AddOne(prefix), readOpts)
Y
yukun 已提交
709 710
	defer iter.Close()

X
Xiaofan 已提交
711 712
	var dataKey string
	if currentID == DefaultMessageID {
X
Xiaofan 已提交
713
		dataKey = prefix
Y
yukun 已提交
714
	} else {
Y
yukun 已提交
715
		dataKey = path.Join(topicName, strconv.FormatInt(currentID.(int64), 10))
Y
yukun 已提交
716
	}
X
Xiaofan 已提交
717
	iter.Seek([]byte(dataKey))
Y
yukun 已提交
718
	consumerMessage := make([]ConsumerMessage, 0, n)
Y
yukun 已提交
719
	offset := 0
X
Xiaofan 已提交
720
	for ; iter.Valid() && offset < n; iter.Next() {
Y
yukun 已提交
721 722
		key := iter.Key()
		val := iter.Value()
Y
yukun 已提交
723 724
		strKey := string(key.Data())
		key.Free()
Y
yukun 已提交
725
		offset++
Y
yukun 已提交
726
		msgID, err := strconv.ParseInt(strKey[len(topicName)+1:], 10, 64)
Y
yukun 已提交
727
		if err != nil {
Y
yukun 已提交
728
			val.Free()
Y
yukun 已提交
729 730 731
			return nil, err
		}
		msg := ConsumerMessage{
732 733 734 735 736 737 738 739 740
			MsgID: msgID,
		}
		origData := val.Data()
		dataLen := len(origData)
		if dataLen == 0 {
			msg.Payload = nil
		} else {
			msg.Payload = make([]byte, dataLen)
			copy(msg.Payload, origData)
Y
yukun 已提交
741 742 743 744
		}
		consumerMessage = append(consumerMessage, msg)
		val.Free()
	}
X
Xiaofan 已提交
745 746 747 748
	// if iterate fail
	if err := iter.Err(); err != nil {
		return nil, err
	}
749
	iterTime := time.Since(start).Milliseconds()
Y
yukun 已提交
750 751 752

	// When already consume to last mes, an empty slice will be returned
	if len(consumerMessage) == 0 {
Y
yukun 已提交
753
		// log.Debug("RocksMQ: consumerMessage is empty")
Y
yukun 已提交
754 755 756
		return consumerMessage, nil
	}

757 758 759 760
	newID := consumerMessage[len(consumerMessage)-1].MsgID
	moveConsumePosTime := time.Since(start).Milliseconds()

	err := rmq.moveConsumePos(topicName, groupName, newID+1)
X
Xiaofan 已提交
761 762 763 764
	if err != nil {
		return nil, err
	}

X
Xiaofan 已提交
765
	// TODO add this to monitor metrics
Y
yukun 已提交
766
	getConsumeTime := time.Since(start).Milliseconds()
X
Xiaofan 已提交
767
	if getConsumeTime > 200 {
Y
yukun 已提交
768
		log.Warn("rocksmq consume too slowly", zap.String("topic", topicName),
769 770
			zap.Int64("get lock elapse", getLockTime),
			zap.Int64("iterator elapse", iterTime-getLockTime),
771
			zap.Int64("moveConsumePosTime elapse", moveConsumePosTime-iterTime),
772
			zap.Int64("total consume elapse", getConsumeTime))
Y
yukun 已提交
773
	}
Y
yukun 已提交
774 775 776
	return consumerMessage, nil
}

777
// seek is used for internal call without the topicMu
Y
yukun 已提交
778
func (rmq *rocksmq) seek(topicName string, groupName string, msgID UniqueID) error {
779 780
	rmq.storeMu.Lock()
	defer rmq.storeMu.Unlock()
Y
yukun 已提交
781
	key := constructCurrentID(topicName, groupName)
X
Xiaofan 已提交
782 783 784
	_, ok := rmq.consumersID.Load(key)
	if !ok {
		return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
Y
yukun 已提交
785
	}
X
Xiaofan 已提交
786

Y
yukun 已提交
787
	storeKey := path.Join(topicName, strconv.FormatInt(msgID, 10))
Y
yukun 已提交
788 789
	opts := gorocksdb.NewDefaultReadOptions()
	defer opts.Destroy()
Y
yukun 已提交
790
	val, err := rmq.store.Get(opts, []byte(storeKey))
Y
yukun 已提交
791 792 793
	if err != nil {
		return err
	}
X
Xiaofan 已提交
794
	defer val.Free()
X
Xiaofan 已提交
795
	if !val.Exists() {
X
Xiaofan 已提交
796 797
		log.Warn("RocksMQ: trying to seek to no exist position, reset current id",
			zap.String("topic", topicName), zap.String("group", groupName), zap.Int64("msgId", msgID))
798
		err := rmq.moveConsumePos(topicName, groupName, DefaultMessageID)
X
Xiaofan 已提交
799
		//skip seek if key is not found, this is the behavior as pulsar
800
		return err
X
Xiaofan 已提交
801
	}
X
Xiaofan 已提交
802
	/* Step II: update current_id */
803 804
	err = rmq.moveConsumePos(topicName, groupName, msgID)
	return err
X
Xiaofan 已提交
805 806
}

807
func (rmq *rocksmq) moveConsumePos(topicName string, groupName string, msgID UniqueID) error {
X
Xiaofan 已提交
808
	key := constructCurrentID(topicName, groupName)
809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827
	oldPos, ok := rmq.consumersID.Load(key)
	if !ok {
		return errors.New("move unknown consumer")
	}

	if msgID < oldPos.(UniqueID) {
		log.Warn("RocksMQ: trying to move Consume position backward",
			zap.String("key", key), zap.Int64("oldPos", oldPos.(UniqueID)), zap.Int64("newPos", msgID))
		panic("move consume position backward")
	}

	//update ack if position move forward
	err := rmq.updateAckedInfo(topicName, groupName, oldPos.(UniqueID), msgID-1)
	if err != nil {
		log.Warn("failed to update acked info ", zap.String("topic", topicName),
			zap.String("groupName", groupName), zap.Error(err))
		return err
	}

X
Xiaofan 已提交
828
	rmq.consumersID.Store(key, msgID)
829
	return nil
Y
yukun 已提交
830
}
Y
yukun 已提交
831

Y
yukun 已提交
832 833
// Seek updates the current id to the given msgID
func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) error {
Y
yukun 已提交
834 835 836
	if rmq.isClosed() {
		return errors.New(RmqNotServingErrMsg)
	}
Y
yukun 已提交
837 838 839
	/* Step I: Check if key exists */
	ll, ok := topicMu.Load(topicName)
	if !ok {
840
		return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
Y
yukun 已提交
841 842 843 844 845 846 847 848
	}
	lock, ok := ll.(*sync.Mutex)
	if !ok {
		return fmt.Errorf("get mutex failed, topic name = %s", topicName)
	}
	lock.Lock()
	defer lock.Unlock()

X
Xiaofan 已提交
849 850 851 852
	err := rmq.seek(topicName, groupName, msgID)
	if err != nil {
		return err
	}
853
	log.Info("successfully seek", zap.String("topic", topicName), zap.String("group", groupName), zap.Uint64("msgId", uint64(msgID)))
X
Xiaofan 已提交
854
	return nil
Y
yukun 已提交
855 856
}

857
// Only for test
858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884
func (rmq *rocksmq) ForceSeek(topicName string, groupName string, msgID UniqueID) error {
	log.Warn("Use method ForceSeek that only for test")
	if rmq.isClosed() {
		return errors.New(RmqNotServingErrMsg)
	}
	/* Step I: Check if key exists */
	ll, ok := topicMu.Load(topicName)
	if !ok {
		return fmt.Errorf("Topic %s not exist, %w", topicName, mqwrapper.ErrTopicNotExist)
	}
	lock, ok := ll.(*sync.Mutex)
	if !ok {
		return fmt.Errorf("get mutex failed, topic name = %s", topicName)
	}
	lock.Lock()
	defer lock.Unlock()
	rmq.storeMu.Lock()
	defer rmq.storeMu.Unlock()

	key := constructCurrentID(topicName, groupName)
	_, ok = rmq.consumersID.Load(key)
	if !ok {
		return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
	}

	rmq.consumersID.Store(key, msgID)

885
	log.Info("successfully force seek", zap.String("topic", topicName),
886 887 888 889
		zap.String("group", groupName), zap.Uint64("msgID", uint64(msgID)))
	return nil
}

X
Xiaofan 已提交
890
// SeekToLatest updates current id to the msg id of latest message + 1
891
func (rmq *rocksmq) SeekToLatest(topicName, groupName string) error {
Y
yukun 已提交
892 893 894
	if rmq.isClosed() {
		return errors.New(RmqNotServingErrMsg)
	}
895 896
	rmq.storeMu.Lock()
	defer rmq.storeMu.Unlock()
897

898
	key := constructCurrentID(topicName, groupName)
X
Xiaofan 已提交
899 900
	_, ok := rmq.consumersID.Load(key)
	if !ok {
901 902 903
		return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
	}

904 905 906 907 908 909
	msgID, err := rmq.getLatestMsg(topicName)
	if err != nil {
		return err
	}

	// current msgID should not be included
910 911 912 913 914
	err = rmq.moveConsumePos(topicName, groupName, msgID+1)
	if err != nil {
		return err
	}

915
	log.Info("successfully seek to latest", zap.String("topic", topicName),
916 917 918 919 920
		zap.String("group", groupName), zap.Uint64("latest", uint64(msgID+1)))
	return nil
}

func (rmq *rocksmq) getLatestMsg(topicName string) (int64, error) {
921 922
	readOpts := gorocksdb.NewDefaultReadOptions()
	defer readOpts.Destroy()
X
Xiaofan 已提交
923
	iter := rocksdbkv.NewRocksIterator(rmq.store, readOpts)
924 925
	defer iter.Close()

X
Xiaofan 已提交
926
	prefix := topicName + "/"
X
Xiaofan 已提交
927
	// seek to the last message of thie topic
X
Xiaofan 已提交
928
	iter.SeekForPrev([]byte(typeutil.AddOne(prefix)))
X
Xiaofan 已提交
929

X
Xiaofan 已提交
930 931
	// if iterate fail
	if err := iter.Err(); err != nil {
932
		return DefaultMessageID, err
X
Xiaofan 已提交
933
	}
Y
yukun 已提交
934
	// should find the last key we written into, start with fixTopicName/
X
Xiaofan 已提交
935 936
	// if not find, start from 0
	if !iter.Valid() {
937
		return DefaultMessageID, nil
C
congqixia 已提交
938
	}
939

X
Xiaofan 已提交
940 941
	iKey := iter.Key()
	seekMsgID := string(iKey.Data())
X
Xiaofan 已提交
942 943 944
	if iKey != nil {
		iKey.Free()
	}
945

X
Xiaofan 已提交
946
	// if find message is not belong to current channel, start from 0
X
Xiaofan 已提交
947
	if !strings.Contains(seekMsgID, prefix) {
948
		return DefaultMessageID, nil
X
Xiaofan 已提交
949
	}
950

Y
yukun 已提交
951
	msgID, err := strconv.ParseInt(seekMsgID[len(topicName)+1:], 10, 64)
952
	if err != nil {
953
		return DefaultMessageID, err
954
	}
955 956

	return msgID, nil
957 958
}

959
// Notify sends a mutex in MsgMutex channel to tell consumers to consume
Y
yukun 已提交
960 961 962 963 964 965 966 967 968 969 970 971 972 973
func (rmq *rocksmq) Notify(topicName, groupName string) {
	if vals, ok := rmq.consumers.Load(topicName); ok {
		for _, v := range vals.([]*Consumer) {
			if v.GroupName == groupName {
				select {
				case v.MsgMutex <- struct{}{}:
					continue
				default:
					continue
				}
			}
		}
	}
}
Y
yukun 已提交
974

975
// updateAckedInfo update acked informations for retention after consume
976
func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, firstID UniqueID, lastID UniqueID) error {
X
Xiaofan 已提交
977
	// 1. Try to get the page id between first ID and last ID of ids
X
Xiaofan 已提交
978
	pageMsgPrefix := constructKey(PageMsgSizeTitle, topicName) + "/"
Y
yukun 已提交
979 980
	readOpts := gorocksdb.NewDefaultReadOptions()
	defer readOpts.Destroy()
X
Xiaofan 已提交
981
	pageMsgFirstKey := pageMsgPrefix + strconv.FormatInt(firstID, 10)
X
Xiaofan 已提交
982

X
Xiaofan 已提交
983
	iter := rocksdbkv.NewRocksIteratorWithUpperBound(rmq.kv.(*rocksdbkv.RocksdbKV).DB, typeutil.AddOne(pageMsgPrefix), readOpts)
Y
yukun 已提交
984
	defer iter.Close()
985
	var pageIDs []UniqueID
X
Xiaofan 已提交
986

X
Xiaofan 已提交
987
	for iter.Seek([]byte(pageMsgFirstKey)); iter.Valid(); iter.Next() {
Y
yukun 已提交
988
		key := iter.Key()
X
Xiaofan 已提交
989
		pageID, err := parsePageID(string(key.Data()))
Y
yukun 已提交
990 991 992 993 994 995 996 997 998 999 1000 1001
		if key != nil {
			key.Free()
		}
		if err != nil {
			return err
		}
		if pageID <= lastID {
			pageIDs = append(pageIDs, pageID)
		} else {
			break
		}
	}
X
Xiaofan 已提交
1002 1003 1004
	if err := iter.Err(); err != nil {
		return err
	}
Y
yukun 已提交
1005 1006 1007
	if len(pageIDs) == 0 {
		return nil
	}
X
Xiaofan 已提交
1008
	fixedAckedTsKey := constructKey(AckedTsTitle, topicName)
Y
yukun 已提交
1009

X
Xiaofan 已提交
1010
	// 2. Update acked ts and acked size for pageIDs
Y
yukun 已提交
1011
	if vals, ok := rmq.consumers.Load(topicName); ok {
Y
yukun 已提交
1012 1013
		consumers, ok := vals.([]*Consumer)
		if !ok || len(consumers) == 0 {
1014
			log.Error("update ack with no consumer", zap.String("topic", topicName))
1015
			return nil
Y
yukun 已提交
1016
		}
X
Xiaofan 已提交
1017 1018

		// find min id of all consumer
1019
		var minBeginID UniqueID = lastID
X
Xiaofan 已提交
1020
		for _, consumer := range consumers {
1021 1022 1023 1024 1025 1026 1027 1028 1029
			if consumer.GroupName != groupName {
				key := constructCurrentID(consumer.Topic, consumer.GroupName)
				beginID, ok := rmq.consumersID.Load(key)
				if !ok {
					return fmt.Errorf("currentID of topicName=%s, groupName=%s not exist", consumer.Topic, consumer.GroupName)
				}
				if beginID.(UniqueID) < minBeginID {
					minBeginID = beginID.(UniqueID)
				}
Y
yukun 已提交
1030 1031
			}
		}
Y
yukun 已提交
1032 1033 1034

		nowTs := strconv.FormatInt(time.Now().Unix(), 10)
		ackedTsKvs := make(map[string]string)
X
Xiaofan 已提交
1035
		// update ackedTs, if page is all acked, then ackedTs is set
Y
yukun 已提交
1036 1037 1038 1039 1040 1041 1042
		for _, pID := range pageIDs {
			if pID <= minBeginID {
				// Update acked info for message pID
				pageAckedTsKey := path.Join(fixedAckedTsKey, strconv.FormatInt(pID, 10))
				ackedTsKvs[pageAckedTsKey] = nowTs
			}
		}
X
Xiaofan 已提交
1043
		err := rmq.kv.MultiSave(ackedTsKvs)
Y
yukun 已提交
1044 1045
		if err != nil {
			return err
Y
yukun 已提交
1046 1047 1048 1049
		}
	}
	return nil
}