From d75ab9b3ff053db2ea017bd1c1fdb5b70f4037a1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 20 Apr 2022 21:36:55 +0800 Subject: [PATCH] refactor --- example/src/tmq.c | 3 +- include/common/tmsg.h | 79 ++++++- source/client/inc/clientInt.h | 60 ++--- source/client/src/clientMain.c | 35 ++- source/client/src/tmq.c | 206 ++++++---------- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 11 +- source/dnode/mnode/impl/src/mndConsumer.c | 15 +- source/dnode/mnode/impl/src/mndDef.c | 29 ++- source/dnode/mnode/impl/src/mndScheduler.c | 17 +- source/dnode/mnode/impl/src/mndSubscribe.c | 74 +++--- source/dnode/mnode/impl/src/mnode.c | 6 +- source/dnode/vnode/src/inc/tq.h | 49 ++-- source/dnode/vnode/src/tq/tq.c | 247 +++++++++----------- source/dnode/vnode/src/tq/tqMetaStore.c | 1 - source/dnode/vnode/src/vnd/vnodeSvr.c | 10 +- source/os/src/osFile.c | 40 ++-- source/util/src/tarray.c | 7 +- tests/script/tsim/tmq/basic1.sim | 4 +- tests/test/c/tmqSim.c | 6 +- 20 files changed, 476 insertions(+), 424 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index fdd26bc95d..4847e243a1 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -22,6 +22,7 @@ static int running = 1; static void msg_process(TAOS_RES* msg) { char buf[1024]; + memset(buf, 0, 1024); printf("topic: %s\n", tmq_get_topic_name(msg)); printf("vg:%d\n", tmq_get_vgroup_id(msg)); while (1) { @@ -220,7 +221,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); - if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); + /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ } } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ecb5cd3d61..2b480a4067 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2396,6 +2396,7 @@ typedef struct { int64_t consumerId; } SMqRspHead; +#if 0 typedef struct { SMsgHead head; @@ -2409,6 +2410,17 @@ typedef struct { uint64_t reqId; char topic[TSDB_TOPIC_FNAME_LEN]; } SMqPollReq; +#endif + +typedef struct { + SMsgHead head; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int32_t epoch; + uint64_t reqId; + int64_t consumerId; + int64_t blockingTime; + int64_t currentOffset; +} SMqPollReqV2; typedef struct { int32_t vgId; @@ -2482,6 +2494,71 @@ static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRs return (void*)buf; } +typedef struct { + SMqRspHead head; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t blockNum; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; + SArray* blockDataLen; // SArray + SArray* blockData; // SArray + SArray* blockTbName; // SArray + SArray* blockSchema; // SArray + SArray* blockTags; // SArray + SArray* blockTagSchema; // SArray +} SMqDataBlkRsp; + +static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); + tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); + tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); + tlen += taosEncodeFixedI32(buf, pRsp->blockNum); + if (pRsp->blockNum != 0) { + tlen += taosEncodeFixedI8(buf, pRsp->withTbName); + tlen += taosEncodeFixedI8(buf, pRsp->withSchema); + tlen += taosEncodeFixedI8(buf, pRsp->withTag); + tlen += taosEncodeFixedI8(buf, pRsp->withTagSchema); + + for (int32_t i = 0; i < pRsp->blockNum; i++) { + int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); + void* data = taosArrayGetP(pRsp->blockData, i); + tlen += taosEncodeFixedI32(buf, bLen); + tlen += taosEncodeBinary(buf, data, bLen); + } + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) { + buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); + buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); + buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); + buf = taosDecodeFixedI32(buf, &pRsp->blockNum); + pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); + pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); + if (pRsp->blockNum != 0) { + buf = taosDecodeFixedI8(buf, &pRsp->withTbName); + buf = taosDecodeFixedI8(buf, &pRsp->withSchema); + buf = taosDecodeFixedI8(buf, &pRsp->withTag); + buf = taosDecodeFixedI8(buf, &pRsp->withTagSchema); + + for (int32_t i = 0; i < pRsp->blockNum; i++) { + int32_t bLen = 0; + void* data = NULL; + buf = taosDecodeFixedI32(buf, &bLen); + buf = taosDecodeBinary(buf, &data, bLen); + taosArrayPush(pRsp->blockDataLen, &bLen); + taosArrayPush(pRsp->blockData, &data); + } + } + return (void*)buf; +} + typedef struct { SMqRspHead head; char cgroup[TSDB_CGROUP_LEN]; @@ -2489,7 +2566,7 @@ typedef struct { } SMqCMGetSubEpRsp; static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - taosMemoryFree(pSubTopicEp->schema.pSchema); + // taosMemoryFree(pSubTopicEp->schema.pSchema); taosArrayDestroy(pSubTopicEp->vgs); } diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 772ff5e69a..9363cf00a2 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -44,7 +44,7 @@ extern "C" { } while (0) #define ERROR_MSG_BUF_DEFAULT_SIZE 512 -#define HEARTBEAT_INTERVAL 1500 // ms +#define HEARTBEAT_INTERVAL 1500 // ms enum { RES_TYPE__QUERY = 1, @@ -187,11 +187,13 @@ typedef struct SRequestSendRecvBody { } SRequestSendRecvBody; typedef struct { - int8_t resType; - char* topic; - SArray* res; // SArray - int32_t resIter; - int32_t vgId; + int8_t resType; + char topic[TSDB_TOPIC_FNAME_LEN]; + int32_t vgId; + SSchemaWrapper schema; + int32_t resIter; + SMqDataBlkRsp rsp; + SReqResultInfo resInfo; } SMqRspObj; typedef struct SRequestObj { @@ -211,16 +213,24 @@ typedef struct SRequestObj { SRequestSendRecvBody body; } SRequestObj; +void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); +void doSetOneRowPtr(SReqResultInfo* pResultInfo); +void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); +void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); + static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { SMqRspObj* msg = (SMqRspObj*)res; - int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter; - return (SReqResultInfo*)taosArrayGet(msg->res, resIter); + return (SReqResultInfo*)&msg->resInfo; } -static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) { +static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { SMqRspObj* msg = (SMqRspObj*)res; - if (++msg->resIter < taosArrayGetSize(msg->res)) { - return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter); + msg->resIter++; + if (msg->resIter < msg->rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(msg->rsp.blockData, msg->resIter); + setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4); + return &msg->resInfo; } return NULL; } @@ -238,25 +248,25 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); -int taos_init(); +int taos_init(); -void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); -void destroyTscObj(void* pObj); -STscObj *acquireTscObj(int64_t rid); -int32_t releaseTscObj(int64_t rid); +void* createTscObj(const char* user, const char* auth, const char* db, SAppInstInfo* pAppInfo); +void destroyTscObj(void* pObj); +STscObj* acquireTscObj(int64_t rid); +int32_t releaseTscObj(int64_t rid); uint64_t generateRequestId(); -void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); -void destroyRequest(SRequestObj* pRequest); -SRequestObj *acquireRequest(int64_t rid); -int32_t releaseRequest(int64_t rid); +void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); +void destroyRequest(SRequestObj* pRequest); +SRequestObj* acquireRequest(int64_t rid); +int32_t releaseRequest(int64_t rid); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); void resetConnectDB(STscObj* pTscObj); -int taos_options_imp(TSDB_OPTION option, const char* str); +int taos_options_imp(TSDB_OPTION option, const char* str); void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); @@ -273,12 +283,6 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); -void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); -void doSetOneRowPtr(SReqResultInfo* pResultInfo); -void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); -void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); -int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); - // --- heartbeat // global, called by mgmt int hbMgrInit(); @@ -290,7 +294,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); // conn level -int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); +int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 903018d5c3..6472dea556 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -14,12 +14,12 @@ */ #include "catalog.h" -#include "scheduler.h" #include "clientInt.h" -#include "clientStmt.h" #include "clientLog.h" +#include "clientStmt.h" #include "os.h" #include "query.h" +#include "scheduler.h" #include "tglobal.h" #include "tmsg.h" #include "tref.h" @@ -177,25 +177,24 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return doFetchRows(pRequest, true, true); } else if (TD_RES_TMQ(res)) { - SMqRspObj *msg = ((SMqRspObj *)res); - if (msg->resIter == -1) msg->resIter++; - SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter); + SMqRspObj *msg = ((SMqRspObj *)res); + SReqResultInfo *pResultInfo; + if (msg->resIter == -1) { + pResultInfo = tmqGetNextResInfo(res, true); + } else { + pResultInfo = tmqGetCurResInfo(res); + } if (pResultInfo->current < pResultInfo->numOfRows) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; return pResultInfo->row; } else { - msg->resIter++; - if (msg->resIter < taosArrayGetSize(msg->res)) { - pResultInfo = taosArrayGet(msg->res, msg->resIter); - doSetOneRowPtr(pResultInfo); - pResultInfo->current += 1; - return pResultInfo->row; - } else { - return NULL; - } + pResultInfo = tmqGetNextResInfo(res, true); + if (pResultInfo == NULL) return NULL; + doSetOneRowPtr(pResultInfo); + pResultInfo->current += 1; + return pResultInfo->row; } - } else { // assert to avoid un-initialization error ASSERT(0); @@ -455,7 +454,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { (*numOfRows) = pResultInfo->numOfRows; return pRequest->code; } else if (TD_RES_TMQ(res)) { - SReqResultInfo *pResultInfo = tmqGetNextResInfo(res); + SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, true); if (pResultInfo == NULL) return -1; pResultInfo->current = pResultInfo->numOfRows; @@ -474,7 +473,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { } if (TD_RES_TMQ(res)) { - SReqResultInfo *pResultInfo = tmqGetNextResInfo(res); + SReqResultInfo *pResultInfo = tmqGetNextResInfo(res, false); if (pResultInfo == NULL) { (*numOfRows) = 0; return 0; @@ -710,10 +709,8 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { return stmtBindBatch(stmt, bind); } - TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision) { // TODO return NULL; } - diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 9ea7a85d9b..90a589c36c 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -72,25 +72,25 @@ struct tmq_conf_t { struct tmq_t { // conf - char groupId[TSDB_CGROUP_LEN]; - char clientId[256]; - int8_t autoCommit; - int8_t inWaiting; + char groupId[TSDB_CGROUP_LEN]; + char clientId[256]; + int8_t autoCommit; + /*int8_t inWaiting;*/ int64_t consumerId; int32_t epoch; int32_t resetOffsetCfg; int64_t status; STscObj* pTscObj; tmq_commit_cb* commit_cb; - int32_t nextTopicIdx; - int8_t epStatus; - int32_t epSkipCnt; - int32_t waitingRequest; - int32_t readyRequest; - SArray* clientTopics; // SArray - STaosQueue* mqueue; // queue of tmq_message_t - STaosQall* qall; - tsem_t rspSem; + /*int32_t nextTopicIdx;*/ + int8_t epStatus; + int32_t epSkipCnt; + /*int32_t waitingRequest;*/ + /*int32_t readyRequest;*/ + SArray* clientTopics; // SArray + STaosQueue* mqueue; // queue of tmq_message_t + STaosQall* qall; + tsem_t rspSem; // stat int64_t pollCnt; }; @@ -134,7 +134,7 @@ typedef struct { int32_t epoch; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; - SMqPollRspV2 msg; + SMqDataBlkRsp msg; } SMqPollRspWrapper; typedef struct { @@ -145,6 +145,7 @@ typedef struct { typedef struct { tmq_t* tmq; + int32_t code; int32_t sync; tsem_t rspSem; } SMqAskEpCbParam; @@ -327,12 +328,12 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return NULL; } pTmq->pTscObj = (STscObj*)conn; - pTmq->inWaiting = 0; + /*pTmq->inWaiting = 0;*/ pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - pTmq->waitingRequest = 0; - pTmq->readyRequest = 0; + /*pTmq->waitingRequest = 0;*/ + /*pTmq->readyRequest = 0;*/ pTmq->epStatus = 0; pTmq->epSkipCnt = 0; // set conf @@ -372,12 +373,12 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) return NULL; - pTmq->inWaiting = 0; + /*pTmq->inWaiting = 0;*/ pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - pTmq->waitingRequest = 0; - pTmq->readyRequest = 0; + /*pTmq->waitingRequest = 0;*/ + /*pTmq->readyRequest = 0;*/ pTmq->epStatus = 0; pTmq->epSkipCnt = 0; // set conf @@ -862,7 +863,6 @@ void tmqShowMsg(tmq_message_t* tmq_message) { #endif int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { - /*printf("recv poll\n");*/ SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; SMqClientTopic* pTopic = pParam->pTopic; @@ -875,17 +875,15 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; int32_t tmqEpoch = atomic_load_32(&tmq->epoch); if (msgEpoch < tmqEpoch) { - /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/ - /*tsem_post(&tmq->rspSem);*/ + // do not write into queue since updating epoch reset tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); + /*tsem_post(&tmq->rspSem);*/ return 0; } if (msgEpoch != tmqEpoch) { tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); - } else { - atomic_sub_fetch_32(&tmq->waitingRequest, 1); } #if 0 @@ -907,45 +905,33 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } #endif - /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/ - /*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/ SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper)); if (pRspWrapper == NULL) { tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); goto CREATE_MSG_FAIL; } + pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP; pRspWrapper->vgHandle = pVg; pRspWrapper->topicHandle = pTopic; - /*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/ + memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg); - // TODO: alloc mem - /*pRsp->*/ - /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ -#if 0 - if (pRsp->msg.numOfTopics == 0) { - /*printf("no data\n");*/ - taosFreeQitem(pRsp); - goto CREATE_MSG_FAIL; - } -#endif + tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg); tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset); taosWriteQitem(tmq->mqueue, pRspWrapper); - atomic_add_fetch_32(&tmq->readyRequest, 1); /*tsem_post(&tmq->rspSem);*/ - return 0; + return 0; CREATE_MSG_FAIL: if (pParam->epoch == tmq->epoch) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } /*tsem_post(&tmq->rspSem);*/ - return code; + return -1; } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { @@ -1028,6 +1014,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; + pParam->code = code; if (code != 0) { tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync); goto END; @@ -1067,6 +1054,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { taosWriteQitem(tmq->mqueue, pWrapper); /*tsem_post(&tmq->rspSem);*/ + taosMemoryFree(pParam); } END: @@ -1078,7 +1066,8 @@ END: } int32_t tmqAskEp(tmq_t* tmq, bool sync) { - int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); + int32_t code = 0; + int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); if (epStatus == 1) { int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt); @@ -1135,8 +1124,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - if (sync) tsem_wait(&pParam->rspSem); - return 0; + if (sync) { + tsem_wait(&pParam->rspSem); + code = pParam->code; + taosMemoryFree(pParam); + } + return code; } tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { @@ -1162,7 +1155,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -1174,13 +1167,18 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo reqOffset = tmq->resetOffsetCfg; } - SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq)); + SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2)); if (pReq == NULL) { return NULL; } - strcpy(pReq->topic, pTopic->topicName); - strcpy(pReq->cgroup, tmq->groupId); + /*strcpy(pReq->topic, pTopic->topicName);*/ + /*strcpy(pReq->cgroup, tmq->groupId);*/ + + int32_t tlen = strlen(tmq->groupId); + memcpy(pReq->subKey, tmq->groupId, tlen); + pReq->subKey[tlen] = TMQ_SEPARATOR; + strcpy(pReq->subKey + tlen + 1, pTopic->topicName); pReq->blockingTime = blockingTime; pReq->consumerId = tmq->consumerId; @@ -1189,101 +1187,26 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo pReq->reqId = generateRequestId(); pReq->head.vgId = htonl(pVg->vgId); - pReq->head.contLen = htonl(sizeof(SMqPollReq)); + pReq->head.contLen = htonl(sizeof(SMqPollReqV2)); return pReq; } SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); pRspObj->resType = RES_TYPE__TMQ; - pRspObj->topic = strdup(pWrapper->topicHandle->topicName); - pRspObj->resIter = -1; + strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; - SMqPollRspV2* pRsp = &pWrapper->msg; - int32_t blockNum = taosArrayGetSize(pRsp->blockPos); - pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo)); - for (int32_t i = 0; i < blockNum; i++) { - int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i); - SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos); - SReqResultInfo resInfo = {0}; - resInfo.totalRows = 0; - resInfo.precision = TSDB_TIME_PRECISION_MILLI; - setResSchemaInfo(&resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); - setQueryResultFromRsp(&resInfo, pRetrieve, true); - taosArrayPush(pRspObj->res, &resInfo); - } - return pRspObj; -} - -#if 0 -tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { - tmq_message_t* msg = NULL; - for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); - /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/ - /*continue;*/ - /*}*/ - SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); - if (pReq == NULL) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // TODO: out of mem - return NULL; - } - - SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); - if (pParam == NULL) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // TODO: out of mem - return NULL; - } - pParam->tmq = tmq; - pParam->pVg = pVg; - pParam->epoch = tmq->epoch; - pParam->sync = 1; - pParam->msg = &msg; - tsem_init(&pParam->rspSem, 0, 0); - - SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); - if (sendInfo == NULL) { - return NULL; - } - - sendInfo->msgInfo = (SDataBuf){ - .pData = pReq, - .len = sizeof(SMqPollReq), - .handle = NULL, - }; - sendInfo->requestId = generateRequestId(); - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmqPollCb; - sendInfo->msgType = TDMT_VND_CONSUME; + pRspObj->resIter = -1; + memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp)); - int64_t transporterId = 0; - /*printf("send poll\n");*/ - atomic_add_fetch_32(&tmq->waitingRequest, 1); - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - pVg->pollCnt++; - tmq->pollCnt++; + /*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/ + pRspObj->resInfo.totalRows = 0; + pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; + setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); - tsem_wait(&pParam->rspSem); - tmq_message_t* nmsg = NULL; - while (1) { - taosReadQitem(tmq->mqueue, (void**)&nmsg); - if (nmsg == NULL) continue; - while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) { - taosReadQitem(tmq->mqueue, (void**)&nmsg); - } - return nmsg; - } - } - } - return NULL; + taosFreeQitem(pWrapper); + return pRspObj; } -#endif int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { /*printf("call poll\n");*/ @@ -1306,7 +1229,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { #endif } atomic_store_32(&pVg->vgSkipCnt, 0); - SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); /*tsem_post(&tmq->rspSem);*/ @@ -1337,7 +1260,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { sendInfo->msgInfo = (SDataBuf){ .pData = pReq, - .len = sizeof(SMqPollReq), + .len = sizeof(SMqPollReqV2), .handle = NULL, }; sendInfo->requestId = pReq->reqId; @@ -1348,7 +1271,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ - atomic_add_fetch_32(&tmq->waitingRequest, 1); + /*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/ tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ @@ -1390,7 +1313,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; - atomic_sub_fetch_32(&tmq->readyRequest, 1); + /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) { /*printf("epoch match\n");*/ @@ -1398,7 +1321,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ pVg->currentOffset = pollRspWrapper->msg.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - if (pollRspWrapper->msg.dataLen == 0) { + if (pollRspWrapper->msg.blockNum == 0) { taosFreeQitem(pollRspWrapper); rspWrapper = NULL; continue; @@ -1454,7 +1377,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); - tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); + while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) { + tscDebug("not ready, retry\n"); + taosSsleep(1); + } rspObj = tmqHandleAllRsp(tmq, blocking_time, false); if (rspObj) { diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 38885333c9..4cf4a405e0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -186,6 +186,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 751edd6f98..c9bc3514cc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -321,7 +321,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + /*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/ dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); @@ -334,10 +334,11 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); + /*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/ + /*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/ + /*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);*/ + /*dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);*/ + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 8a66e5cb69..2ce7f0ae5f 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -127,7 +127,6 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) break; - taosRLockLatch(&pConsumer->lock); int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); if (status == MQ_CONSUMER_STATUS__READY && hbStatus > MND_CONSUMER_LOST_HB_CNT) { @@ -143,6 +142,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { if (status == MQ_CONSUMER_STATUS__LOST_REBD || status == MQ_CONSUMER_STATUS__READY) { // do nothing } else if (status == MQ_CONSUMER_STATUS__LOST) { + taosRLockLatch(&pConsumer->lock); int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); for (int32_t i = 0; i < topicNum; i++) { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -151,7 +151,9 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } + taosRUnLockLatch(&pConsumer->lock); } else if (status == MQ_CONSUMER_STATUS__MODIFY) { + taosRLockLatch(&pConsumer->lock); int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); for (int32_t i = 0; i < newTopicNum; i++) { char key[TSDB_SUBSCRIBE_KEY_LEN]; @@ -169,11 +171,11 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } + taosRUnLockLatch(&pConsumer->lock); } else { // do nothing } - taosRUnLockLatch(&pConsumer->lock); mndReleaseConsumer(pMnode, pConsumer); } @@ -188,7 +190,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { } else { taosHashCleanup(pRebMsg->rebSubHash); rpcFreeCont(pRebMsg); - mInfo("mq rebalance finished, no modification"); + mTrace("mq rebalance finished, no modification"); atomic_store_8(&mqInRebFlag, 0); } return 0; @@ -213,12 +215,12 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); + if (status == MQ_CONSUMER_STATUS__LOST) { - // recover consumer + // TODO: recover consumer } if (status != MQ_CONSUMER_STATUS__READY) { - mndReleaseConsumer(pMnode, pConsumer); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; return -1; } @@ -228,11 +230,13 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { // 2. check epoch, only send ep info when epoches do not match if (epoch != serverEpoch) { taosRLockLatch(&pConsumer->lock); + mInfo("process ask ep, consumer %ld(epoch %d), server epoch %d", consumerId, epoch, serverEpoch); int32_t numOfTopics = taosArrayGetSize(pConsumer->currentTopics); rsp.topics = taosArrayInit(numOfTopics, sizeof(SMqSubTopicEp)); if (rsp.topics == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosRUnLockLatch(&pConsumer->lock); goto FAIL; } @@ -265,6 +269,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) { topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp)); if (topicEp.vgs == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + taosRUnLockLatch(&pConsumer->lock); goto FAIL; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index dd2d6801e1..37c819ae60 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -250,13 +250,28 @@ void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub *pEpInSub) { int32_t tEncodeSMqConsumerEpInSub(void **buf, const SMqConsumerEpInSub *pEpInSub) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pEpInSub->consumerId); - tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp); + int32_t sz = taosArrayGetSize(pEpInSub->vgs); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, i); + tlen += tEncodeSMqVgEp(buf, pVgEp); + } + /*tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);*/ return tlen; } void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) { buf = taosDecodeFixedI64(buf, &pEpInSub->consumerId); - buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp)); + /*buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));*/ + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pEpInSub->vgs = taosArrayInit(sz, sizeof(void *)); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp)); + buf = tDecodeSMqVgEp(buf, pVgEp); + taosArrayPush(pEpInSub->vgs, &pVgEp); + } + return (void *)buf; } @@ -268,10 +283,12 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) { pSubNew->vgNum = -1; pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); // TODO set free fp - SMqConsumerEpInSub *pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub)); - pEpInSub->vgs = taosArrayInit(0, sizeof(SMqVgEp)); + SMqConsumerEpInSub epInSub = { + .consumerId = -1, + .vgs = taosArrayInit(0, sizeof(void *)), + }; int64_t unexistKey = -1; - taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), pEpInSub, sizeof(SMqConsumerEpInSub)); + taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub)); return pSubNew; } @@ -287,7 +304,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { void *pIter = NULL; SMqConsumerEpInSub *pEpInSub = NULL; while (1) { - pIter = taosHashIterate(pSubNew->consumerHash, pIter); + pIter = taosHashIterate(pSub->consumerHash, pIter); if (pIter == NULL) break; pEpInSub = (SMqConsumerEpInSub *)pIter; SMqConsumerEpInSub newEp = { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 5629421a84..b3583af1dc 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -434,7 +434,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib return -1; } - ASSERT(pSub->vgNum == 0); + ASSERT(pSub->vgNum == -1); + + pSub->vgNum = 0; int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); if (levelNum != 1) { @@ -455,6 +457,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int64_t unexistKey = -1; SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t)); + ASSERT(pEpInSub); + + ASSERT(taosHashGetSize(pSub->consumerHash) == 1); void* pIter = NULL; while (1) { @@ -492,10 +497,18 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib } taosArrayPush(pEpInSub->vgs, &pVgEp); + ASSERT(taosHashGetSize(pSub->consumerHash) == 1); + /*taosArrayPush(pSub->unassignedVg, &consumerEp);*/ } - taosHashRelease(pSub->consumerHash, pEpInSub); + ASSERT(pEpInSub->vgs->size > 0); + pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t)); + + ASSERT(pEpInSub->vgs->size > 0); + + ASSERT(taosHashGetSize(pSub->consumerHash) == 1); + qDestroyQueryPlan(pPlan); return 0; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 48c9804619..7b2ec83230 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -109,6 +109,7 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + ASSERT(taosHashGetSize(pSub->consumerHash) == 1); if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { tDeleteSubscribeObj(pSub); @@ -116,6 +117,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic, return NULL; } + ASSERT(taosHashGetSize(pSub->consumerHash) == 1); + return pSub; } @@ -255,7 +258,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const char *subK req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; req.vgId = pRebVg->pVgEp->vgId; - req.qmsg = req.qmsg; + req.qmsg = pRebVg->pVgEp->qmsg; strncpy(req.subKey, subKey, TSDB_SUBSCRIBE_KEY_LEN); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); @@ -663,14 +666,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR if (pInput->pTopic != NULL) { // create subscribe pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key); + ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 1); } else { pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub); } int32_t totalVgNum = pOutput->pSub->vgNum; + mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); + // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0); // 2. check and get actual removed consumers, put their vg into hash int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t actualRemoved = 0; @@ -692,18 +699,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); } - taosHashRelease(pOutput->pSub->consumerHash, pEpInSub); taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); // put into removed taosArrayPush(pOutput->removedConsumers, &consumerId); } } ASSERT(removedNum == actualRemoved); + ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0); // if previously no consumer, there are vgs not assigned { - int64_t key = -1; - SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &key, sizeof(int64_t)); + int64_t unexistKey = -1; + SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t)); ASSERT(pEpInSub); int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs); for (int32_t i = 0; i < consumerVgNum; i++) { @@ -715,18 +722,22 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR }; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); } - taosHashRelease(pOutput->pSub->consumerHash, pEpInSub); } // 3. calc vg number of each consumer - int32_t actualConsumerNum = taosHashGetSize(pInput->pOldSub->consumerHash) - 1 + - taosArrayGetSize(pInput->pRebInfo->newConsumers) - - taosArrayGetSize(pInput->pRebInfo->removedConsumers); - int32_t afterRebConsumerNum = taosHashGetSize(pOutput->pSub->consumerHash) - 1; - ASSERT(afterRebConsumerNum == actualConsumerNum); + int32_t oldSz = 0; + if (pInput->pOldSub) { + oldSz = taosHashGetSize(pInput->pOldSub->consumerHash) - 1; + } + int32_t afterRebConsumerNum = + oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers); + int32_t minVgCnt = 0; + int32_t imbConsumerNum = 0; // calc num - int32_t minVgCnt = totalVgNum / actualConsumerNum; - int32_t imbConsumerNum = totalVgNum % actualConsumerNum; + if (afterRebConsumerNum) { + minVgCnt = totalVgNum / afterRebConsumerNum; + imbConsumerNum = totalVgNum % afterRebConsumerNum; + } // 4. first scan: remove consumer more than wanted, put to remove hash int32_t imbCnt = 0; @@ -735,6 +746,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; + if (pEpInSub->consumerId == -1) continue; ASSERT(pEpInSub->consumerId > 0); int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs); // all old consumers still existing are touched @@ -783,6 +795,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEpInSub)); + /*SMqConsumerEpInSub *pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/ + /*ASSERT(pTestNew->consumerId == consumerId);*/ + /*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/ taosArrayPush(pOutput->newConsumers, &consumerId); } } @@ -796,6 +811,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter == NULL) break; SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; + if (pEpInSub->consumerId == -1) continue; ASSERT(pEpInSub->consumerId > 0); /*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/ if (imbCnt < imbConsumerNum) { @@ -833,8 +849,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ASSERT(pRemovedIter == NULL); } else { // if all consumer is removed, put all vg into unassigned - int64_t key = -1; - SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &key, sizeof(int64_t)); + int64_t unexistKey = -1; + SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t)); ASSERT(pEpInSub); ASSERT(pEpInSub->consumerId == -1); @@ -845,7 +861,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR if (pIter == NULL) break; pRebOutput = (SMqRebOutputVg *)pIter; ASSERT(pRebOutput->newConsumerId == -1); - taosArrayPush(pEpInSub->vgs, pRebOutput->pVgEp); + taosArrayPush(pEpInSub->vgs, &pRebOutput->pVgEp); + taosArrayPush(pOutput->rebVgs, pRebOutput); } } @@ -895,7 +912,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO // 3.2 set new consumer consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { - int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); + int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); ASSERT(consumerId > 0); SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); @@ -948,17 +965,22 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { while (1) { pIter = taosHashIterate(pReq->rebSubHash, pIter); if (pIter == NULL) break; - SMqRebInputObj rebInput = {0}; - SMqRebOutputObj rebOutput = {0}; + SMqRebInputObj rebInput = {0}; + + SMqRebOutputObj rebOutput = {0}; + rebOutput.newConsumers = taosArrayInit(0, sizeof(void *)); + rebOutput.removedConsumers = taosArrayInit(0, sizeof(void *)); + rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *)); + rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); + SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); if (pSub == NULL) { - taosRLockLatch(&pSub->lock); // split sub key and extract topic char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pSub->key, topic, cgroup); + mndSplitSubscribeKey(pRebSub->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); ASSERT(pTopic); taosRLockLatch(&pTopic->lock); @@ -968,24 +990,16 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { rebInput.pRebInfo = pRebSub; rebInput.pOldSub = pSub; - int32_t unassignedVgNum = 0; - int64_t key = -1; - SMqConsumerEpInSub *pEpInSub = taosHashGet(pSub->consumerHash, &key, sizeof(int64_t)); - if (pEpInSub != NULL) { - ASSERT(pEpInSub->consumerId == key); - unassignedVgNum = taosArrayGetSize(pEpInSub->vgs); - } - mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, unassignedVgNum); - // TODO replace assert with error check ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); + ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0); ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); if (rebInput.pTopic) { SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic; taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); - taosRUnLockLatch(&pSub->lock); + } else { mndReleaseSubscribe(pMnode, pSub); } } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 3ac1c522b2..5a61cefee1 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -22,12 +22,14 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndFunc.h" +#include "mndGrant.h" #include "mndInfoSchema.h" -#include "mndPerfSchema.h" #include "mndMnode.h" #include "mndOffset.h" +#include "mndPerfSchema.h" #include "mndProfile.h" #include "mndQnode.h" +#include "mndQuery.h" #include "mndShow.h" #include "mndSma.h" #include "mndSnode.h" @@ -40,8 +42,6 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" -#include "mndQuery.h" -#include "mndGrant.h" #define MQ_TIMER_MS 3000 #define TRNAS_TIMER_MS 6000 diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ed33473b16..5e074eeb12 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -16,6 +16,13 @@ #ifndef _TD_VNODE_TQ_H_ #define _TD_VNODE_TQ_H_ +#include "executor.h" +#include "os.h" +#include "thash.h" +#include "tmsg.h" +#include "ttimer.h" +#include "wal.h" + #ifdef __cplusplus extern "C" { #endif @@ -30,12 +37,6 @@ extern "C" { #define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -enum { - TQ_STREAM_TOKEN__DATA = 1, - TQ_STREAM_TOKEN__WATERMARK, - TQ_STREAM_TOKEN__CHECKPOINT, -}; - #define TQ_BUFFER_SIZE 4 #define TQ_BUCKET_MASK 0xFF @@ -151,22 +152,27 @@ typedef struct { } STqMetaStore; typedef struct { - SMemAllocatorFactory* pAllocatorFactory; - SMemAllocator* pAllocator; -} STqMemRef; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int64_t consumerId; + int32_t epoch; + char* qmsg; + // SRWLatch lock; + SWalReadHandle* pReadHandle; + // number should be identical to fetch thread num + qTaskInfo_t task[4]; +} STqExec; struct STQ { // the collection of groups // the handle of meta kvstore bool writeTrigger; char* path; - STqMemRef tqMemRef; STqMetaStore* tqMeta; - // STqPushMgr* tqPushMgr; - SHashObj* pStreamTasks; - SVnode* pVnode; - SWal* pWal; - SMeta* pVnodeMeta; + SHashObj* tqMetaNew; // subKey -> tqExec + SHashObj* pStreamTasks; + SVnode* pVnode; + SWal* pWal; + SMeta* pVnodeMeta; }; typedef struct { @@ -230,10 +236,6 @@ typedef struct { // TODO sync function } STqStreamPusher; -typedef struct { - int8_t type; // mq or stream -} STqPusher; - typedef struct { SHashObj* pHash; // } STqPushMgr; @@ -257,9 +259,10 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); int tqCommit(STQ*); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); -int32_t tqProcessSetConnReq(STQ* pTq, char* msg); -int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); +int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); +// int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +// int32_t tqProcessRebReq(STQ* pTq, char* msg); +// int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); @@ -314,4 +317,4 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet } #endif -#endif /*_TD_VNODE_TQ_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_TQ_H_*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0aa6023eaf..16c5a4752e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -29,31 +29,14 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe pTq->pVnode = pVnode; pTq->pWal = pWal; pTq->pVnodeMeta = pVnodeMeta; -#if 0 - pTq->tqMemRef.pAllocatorFactory = allocFac; - pTq->tqMemRef.pAllocator = allocFac->create(allocFac); - if (pTq->tqMemRef.pAllocator == NULL) { - // TODO: error code of buffer pool - } -#endif pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); if (pTq->tqMeta == NULL) { taosMemoryFree(pTq); -#if 0 - allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator); -#endif return NULL; } -#if 0 - pTq->tqPushMgr = tqPushMgrOpen(); - if (pTq->tqPushMgr == NULL) { - // free store - taosMemoryFree(pTq); - return NULL; - } -#endif + pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -248,16 +231,15 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu return 0; } -#if 0 + int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { - SMqPollReq* pReq = pMsg->pCont; - int64_t consumerId = pReq->consumerId; - int64_t fetchOffset; - int64_t blockingTime = pReq->blockingTime; - int32_t reqEpoch = pReq->epoch; + SMqPollReqV2* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int32_t reqEpoch = pReq->epoch; + int64_t fetchOffset; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { - fetchOffset = 0; + fetchOffset = walGetFirstVer(pTq->pWal); } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { fetchOffset = walGetLastVer(pTq->pWal); } else { @@ -267,65 +249,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); - SMqPollRsp rsp = { - /*.consumerId = consumerId,*/ - .numOfTopics = 0, - .pBlockData = NULL, - }; - - STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = -1; - tmsgSendRsp(pMsg); - return 0; - } + STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey)); + ASSERT(pExec); - int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch); + int32_t consumerEpoch = atomic_load_32(&pExec->epoch); while (consumerEpoch < reqEpoch) { - consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch); - } - - STqTopic* pTopic = NULL; - int32_t sz = taosArrayGetSize(pConsumer->topics); - for (int32_t i = 0; i < sz; i++) { - STqTopic* topic = taosArrayGet(pConsumer->topics, i); - // TODO race condition - ASSERT(pConsumer->consumerId == consumerId); - if (strcmp(topic->topicName, pReq->topic) == 0) { - pTopic = topic; - break; - } - } - if (pTopic == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - TD_VID(pTq->pVnode)); - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = -1; - tmsgSendRsp(pMsg); - return 0; + consumerEpoch = atomic_val_compare_exchange_32(&pExec->epoch, consumerEpoch, reqEpoch); } - vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - TD_VID(pTq->pVnode)); - + SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; - rsp.skipLogNum = 0; + rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); + rsp.blockData = taosArrayInit(0, sizeof(void*)); while (1) { - /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ - // TODO - consumerEpoch = atomic_load_32(&pConsumer->epoch); + consumerEpoch = atomic_load_32(&pExec->epoch); if (consumerEpoch > reqEpoch) { vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } + SWalReadHead* pHead; - if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) { + if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user @@ -333,101 +279,86 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { TD_VID(pTq->pVnode), fetchOffset); break; } + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); - /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ - /*pHead = pTopic->pReadhandle->pHead;*/ + if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - qTaskInfo_t task = pTopic->buffer.output[workerId].task; + qTaskInfo_t task = pExec->task[workerId]; ASSERT(task); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { SSDataBlock* pDataBlock = NULL; - uint64_t ts; + uint64_t ts = 0; if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(false); - } - if (pDataBlock == NULL) { - /*pos = fetchOffset % TQ_BUFFER_SIZE;*/ - break; + ASSERT(0); } + if (pDataBlock == NULL) break; - taosArrayPush(pRes, pDataBlock); - } + ASSERT(pDataBlock->info.rows != 0); + ASSERT(pDataBlock->info.numOfCols != 0); - if (taosArrayGetSize(pRes) == 0) { - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); - fetchOffset++; - rsp.skipLogNum++; - taosArrayDestroy(pRes); - continue; + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = ts; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(pDataBlock->info.rows); + + // TODO enable compress + int32_t actualLen = 0; + blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(rsp.blockDataLen, &actualLen); + taosArrayPush(rsp.blockData, &buf); + rsp.blockNum++; } - rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper; - rsp.rspOffset = fetchOffset; + } - rsp.numOfTopics = 1; - rsp.pBlockData = pRes; + // TODO batch optimization + if (rsp.blockNum != 0) break; + rsp.skipLogNum++; + fetchOffset++; + } - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - pMsg->code = -1; - taosMemoryFree(pHead); - return -1; - } - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pReq->epoch; - ((SMqRspHead*)buf)->consumerId = consumerId; + ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); + ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqPollRsp(&abuf, &rsp); - /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/ - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, - pHead->msgType, consumerId, pReq->epoch); - tmsgSendRsp(pMsg); - taosMemoryFree(pHead); - return 0; - } else { - taosMemoryFree(pHead); - fetchOffset++; - rsp.skipLogNum++; - } - } + if (rsp.blockNum != 0) + rsp.rspOffset = fetchOffset; + else + rsp.rspOffset = fetchOffset - 1; - /*if (blockingTime != 0) {*/ - /*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/ - /*} else {*/ - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; return -1; } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->epoch = pReq->epoch; - rsp.rspOffset = fetchOffset - 1; + ((SMqRspHead*)buf)->consumerId = consumerId; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqPollRsp(&abuf, &rsp); - rsp.pBlockData = NULL; + tEncodeSMqDataBlkRsp(&abuf, &rsp); pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, - pReq->epoch); - /*}*/ + vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", + TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); + + // TODO destroy return 0; } -#endif +#if 0 int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; @@ -436,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t reqEpoch = pReq->epoch; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { - fetchOffset = 0; + fetchOffset = walGetFirstVer(pTq->pWal); } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { fetchOffset = walGetLastVer(pTq->pWal); } else { @@ -635,7 +566,56 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } +#endif +// TODO: persist meta into tdb +int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { + SMqRebVgReq req; + tDecodeSMqRebVgReq(msg, &req); + // todo lock + STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey)); + if (pExec == NULL) { + ASSERT(req.oldConsumerId == -1); + ASSERT(req.newConsumerId != -1); + STqExec exec = {0}; + pExec = &exec; + /*taosInitRWLatch(&pExec->lock);*/ + + memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); + pExec->consumerId = req.newConsumerId; + pExec->epoch = -1; + pExec->qmsg = req.qmsg; + req.qmsg = NULL; + pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal); + for (int32_t i = 0; i < 4; i++) { + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = { + .reader = pReadHandle, + .meta = pTq->pVnodeMeta, + }; + pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); + ASSERT(pExec->task[i]); + } + taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); + return 0; + } else { + if (req.newConsumerId != -1) { + /*taosWLockLatch(&pExec->lock);*/ + ASSERT(pExec->consumerId == req.oldConsumerId); + // TODO handle qmsg and exec modification + atomic_store_64(&pExec->consumerId, req.newConsumerId); + atomic_add_fetch_32(&pExec->epoch, 1); + /*taosWUnLockLatch(&pExec->lock);*/ + return 0; + } else { + // TODO + /*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/ + return 0; + } + } +} + +#if 0 int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; terrno = TSDB_CODE_SUCCESS; @@ -754,6 +734,7 @@ int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; return 0; } +#endif int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { if (pTask->execType == TASK_EXEC__NONE) return 0; diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c index 357917e0ba..809fd7cb06 100644 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ b/source/dnode/vnode/src/tq/tqMetaStore.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ #include "vnodeInt.h" -// TODO:replace by an abstract file layer // #include // #include // #include diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fface9d6c5..1486d18b5c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -80,6 +80,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pRsp->msgType = TDMT_VND_SUBMIT_RSP; vnodeProcessSubmitReq(pVnode, ptr, pRsp); break; + case TDMT_VND_MQ_VG_CHANGE: + if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + // TODO: handle error + } + break; +#if 0 case TDMT_VND_MQ_SET_CONN: { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO: handle error @@ -93,6 +100,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; +#endif case TDMT_VND_TASK_DEPLOY: { if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { @@ -307,4 +315,4 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg pRsp->contLen = sizeof(SSubmitRsp); return 0; -} \ No newline at end of file +} diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index c2c4af18e5..3d3d13fcb6 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -38,7 +38,7 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */ #include #if !defined(_TD_DARWIN_64) - #include +#include #endif #include #include @@ -58,9 +58,9 @@ typedef int32_t FileFd; typedef struct TdFile { TdThreadRwlock rwlock; - int refId; - FileFd fd; - FILE *fp; + int refId; + FileFd fd; + FILE *fp; } * TdFilePtr, TdFile; #define FILE_WITH_LOCK 1 @@ -182,7 +182,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) { return 0; #else struct stat fileStat; - int32_t code = stat(path, &fileStat); + int32_t code = stat(path, &fileStat); if (code < 0) { return code; } @@ -203,7 +203,7 @@ int32_t taosDevInoFile(const char *path, int64_t *stDev, int64_t *stIno) { return 0; #else struct stat fileStat; - int32_t code = stat(path, &fileStat); + int32_t code = stat(path, &fileStat); if (code < 0) { return code; } @@ -226,7 +226,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) return NULL; #else - int fd = -1; + int fd = -1; FILE *fp = NULL; if (tdFileOptions & TD_FILE_STREAM) { char *mode = NULL; @@ -325,7 +325,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { #if FILE_WITH_LOCK taosThreadRwlockRdlock(&(pFile->rwlock)); #endif - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. int64_t leftbytes = count; int64_t readbytes; char *tbuf = (char *)buf; @@ -365,7 +365,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset) #if FILE_WITH_LOCK taosThreadRwlockRdlock(&(pFile->rwlock)); #endif - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. int64_t ret = pread(pFile->fd, buf, count, offset); #if FILE_WITH_LOCK taosThreadRwlockUnlock(&(pFile->rwlock)); @@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { #if FILE_WITH_LOCK taosThreadRwlockWrlock(&(pFile->rwlock)); #endif - assert(pFile->fd >= 0); // Please check if you have closed the file. + /*assert(pFile->fd >= 0); // Please check if you have closed the file.*/ int64_t nleft = count; int64_t nwritten = 0; @@ -414,7 +414,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { #if FILE_WITH_LOCK taosThreadRwlockRdlock(&(pFile->rwlock)); #endif - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. int64_t ret = lseek(pFile->fd, (long)offset, whence); #if FILE_WITH_LOCK taosThreadRwlockUnlock(&(pFile->rwlock)); @@ -429,10 +429,10 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { if (pFile == NULL) { return 0; } - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. struct stat fileStat; - int32_t code = fstat(pFile->fd, &fileStat); + int32_t code = fstat(pFile->fd, &fileStat); if (code < 0) { return code; } @@ -456,7 +456,7 @@ int32_t taosLockFile(TdFilePtr pFile) { if (pFile == NULL) { return 0; } - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. return (int32_t)flock(pFile->fd, LOCK_EX | LOCK_NB); #endif @@ -469,7 +469,7 @@ int32_t taosUnLockFile(TdFilePtr pFile) { if (pFile == NULL) { return 0; } - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. return (int32_t)flock(pFile->fd, LOCK_UN | LOCK_NB); #endif @@ -529,7 +529,7 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) { if (pFile == NULL) { return 0; } - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. return ftruncate(pFile->fd, l_size); #endif @@ -637,7 +637,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co } off_t len = count; while (len > 0) { - char buf[1024 * 16]; + char buf[1024 * 16]; off_t n = sizeof(buf); if (len < n) n = len; size_t m = fread(buf, 1, n, in_file); @@ -662,7 +662,7 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) { } off_t len = count; while (len > 0) { - char buf[1024 * 16]; + char buf[1024 * 16]; off_t n = sizeof(buf); if (len < n) n = len; size_t m = read(sfd, buf, n); @@ -750,7 +750,7 @@ void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length) { if (pFile == NULL) { return NULL; } - assert(pFile->fd >= 0); // Please check if you have closed the file. + assert(pFile->fd >= 0); // Please check if you have closed the file. void *ptr = mmap(NULL, length, PROT_READ, MAP_SHARED, pFile->fd, 0); return ptr; @@ -811,4 +811,4 @@ bool taosCheckAccessFile(const char *pathname, int32_t tdFileAccessOptions) { bool taosCheckExistFile(const char *pathname) { return taosCheckAccessFile(pathname, TD_FILE_ACCESS_EXIST_OK); }; -#endif // WINDOWS +#endif // WINDOWS diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 0181b63380..1c655fc2bf 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -76,7 +76,7 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) { } void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) { - if (pArray == NULL || pData == NULL) { + if (pData == NULL) { return NULL; } @@ -318,7 +318,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { pArray->size = 0; } - void* taosArrayDestroy(SArray* pArray) { if (pArray) { taosMemoryFree(pArray->pData); @@ -445,7 +444,8 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void } SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) { - SArray* pArray = taosArrayInit(pSrc->size, pSrc->elemSize); + ASSERT(pSrc->elemSize == sizeof(void*)); + SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*)); for (int32_t i = 0; i < pSrc->size; i++) { void* clone = deepCopy(taosArrayGetP(pSrc, i)); taosArrayPush(pArray, &clone); @@ -471,6 +471,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t for (int32_t i = 0; i < sz; i++) { void* data = taosMemoryCalloc(1, dataSz); buf = decode(buf, data); + taosArrayPush(*pArray, &data); } return (void*)buf; } diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index f7bd273624..60064b174e 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -33,8 +33,8 @@ endi sql connect $dbNamme = d0 -print =============== create database , vgroup 1 -sql create database $dbNamme vgroups 1 +print =============== create database , vgroup 4 +sql create database $dbNamme vgroups 4 sql use $dbNamme print =============== create super table diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 5ea15a6622..9b627e187c 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -276,7 +276,7 @@ tmq_t* build_consumer() { } tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + ASSERT(TMQ_CONF_OK == tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName)); tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); assert(tmq); tmq_conf_destroy(conf); @@ -437,12 +437,14 @@ void* threadFunc(void* param) { pInfo->consumeMsgCnt = parallel_consume(tmq, 1); //} +#if 0 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); pInfo->consumeMsgCnt = -1; return NULL; } +#endif return NULL; } @@ -485,11 +487,13 @@ int main(int32_t argc, char* argv[]) { totalMsgs = parallel_consume(tmq, 0); } +#if 0 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } +#endif if (g_stConfInfo.numOfTopic1) { for (int32_t i = 0; i < numOfThreads; i++) { -- GitLab