未验证 提交 ca61066b 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12006 from taosdata/feature/tq

enh(tmq): add rebalance global lock
...@@ -1474,7 +1474,6 @@ _err: ...@@ -1474,7 +1474,6 @@ _err:
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or // this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization // deserialization
typedef struct { typedef struct {
int8_t* mqInReb;
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe> SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg; } SMqDoRebalanceMsg;
......
...@@ -44,6 +44,11 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); ...@@ -44,6 +44,11 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
bool mndRebTryStart();
void mndRebEnd();
void mndRebCntInc();
void mndRebCntDec();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -36,8 +36,8 @@ typedef struct { ...@@ -36,8 +36,8 @@ typedef struct {
typedef enum { typedef enum {
TEST_TRANS_START_FUNC = 1, TEST_TRANS_START_FUNC = 1,
TEST_TRANS_STOP_FUNC = 2, TEST_TRANS_STOP_FUNC = 2,
CONSUME_TRANS_START_FUNC = 3, MQ_REB_TRANS_START_FUNC = 3,
CONSUME_TRANS_STOP_FUNC = 4, MQ_REB_TRANS_STOP_FUNC = 4,
} ETrnFuncType; } ETrnFuncType;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
#define MND_CONSUMER_LOST_HB_CNT 3 #define MND_CONSUMER_LOST_HB_CNT 3
static int8_t mqInRebFlag = 0; static int8_t mqRebLock = 0;
static const char *mndConsumerStatusName(int status); static const char *mndConsumerStatusName(int status);
...@@ -75,6 +75,17 @@ int32_t mndInitConsumer(SMnode *pMnode) { ...@@ -75,6 +75,17 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {} void mndCleanupConsumer(SMnode *pMnode) {}
bool mndRebTryStart() {
int8_t old = atomic_val_compare_exchange_8(&mqRebLock, 0, 1);
return old == 0;
}
void mndRebEnd() { atomic_sub_fetch_8(&mqRebLock, 1); }
void mndRebCntInc() { atomic_add_fetch_8(&mqRebLock, 1); }
void mndRebCntDec() { atomic_sub_fetch_8(&mqRebLock, 1); }
static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) { static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode; SMnode *pMnode = pMsg->pNode;
SMqConsumerLostMsg *pLostMsg = pMsg->rpcMsg.pCont; SMqConsumerLostMsg *pLostMsg = pMsg->rpcMsg.pCont;
...@@ -143,8 +154,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -143,8 +154,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
void *pIter = NULL; void *pIter = NULL;
// rebalance cannot be parallel // rebalance cannot be parallel
int8_t old = atomic_val_compare_exchange_8(&mqInRebFlag, 0, 1); if (!mndRebTryStart()) {
if (old != 0) {
mInfo("mq rebalance already in progress, do nothing"); mInfo("mq rebalance already in progress, do nothing");
return 0; return 0;
} }
...@@ -152,7 +162,6 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -152,7 +162,6 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
// TODO set cleanfp // TODO set cleanfp
pRebMsg->mqInReb = &mqInRebFlag;
// iterate all consumers, find all modification // iterate all consumers, find all modification
while (1) { while (1) {
...@@ -223,7 +232,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -223,7 +232,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
taosHashCleanup(pRebMsg->rebSubHash); taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg); rpcFreeCont(pRebMsg);
mTrace("mq rebalance finished, no modification"); mTrace("mq rebalance finished, no modification");
atomic_store_8(&mqInRebFlag, 0); mndRebEnd();
} }
return 0; return 0;
} }
......
...@@ -308,8 +308,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -308,8 +308,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// sink part // sink part
if (level == 0) { if (level == 0) {
// only for inplace // only for inplace
pTask->sinkType = TASK_SINK__SHOW; pTask->sinkType = TASK_SINK__NONE;
pTask->showSink.reserved = 0;
if (!hasExtraSink) { if (!hasExtraSink) {
#if 1 #if 1
if (pStream->createdBy == STREAM_CREATED_BY__SMA) { if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
...@@ -368,8 +367,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -368,8 +367,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->sourceType = TASK_SOURCE__PIPE; pTask->sourceType = TASK_SOURCE__PIPE;
// sink part // sink part
pTask->sinkType = TASK_SINK__SHOW; pTask->sinkType = TASK_SINK__NONE;
/*pTask->sinkType = TASK_SINK__NONE;*/
// dispatch part // dispatch part
ASSERT(hasExtraSink); ASSERT(hasExtraSink);
...@@ -456,7 +454,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -456,7 +454,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->sourceType = TASK_SOURCE__MERGE; pTask->sourceType = TASK_SOURCE__MERGE;
// sink part // sink part
pTask->sinkType = TASK_SINK__SHOW; pTask->sinkType = TASK_SINK__NONE;
// dispatch part // dispatch part
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
......
...@@ -47,7 +47,7 @@ void mndCleanupShow(SMnode *pMnode) { ...@@ -47,7 +47,7 @@ void mndCleanupShow(SMnode *pMnode) {
} }
} }
static int32_t convertToRetrieveType(char* name, int32_t len) { static int32_t convertToRetrieveType(char *name, int32_t len) {
int32_t type = -1; int32_t type = -1;
if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) {
...@@ -72,8 +72,6 @@ static int32_t convertToRetrieveType(char* name, int32_t len) { ...@@ -72,8 +72,6 @@ static int32_t convertToRetrieveType(char* name, int32_t len) {
// type = TSDB_MGMT_TABLE_INDEX; // type = TSDB_MGMT_TABLE_INDEX;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
type = TSDB_MGMT_TABLE_STB; type = TSDB_MGMT_TABLE_STB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
type = TSDB_MGMT_TABLE_TABLE; type = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
...@@ -98,12 +96,14 @@ static int32_t convertToRetrieveType(char* name, int32_t len) { ...@@ -98,12 +96,14 @@ static int32_t convertToRetrieveType(char* name, int32_t len) {
type = TSDB_MGMT_TABLE_CONNS; type = TSDB_MGMT_TABLE_CONNS;
} else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) {
type = TSDB_MGMT_TABLE_QUERIES; type = TSDB_MGMT_TABLE_QUERIES;
} else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) {
type = TSDB_MGMT_TABLE_VNODES; type = TSDB_MGMT_TABLE_VNODES;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_TOPICS, len) == 0) { } else if (strncasecmp(name, TSDB_PERFS_TABLE_TOPICS, len) == 0) {
type = TSDB_MGMT_TABLE_TOPICS; type = TSDB_MGMT_TABLE_TOPICS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_STREAMS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAMS;
} else { } else {
// ASSERT(0); // ASSERT(0);
} }
return type; return type;
...@@ -115,12 +115,12 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) { ...@@ -115,12 +115,12 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1); int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1); if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1);
int32_t size = sizeof(SShowObj); int32_t size = sizeof(SShowObj);
SShowObj showObj = {0}; SShowObj showObj = {0};
showObj.id = showId; showObj.id = showId;
showObj.pMnode = pMnode; showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000; int32_t keepTime = tsShellActivityTimer * 6 * 1000;
......
...@@ -452,7 +452,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO ...@@ -452,7 +452,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
} }
// 4. TODO commit log: modification log // 4. TODO commit log: modification log
// 5. execution // 5. set cb
mndTransSetCb(pTrans, MQ_REB_TRANS_START_FUNC, MQ_REB_TRANS_STOP_FUNC, NULL, 0);
// 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
mndTransDrop(pTrans); mndTransDrop(pTrans);
...@@ -518,9 +521,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { ...@@ -518,9 +521,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
} }
// reset flag // reset flag
atomic_store_8(pReq->mqInReb, 0);
mInfo("mq rebalance completed successfully"); mInfo("mq rebalance completed successfully");
taosHashCleanup(pReq->rebSubHash); taosHashCleanup(pReq->rebSubHash);
mndRebEnd();
return 0; return 0;
} }
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndTrans.h" #include "mndTrans.h"
#include "mndAuth.h" #include "mndAuth.h"
#include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndSync.h" #include "mndSync.h"
...@@ -442,6 +443,10 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) { ...@@ -442,6 +443,10 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) {
return mndTransTestStartFunc; return mndTransTestStartFunc;
case TEST_TRANS_STOP_FUNC: case TEST_TRANS_STOP_FUNC:
return mndTransTestStopFunc; return mndTransTestStopFunc;
case MQ_REB_TRANS_START_FUNC:
return mndRebCntInc;
case MQ_REB_TRANS_STOP_FUNC:
return mndRebCntDec;
default: default:
return NULL; return NULL;
} }
......
...@@ -158,8 +158,6 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -158,8 +158,6 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// //
} else if (pTask->sinkType == TASK_SINK__FETCH) { } else if (pTask->sinkType == TASK_SINK__FETCH) {
// //
} else if (pTask->sinkType == TASK_SINK__SHOW) {
blockDebugShowData(pRes);
} else { } else {
ASSERT(pTask->sinkType == TASK_SINK__NONE); ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
...@@ -280,8 +278,6 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { ...@@ -280,8 +278,6 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) { } else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1;
} else { } else {
ASSERT(pTask->sinkType == TASK_SINK__NONE); ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
...@@ -326,8 +322,6 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { ...@@ -326,8 +322,6 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) { } else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1;
} else { } else {
ASSERT(pTask->sinkType == TASK_SINK__NONE); ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册