diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 89f7cd6c39d86c15da0eebb590bb6c963d4ad215..145fc90a77633f3766031ff91bd9ef9b6c90c104 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1474,7 +1474,6 @@ _err: // this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or // deserialization typedef struct { - int8_t* mqInReb; SHashObj* rebSubHash; // SHashObj } SMqDoRebalanceMsg; diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 19e202dddc0c56cc7679d7a3d319ab7918e36528..a8bfe91cbf8edfb225510253b9536115b21cea30 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -44,6 +44,11 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); +bool mndRebTryStart(); +void mndRebEnd(); +void mndRebCntInc(); +void mndRebCntDec(); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 5b5aff7c86ef204a1e75d6b0d20a295c454ab57c..7644ec3c4cab709e10ed89b7443c5bca466f6f7e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -36,8 +36,8 @@ typedef struct { typedef enum { TEST_TRANS_START_FUNC = 1, TEST_TRANS_STOP_FUNC = 2, - CONSUME_TRANS_START_FUNC = 3, - CONSUME_TRANS_STOP_FUNC = 4, + MQ_REB_TRANS_START_FUNC = 3, + MQ_REB_TRANS_STOP_FUNC = 4, } ETrnFuncType; typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 025f61dc87694d2a6f872f6200470d83445e8b7c..b17772bdb2184c32b92dd87911490222967b24fb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -35,7 +35,7 @@ #define MND_CONSUMER_LOST_HB_CNT 3 -static int8_t mqInRebFlag = 0; +static int8_t mqRebLock = 0; static const char *mndConsumerStatusName(int status); @@ -75,6 +75,17 @@ int32_t mndInitConsumer(SMnode *pMnode) { void mndCleanupConsumer(SMnode *pMnode) {} +bool mndRebTryStart() { + int8_t old = atomic_val_compare_exchange_8(&mqRebLock, 0, 1); + return old == 0; +} + +void mndRebEnd() { atomic_sub_fetch_8(&mqRebLock, 1); } + +void mndRebCntInc() { atomic_add_fetch_8(&mqRebLock, 1); } + +void mndRebCntDec() { atomic_sub_fetch_8(&mqRebLock, 1); } + static int32_t mndProcessConsumerLostMsg(SNodeMsg *pMsg) { SMnode *pMnode = pMsg->pNode; SMqConsumerLostMsg *pLostMsg = pMsg->rpcMsg.pCont; @@ -143,8 +154,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { void *pIter = NULL; // rebalance cannot be parallel - int8_t old = atomic_val_compare_exchange_8(&mqInRebFlag, 0, 1); - if (old != 0) { + if (!mndRebTryStart()) { mInfo("mq rebalance already in progress, do nothing"); return 0; } @@ -152,7 +162,6 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); // TODO set cleanfp - pRebMsg->mqInReb = &mqInRebFlag; // iterate all consumers, find all modification while (1) { @@ -223,7 +232,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { taosHashCleanup(pRebMsg->rebSubHash); rpcFreeCont(pRebMsg); mTrace("mq rebalance finished, no modification"); - atomic_store_8(&mqInRebFlag, 0); + mndRebEnd(); } return 0; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3f4b2fe145ea5121cf42a56275daf285de16279d..b760dd6aa8d584accd2569ca4605f61934cab12d 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -308,8 +308,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // sink part if (level == 0) { // only for inplace - pTask->sinkType = TASK_SINK__SHOW; - pTask->showSink.reserved = 0; + pTask->sinkType = TASK_SINK__NONE; if (!hasExtraSink) { #if 1 if (pStream->createdBy == STREAM_CREATED_BY__SMA) { @@ -368,8 +367,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->sourceType = TASK_SOURCE__PIPE; // sink part - pTask->sinkType = TASK_SINK__SHOW; - /*pTask->sinkType = TASK_SINK__NONE;*/ + pTask->sinkType = TASK_SINK__NONE; // dispatch part ASSERT(hasExtraSink); @@ -456,7 +454,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->sourceType = TASK_SOURCE__MERGE; // sink part - pTask->sinkType = TASK_SINK__SHOW; + pTask->sinkType = TASK_SINK__NONE; // dispatch part pTask->dispatchType = TASK_DISPATCH__NONE; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 94366a241d697025f0d0682d8b2bab24df12ad76..0a3789402b4a1e5b7ca0b1d7d7c85da72214dc09 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -47,7 +47,7 @@ void mndCleanupShow(SMnode *pMnode) { } } -static int32_t convertToRetrieveType(char* name, int32_t len) { +static int32_t convertToRetrieveType(char *name, int32_t len) { int32_t type = -1; if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) { @@ -72,8 +72,6 @@ static int32_t convertToRetrieveType(char* name, int32_t len) { // type = TSDB_MGMT_TABLE_INDEX; } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) { type = TSDB_MGMT_TABLE_STB; - } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) { - type = TSDB_MGMT_TABLE_STREAMS; } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) { type = TSDB_MGMT_TABLE_TABLE; } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) { @@ -98,12 +96,14 @@ static int32_t convertToRetrieveType(char* name, int32_t len) { type = TSDB_MGMT_TABLE_CONNS; } else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) { type = TSDB_MGMT_TABLE_QUERIES; - } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) { + } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) { type = TSDB_MGMT_TABLE_VNODES; } else if (strncasecmp(name, TSDB_PERFS_TABLE_TOPICS, len) == 0) { type = TSDB_MGMT_TABLE_TOPICS; + } else if (strncasecmp(name, TSDB_PERFS_TABLE_STREAMS, len) == 0) { + type = TSDB_MGMT_TABLE_STREAMS; } else { -// ASSERT(0); + // ASSERT(0); } return type; @@ -115,12 +115,12 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) { int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1); if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1); - int32_t size = sizeof(SShowObj); + int32_t size = sizeof(SShowObj); SShowObj showObj = {0}; - showObj.id = showId; + showObj.id = showId; showObj.pMnode = pMnode; - showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); + showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb)); memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); int32_t keepTime = tsShellActivityTimer * 6 * 1000; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index aeecab34cb0e61a3251dc7506ec5b174a69109d9..2b3af85066b8658d23ca05a7ef1220299c5db807 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -452,7 +452,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO } // 4. TODO commit log: modification log - // 5. execution + // 5. set cb + mndTransSetCb(pTrans, MQ_REB_TRANS_START_FUNC, MQ_REB_TRANS_STOP_FUNC, NULL, 0); + + // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL; mndTransDrop(pTrans); @@ -518,9 +521,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { } // reset flag - atomic_store_8(pReq->mqInReb, 0); mInfo("mq rebalance completed successfully"); taosHashCleanup(pReq->rebSubHash); + mndRebEnd(); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c08b0f6db94b19f270f608690bca875c0664a763..2d10c4a7a5623cb35fc161b882883c690b917461 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndTrans.h" #include "mndAuth.h" +#include "mndConsumer.h" #include "mndDb.h" #include "mndShow.h" #include "mndSync.h" @@ -442,6 +443,10 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) { return mndTransTestStartFunc; case TEST_TRANS_STOP_FUNC: return mndTransTestStopFunc; + case MQ_REB_TRANS_START_FUNC: + return mndRebCntInc; + case MQ_REB_TRANS_STOP_FUNC: + return mndRebCntDec; default: return NULL; } diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 8aaaa414ca265f31f09f26f877b2ec3dea7ce5b3..b06c774ed3cb289afb5b08073f1d3dc9602b6bda 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -158,8 +158,6 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // } else if (pTask->sinkType == TASK_SINK__FETCH) { // - } else if (pTask->sinkType == TASK_SINK__SHOW) { - blockDebugShowData(pRes); } else { ASSERT(pTask->sinkType == TASK_SINK__NONE); } @@ -280,8 +278,6 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__SHOW) { - if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1; } else { ASSERT(pTask->sinkType == TASK_SINK__NONE); } @@ -326,8 +322,6 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__SHOW) { - if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1; } else { ASSERT(pTask->sinkType == TASK_SINK__NONE); }