未验证 提交 5abeacc2 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #17711 from taosdata/feature/stream

refactor(mnode): remove unused code
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_OFFSET_H_
#define _TD_MND_OFFSET_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitOffset(SMnode *pMnode);
void mndCleanupOffset(SMnode *pMnode);
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key);
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset);
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset);
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw);
int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs);
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
}
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey);
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_OFFSET_H_*/
......@@ -31,6 +31,10 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
}
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
......
......@@ -18,7 +18,6 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndStb.h"
......@@ -408,12 +407,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
.offset = -1,
};
// 2.2.2 fetch vg offset
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey);
if (pOffsetObj != NULL) {
vgEp.offset = atomic_load_64(&pOffsetObj->offset);
mndReleaseOffset(pMnode, pOffsetObj);
}
taosArrayPush(topicEp.vgs, &vgEp);
}
taosArrayPush(rsp.topics, &topicEp);
......
......@@ -17,7 +17,6 @@
#include "mndDb.h"
#include "mndCluster.h"
#include "mndDnode.h"
#include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSma.h"
......
......@@ -23,7 +23,6 @@
#include "mndGrant.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndPerfSchema.h"
#include "mndPrivilege.h"
#include "mndProfile.h"
......@@ -305,7 +304,6 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-offset", mndInitOffset, mndCleanupOffset) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndOffset.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tname.h"
#define MND_OFFSET_VER_NUMBER 1
#define MND_OFFSET_RESERVE_SIZE 64
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset);
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset);
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset);
static int32_t mndProcessCommitOffsetReq(SRpcMsg *pReq);
int32_t mndInitOffset(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_OFFSET,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndOffsetActionEncode,
.decodeFp = (SdbDecodeFp)mndOffsetActionDecode,
.insertFp = (SdbInsertFp)mndOffsetActionInsert,
.updateFp = (SdbUpdateFp)mndOffsetActionUpdate,
.deleteFp = (SdbDeleteFp)mndOffsetActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_MQ_COMMIT_OFFSET, mndProcessCommitOffsetReq);
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupOffset(SMnode *pMnode) {}
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) {
int32_t i = 0;
while (pOffset->key[i] != ':') i++;
while (pOffset->key[i] != ':') i++;
if (strcmp(&pOffset->key[i + 1], topic) == 0) return true;
return false;
}
bool mndOffsetFromSubKey(SMqOffsetObj *pOffset, const char *subKey) {
int32_t i = 0;
while (pOffset->key[i] != ':') i++;
if (strcmp(&pOffset->key[i + 1], subKey) == 0) return true;
return false;
}
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
int32_t tlen = tEncodeSMqOffsetObj(NULL, pOffset);
int32_t size = sizeof(int32_t) + tlen + MND_OFFSET_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_OFFSET, MND_OFFSET_VER_NUMBER, size);
if (pRaw == NULL) goto OFFSET_ENCODE_OVER;
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto OFFSET_ENCODE_OVER;
void *abuf = buf;
tEncodeSMqOffsetObj(&abuf, pOffset);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, OFFSET_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, OFFSET_ENCODE_OVER);
terrno = TSDB_CODE_SUCCESS;
OFFSET_ENCODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("offset:%s, failed to encode to raw:%p since %s", pOffset->key, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("offset:%s, encode to raw:%p, row:%p", pOffset->key, pRaw, pOffset);
return pRaw;
}
SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto OFFSET_DECODE_OVER;
if (sver != MND_OFFSET_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto OFFSET_DECODE_OVER;
}
int32_t size = sizeof(SMqOffsetObj);
SSdbRow *pRow = sdbAllocRow(size);
if (pRow == NULL) goto OFFSET_DECODE_OVER;
SMqOffsetObj *pOffset = sdbGetRowObj(pRow);
if (pOffset == NULL) goto OFFSET_DECODE_OVER;
int32_t dataPos = 0;
int32_t tlen;
SDB_GET_INT32(pRaw, dataPos, &tlen, OFFSET_DECODE_OVER);
buf = taosMemoryMalloc(tlen + 1);
if (buf == NULL) goto OFFSET_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OFFSET_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_OFFSET_RESERVE_SIZE, OFFSET_DECODE_OVER);
if (tDecodeSMqOffsetObj(buf, pOffset) == NULL) {
goto OFFSET_DECODE_OVER;
}
terrno = TSDB_CODE_SUCCESS;
OFFSET_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("offset:%s, failed to decode from raw:%p since %s", pOffset->key, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
mTrace("offset:%s, decode from raw:%p, row:%p", pOffset->key, pRaw, pOffset);
return pRow;
}
int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) {
int32_t sz = taosArrayGetSize(vgs);
for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(vgs, i);
SMqOffsetObj offsetObj = {0};
if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) {
return -1;
}
// TODO assign db
offsetObj.offset = -1;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj);
if (pOffsetRaw == NULL) {
return -1;
}
(void)sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
// commit log or redo log?
if (mndTransAppendRedolog(pTrans, pOffsetRaw) < 0) {
return -1;
}
}
return 0;
}
static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
char key[TSDB_PARTITION_KEY_LEN];
SMnode *pMnode = pMsg->info.node;
char *msgStr = pMsg->pCont;
SMqCMCommitOffsetReq commitOffsetReq;
SDecoder decoder;
tDecoderInit(&decoder, msgStr, pMsg->contLen);
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "commit-offset");
if (pTrans == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDecoderClear(&decoder);
return -1;
}
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
mInfo("commit offset %" PRId64 " to vgId:%d of consumer group %s on topic %s", pOffset->offset, pOffset->vgId,
pOffset->cgroup, pOffset->topicName);
if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) {
mError("submit offset to topic %s failed", pOffset->topicName);
return -1;
}
bool create = false;
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
if (pOffsetObj == NULL) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName);
if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
continue;
}
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
pOffsetObj->dbUid = pTopic->dbUid;
mndReleaseTopic(pMnode, pTopic);
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
create = true;
}
pOffsetObj->offset = pOffset->offset;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj);
(void)sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY);
mndTransAppendCommitlog(pTrans, pOffsetRaw);
if (create) {
taosMemoryFree(pOffsetObj);
} else {
mndReleaseOffset(pMnode, pOffsetObj);
}
}
tDecoderClear(&decoder);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset) {
mTrace("offset:%s, perform insert action", pOffset->key);
return 0;
}
static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) {
mTrace("offset:%s, perform delete action", pOffset->key);
return 0;
}
static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) {
mTrace("offset:%s, perform update action", pOldOffset->key);
atomic_store_64(&pOldOffset->offset, pNewOffset->offset);
return 0;
}
SMqOffsetObj *mndAcquireOffset(SMnode *pMnode, const char *key) {
SSdb *pSdb = pMnode->pSdb;
SMqOffsetObj *pOffset = sdbAcquire(pSdb, SDB_OFFSET, key);
if (pOffset == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_OFFSET_NOT_EXIST;
}
return pOffset;
}
void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pOffset);
}
static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
SSdbRaw *pCommitRaw = mndOffsetActionEncode(pOffset);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
SSdbRaw *pRedoRaw = mndOffsetActionEncode(pOffset);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (pOffset->dbUid != pDb->uid) {
sdbRelease(pSdb, pOffset);
continue;
}
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
sdbCancelFetch(pSdb, pIter);
code = -1;
break;
}
sdbRelease(pSdb, pOffset);
}
return code;
}
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (!mndOffsetFromTopic(pOffset, topic)) {
sdbRelease(pSdb, pOffset);
continue;
}
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
END:
return code;
}
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (!mndOffsetFromSubKey(pOffset, subKey)) {
sdbRelease(pSdb, pOffset);
continue;
}
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
END:
return code;
}
......@@ -18,7 +18,6 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
......
......@@ -19,7 +19,6 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndStb.h"
......@@ -683,12 +682,6 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
if (mndDropOffsetBySubKey(pMnode, pTrans, pSub->key) < 0) {
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
......
......@@ -18,7 +18,6 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndStb.h"
......
......@@ -671,8 +671,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqTaosxScanLog(pTq, pHandle, pCont, &taosxRsp) < 0) {
/*ASSERT(0);*/
}
// TODO batch optimization:
// TODO continue scan until meeting batch requirement
if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册