From 21e5ddbb8d708f2dc955586a1165ef4d78721a26 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 28 Feb 2022 20:46:23 +0800 Subject: [PATCH] refactor tmq msg handle --- include/client/taos.h | 6 +- include/common/tmsg.h | 20 +- source/client/src/tmq.c | 373 ++++++++++++++++------ source/dnode/mgmt/impl/src/dndTransport.c | 1 - source/dnode/mnode/impl/src/mndTopic.c | 85 +---- source/dnode/vnode/inc/tq.h | 2 +- source/dnode/vnode/inc/vnode.h | 1 - source/dnode/vnode/src/inc/tqInt.h | 17 +- source/dnode/vnode/src/tq/tq.c | 189 ++--------- source/dnode/vnode/src/vnd/vnodeWrite.c | 2 +- 10 files changed, 324 insertions(+), 372 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 2c8135c8ff..8b1517c6ff 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -224,10 +224,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); -#if 0 -DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); -DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics); -#endif +DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); +DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); #if 0 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ae3586e735..5d989421f6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1380,8 +1380,6 @@ typedef struct SMqCMGetSubEpReq { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCMGetSubEpReq; -#pragma pack(pop) - static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pMsg->contLen); @@ -1851,6 +1849,12 @@ typedef struct { SMqTbData* tbData; } SMqTopicData; +typedef struct { + int8_t mqMsgType; + int32_t code; + int32_t epoch; +} SMqRspHead; + typedef struct { int64_t consumerId; SSchemaWrapper* schemas; @@ -1867,6 +1871,7 @@ typedef struct { int64_t consumerId; int64_t blockingTime; + int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; int64_t currentOffset; @@ -1886,11 +1891,18 @@ typedef struct { typedef struct { int64_t consumerId; - int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; +struct tmq_message_t { + SMqRspHead head; + union { + SMqConsumeRsp consumeRsp; + SMqCMGetSubEpRsp getEpRsp; + }; +}; + static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { @@ -1972,6 +1984,8 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p return buf; } +#pragma pack(pop) + #ifdef __cplusplus } #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 9a1025c4bd..5b4afda923 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -26,6 +26,7 @@ #include "tmsgtype.h" #include "tnote.h" #include "tpagedbuf.h" +#include "tqueue.h" #include "tref.h" struct tmq_list_t { @@ -59,22 +60,34 @@ struct tmq_t { char groupId[256]; char clientId[256]; int8_t autoCommit; - SRWLatch lock; int64_t consumerId; int32_t epoch; int32_t resetOffsetCfg; int64_t status; - tsem_t rspSem; STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; SArray* clientTopics; // SArray + STaosQueue* mqueue; // queue of tmq_message_t + STaosQall* qall; + SRWLatch pollLock; // stat int64_t pollCnt; }; -struct tmq_message_t { - SMqConsumeRsp rsp; +enum { + TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__EP_RSP, +}; + +enum { + TMQ_VG_STATUS__IDLE = 0, + TMQ_VG_STATUS__WAIT, +}; + +enum { + TMQ_CONSUMER_STATUS__INIT = 0, + TMQ_CONSUMER_STATUS__READY, }; typedef struct { @@ -84,6 +97,7 @@ typedef struct { int64_t currentOffset; // connection info int32_t vgId; + int32_t vgStatus; SEpSet epSet; } SMqClientVg; @@ -105,15 +119,16 @@ typedef struct { typedef struct { tmq_t* tmq; - int32_t wait; + int32_t sync; + tsem_t rspSem; } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - tmq_message_t** retMsg; - tsem_t rspSem; -} SMqConsumeCbParam; + tmq_t* tmq; + SMqClientVg* pVg; + tmq_message_t* rspMsg; + tsem_t rspSem; +} SMqPollCbParam; typedef struct { tmq_t* tmq; @@ -210,6 +225,22 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } +tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { + if (*topics == NULL) { + *topics = tmq_list_new(); + } + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i); + tmq_list_append(*topics, strdup(topic->topicName)); + } + return TMQ_RESP_ERR__SUCCESS; +} + +tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { + tmq_list_t* lst = tmq_list_new(); + return tmq_subscribe(tmq, lst); +} + tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); if (pTmq == NULL) { @@ -219,7 +250,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - taosInitRWLatch(&pTmq->lock); + taosInitRWLatch(&pTmq->pollLock); // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -227,9 +258,11 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; - tsem_init(&pTmq->rspSem, 0, 0); pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); + + pTmq->mqueue = taosOpenQueue(); + pTmq->qall = taosAllocateQall(); return pTmq; } @@ -291,7 +324,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = pParam; @@ -366,10 +403,17 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tscError("failed to malloc request"); } - SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; + SMqSubscribeCbParam param = { + .rspErr = TMQ_RESP_ERR__SUCCESS, + .tmq = tmq, + }; tsem_init(¶m.rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = ¶m; @@ -392,36 +436,6 @@ _return: void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } -SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { - tmq_t* pTmq = (void*)param; - SArray* pArray = taosArrayInit(0, sizeof(SKv)); - if (pArray == NULL) { - return NULL; - } - SKv kv = {0}; - kv.key = HEARTBEAT_KEY_MQ_TMP; - - SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); - if (pMqHb == NULL) { - return pArray; - } - pMqHb->consumerId = connKey.connId; - SArray* clientTopics = pTmq->clientTopics; - int sz = taosArrayGetSize(clientTopics); - for (int i = 0; i < sz; i++) { - SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i); - /*if (pCTopic->vgId == -1) {*/ - /*pMqHb->status = 1;*/ - /*break;*/ - /*}*/ - } - kv.value = pMqHb; - kv.valueLen = sizeof(SMqHbMsg); - taosArrayPush(pArray, &kv); - - return pArray; -} - TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; @@ -579,7 +593,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) { static bool noPrintSchema; char pBuf[128]; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; int32_t colNum = pRsp->schemas->nCols; if (!noPrintSchema) { printf("|"); @@ -619,17 +633,16 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; - SMqClientVg* pVg = pParam->pVg; + SMqPollCbParam* pParam = (SMqPollCbParam*)param; + SMqClientVg* pVg = pParam->pVg; if (code != 0) { printf("msg discard\n"); - tsem_post(&pParam->rspSem); return 0; } SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp)); if (pRsp == NULL) { - tsem_post(&pParam->rspSem); + taosWUnLockLatch(&pParam->tmq->pollLock); return -1; } tDecodeSMqConsumeRsp(pMsg->pData, pRsp); @@ -637,76 +650,80 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (pRsp->numOfTopics == 0) { /*printf("no data\n");*/ free(pRsp); - tsem_post(&pParam->rspSem); + taosWUnLockLatch(&pParam->tmq->pollLock); return 0; } - *pParam->retMsg = (tmq_message_t*)pRsp; + pParam->rspMsg = (tmq_message_t*)pRsp; pVg->currentOffset = pRsp->rspOffset; /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ /*printf("-----msg begin----\n");*/ - tsem_post(&pParam->rspSem); + taosWUnLockLatch(&pParam->tmq->pollLock); /*printf("\n-----msg end------\n");*/ return 0; } +bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { + bool set = false; + int32_t sz = taosArrayGetSize(pRsp->topics); + if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + topic.topicName = strdup(pTopicEp->topic); + int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgSz; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + SMqClientVg clientVg = { + .pollCnt = 0, + .currentOffset = pVgEp->offset, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet, + .vgStatus = TMQ_VG_STATUS__IDLE, + }; + taosArrayPush(topic.vgs, &clientVg); + set = true; + } + taosArrayPush(tmq->clientTopics, &topic); + } + atomic_store_32(&tmq->epoch, epoch); + return set; +} + int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; if (code != 0) { - printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait); - if (pParam->wait) { - tsem_post(&tmq->rspSem); + printf("get topic endpoint error, not ready, wait:%d\n", pParam->sync); + if (pParam->sync) { + tsem_post(&pParam->rspSem); } return 0; } tscDebug("tmq ask ep cb called"); - bool set = false; - SMqCMGetSubEpRsp rsp; - tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); - int32_t sz = taosArrayGetSize(rsp.topics); - // TODO: lock - /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ - /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - if (rsp.epoch != tmq->epoch) { - // TODO - if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); - tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); - for (int32_t i = 0; i < sz; i++) { - SMqClientTopic topic = {0}; - SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i); - topic.topicName = strdup(pTopicEp->topic); - int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); - topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); - for (int32_t j = 0; j < vgSz; j++) { - SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - // clang-format off - SMqClientVg clientVg = { - .pollCnt = 0, - .currentOffset = pVgEp->offset, - .vgId = pVgEp->vgId, - .epSet = pVgEp->epSet - }; - // clang-format on - taosArrayPush(topic.vgs, &clientVg); - set = true; - } - taosArrayPush(tmq->clientTopics, &topic); + if (pParam->sync) { + SMqRspHead* head = pMsg->pData; + SMqCMGetSubEpRsp rsp; + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); + /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ + /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ + int32_t epoch = atomic_load_32(&tmq->epoch); + if (head->epoch > epoch && tmqUpdateEp(tmq, head->epoch, &rsp)) { + atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); } - tmq->epoch = rsp.epoch; - } - if (set) { - atomic_store_64(&tmq->status, 1); - } - // unlock - /*tsem_post(&tmq->rspSem);*/ - if (pParam->wait) { - tsem_post(&tmq->rspSem); + tsem_post(&pParam->rspSem); + tDeleteSMqCMGetSubEpRsp(&rsp); + } else { + tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp); + taosWriteQitem(tmq->mqueue, pRsp); } - tDeleteSMqCMGetSubEpRsp(&rsp); return 0; } -int32_t tmqAskEp(tmq_t* tmq, bool wait) { +int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -723,7 +740,11 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { goto END; } - pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { @@ -731,7 +752,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { goto END; } pParam->tmq = tmq; - pParam->wait = wait; + pParam->sync = sync; + tsem_init(&pParam->rspSem, 0, 0); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; @@ -744,7 +766,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool wait) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: - if (wait) tsem_wait(&tmq->rspSem); + if (sync) tsem_wait(&pParam->rspSem); return 0; } @@ -792,6 +814,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie pReq->blockingTime = blocking_time; pReq->consumerId = tmq->consumerId; + pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; pReq->head.vgId = htonl(pVg->vgId); @@ -799,11 +822,146 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, SMqClie return pReq; } +void tmqClearUnhandleMsg(tmq_t* tmq) { + tmq_message_t* msg; + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } + + taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + taosGetQitem(tmq->qall, (void**)&msg); + if (msg) + taosFreeQitem(msg); + else + break; + } +} + +int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); + if (vgStatus != TMQ_VG_STATUS__IDLE) { + continue; + } + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + if (pReq == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return -1; + } + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); + if (param == NULL) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // TODO: out of mem + return -1; + } + param->tmq = tmq; + param->pVg = pVg; + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = param; + sendInfo->fp = tmqPollCb; + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + pVg->pollCnt++; + tmq->pollCnt++; + } + } + return 0; +} + +void tmqFetchLeftRes(tmq_t* tmq, tmq_message_t** pRspMsg) { + taosGetQitem(tmq->qall, (void**)pRspMsg); + if (pRspMsg == NULL) { + taosReadAllQitems(tmq->mqueue, tmq->qall); + taosGetQitem(tmq->qall, (void**)pRspMsg); + } +} + +// return +int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { + if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { + tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); + tmqClearUnhandleMsg(tmq); + *pReset = true; + } else { + *pReset = false; + } + } else { + *pReset = false; + return -1; + } + return 0; +} + tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* tmq_message = NULL; + tmq_message_t* rspMsg = NULL; + int64_t startTime = taosGetTimestampMs(); + // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); - tmqAskEp(tmq, status == 0); + tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); + + tmqFetchLeftRes(tmq, &rspMsg); + + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) { + taosReadAllQitems(tmq->mqueue, tmq->qall); + } + + while (1) { + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) break; + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + return rspMsg; + } + bool reset = false; + tmqHandleRes(tmq, rspMsg, &reset); + taosFreeQitem(rspMsg); + } + + tmqPollImpl(tmq, blocking_time); + + while (1) { + taosReadAllQitems(tmq->mqueue, tmq->qall); + while (1) { + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) break; + + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + return rspMsg; + } else { + bool reset = false; + tmqHandleRes(tmq, rspMsg, &reset); + taosFreeQitem(rspMsg); + if (reset) tmqPollImpl(tmq, blocking_time); + } + } + int64_t endTime = taosGetTimestampMs(); + if (endTime - startTime > blocking_time) { + return NULL; + } + } +} + +#if 0 if (blocking_time <= 0) blocking_time = 1; if (blocking_time > 1000) blocking_time = 1000; @@ -835,7 +993,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } - SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); + SMqPollCbParam* param = malloc(sizeof(SMqPollCbParam)); if (param == NULL) { ASSERT(false); usleep(blocking_time * 1000); @@ -847,7 +1005,11 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tsem_init(¶m->rspSem, 0, 0); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL}; + pRequest->body.requestMsg = (SDataBuf){ + .pData = pReq, + .len = sizeof(SMqConsumeReq), + .handle = NULL, + }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; @@ -887,6 +1049,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*return pRequest;*/ } +#endif #if 0 tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 0aae145d2f..b68bed8789 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -114,7 +114,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg; - /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index d2318009d5..9faabc3874 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -72,7 +72,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { if (pRaw == NULL) goto TOPIC_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER); SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); @@ -121,7 +121,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { int32_t len; int32_t dataPos = 0; - SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER); SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER); @@ -206,7 +206,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { SName name = {0}; tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - char db[TSDB_TABLE_FNAME_LEN] = {0}; + char db[TSDB_TOPIC_FNAME_LEN] = {0}; tNameGetFullDbName(&name, db); return mndAcquireDb(pMnode, db); @@ -223,7 +223,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq pDrop->head.contLen = htonl(contLen); pDrop->head.vgId = htonl(pVgroup->vgId); - memcpy(pDrop->name, pTopic->name, TSDB_TABLE_FNAME_LEN); + memcpy(pDrop->name, pTopic->name, TSDB_TOPIC_FNAME_LEN); pDrop->tuid = htobe64(pTopic->uid); return pDrop; @@ -343,6 +343,7 @@ CREATE_TOPIC_OVER: } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { + // TODO: cannot drop when subscribed STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); @@ -408,76 +409,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - STableInfoReq infoReq = {0}; - - if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - mDebug("topic:%s, start to retrieve meta", infoReq.tbName); - #if 0 - SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("topic:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - STopicObj *pTopic = mndAcquireTopic(pMnode, pInfo->tableFname); - if (pTopic == NULL) { - mndReleaseDb(pMnode, pDb); - terrno = TSDB_CODE_MND_INVALID_TOPIC; - mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - taosRLockLatch(&pTopic->lock); - int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags; - int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - - STableMetaRsp *pMeta = rpcMallocCont(contLen); - if (pMeta == NULL) { - taosRUnLockLatch(&pTopic->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseTopic(pMnode, pTopic); - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("topic:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); - return -1; - } - - memcpy(pMeta->topicFname, pTopic->name, TSDB_TABLE_FNAME_LEN); - pMeta->numOfTags = htonl(pTopic->numOfTags); - pMeta->numOfColumns = htonl(pTopic->numOfColumns); - pMeta->precision = pDb->cfg.precision; - pMeta->tableType = TSDB_SUPER_TABLE; - pMeta->update = pDb->cfg.update; - pMeta->sversion = htonl(pTopic->version); - pMeta->tuid = htonl(pTopic->uid); - - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pMeta->pSchemas[i]; - SSchema *pSrcSchema = &pTopic->pSchema[i]; - memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); - pSchema->type = pSrcSchema->type; - pSchema->colId = htonl(pSrcSchema->colId); - pSchema->bytes = htonl(pSrcSchema->bytes); - } - taosRUnLockLatch(&pTopic->lock); - mndReleaseDb(pMnode, pDb); - mndReleaseTopic(pMnode, pTopic); - - pReq->pCont = pMeta; - pReq->contLen = contLen; - - mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags); -#endif - return 0; -} - static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); @@ -504,6 +436,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo mndReleaseDb(pMnode, pDb); return 0; } +#endif static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; @@ -571,7 +504,7 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in if (pTopic->dbUid != pDb->uid) { if (strncmp(pTopic->name, prefix, prefixLen) != 0) { - mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); + mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); } sdbRelease(pSdb, pTopic); @@ -580,8 +513,8 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in cols = 0; - char topicName[TSDB_TABLE_NAME_LEN] = {0}; - tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TABLE_NAME_LEN); + char topicName[TSDB_TOPIC_NAME_LEN] = {0}; + tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_TO_VARSTR(pWrite, topicName); cols++; diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index a516f423bb..d8c9d11ce9 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -52,7 +52,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl void tqClose(STQ*); // required by vnode -int tqPushMsg(STQ*, void* msg, int64_t version); +int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); int tqCommit(STQ*); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3a06674e3c..7c8f97bb8b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -221,7 +221,6 @@ static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const S for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); - // pHandle->tbUid = tbUid; } return 0; } diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 344ad992f0..a801b6f7ae 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -79,19 +79,19 @@ extern int32_t tqDebugFlag; // 4096 - 4080 #define TQ_IDX_PAGE_HEAD_SIZE 16 -#define TQ_ACTION_CONST 0 -#define TQ_ACTION_INUSE 1 +#define TQ_ACTION_CONST 0 +#define TQ_ACTION_INUSE 1 #define TQ_ACTION_INUSE_CONT 2 -#define TQ_ACTION_INTXN 3 +#define TQ_ACTION_INTXN 3 #define TQ_SVER 0 // TODO: inplace mode is not implemented #define TQ_UPDATE_INPLACE 0 -#define TQ_UPDATE_APPEND 1 +#define TQ_UPDATE_APPEND 1 #define TQ_DUP_INTXN_REWRITE 0 -#define TQ_DUP_INTXN_REJECT 2 +#define TQ_DUP_INTXN_REJECT 2 static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } @@ -160,7 +160,7 @@ struct STQ { STqMemRef tqMemRef; STqMetaStore* tqMeta; SWal* pWal; - SMeta* pMeta; + SMeta* pVnodeMeta; }; typedef struct { @@ -190,9 +190,6 @@ typedef struct { char* logicalPlan; char* physicalPlan; char* qmsg; - int64_t persistedOffset; - int64_t committedOffset; - int64_t currentOffset; STqBuffer buffer; SWalReadHandle* pReadhandle; } STqTopic; @@ -201,7 +198,7 @@ typedef struct { int64_t consumerId; int64_t epoch; char cgroup[TSDB_TOPIC_FNAME_LEN]; - SArray* topics; // SArray + SArray* topics; // SArray } STqConsumer; int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ac9dde3597..7c8c96fb54 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -42,7 +42,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl pTq->path = strdup(path); pTq->tqConfig = tqConfig; pTq->pWal = pWal; - pTq->pMeta = pMeta; + pTq->pVnodeMeta = pMeta; #if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); @@ -71,9 +71,11 @@ void tqClose(STQ* pTq) { // TODO } -int tqPushMsg(STQ* pTq, void* p, int64_t version) { - // add reference - // judge and launch new query +int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + // TODO: add reference + // if handle waiting, launch query and response to consumer + // + // if no waiting handle, return return 0; } @@ -101,9 +103,9 @@ static FORCE_INLINE int32_t tEncodeSTqTopic(void** buf, const STqTopic* pTopic) /*tlen += taosEncodeString(buf, pTopic->logicalPlan);*/ /*tlen += taosEncodeString(buf, pTopic->physicalPlan);*/ tlen += taosEncodeString(buf, pTopic->qmsg); - tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset); - tlen += taosEncodeFixedI64(buf, pTopic->committedOffset); - tlen += taosEncodeFixedI64(buf, pTopic->currentOffset); + /*tlen += taosEncodeFixedI64(buf, pTopic->persistedOffset);*/ + /*tlen += taosEncodeFixedI64(buf, pTopic->committedOffset);*/ + /*tlen += taosEncodeFixedI64(buf, pTopic->currentOffset);*/ return tlen; } @@ -113,9 +115,9 @@ static FORCE_INLINE const void* tDecodeSTqTopic(const void* buf, STqTopic* pTopi /*buf = taosDecodeString(buf, &pTopic->logicalPlan);*/ /*buf = taosDecodeString(buf, &pTopic->physicalPlan);*/ buf = taosDecodeString(buf, &pTopic->qmsg); - buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset); - buf = taosDecodeFixedI64(buf, &pTopic->committedOffset); - buf = taosDecodeFixedI64(buf, &pTopic->currentOffset); + /*buf = taosDecodeFixedI64(buf, &pTopic->persistedOffset);*/ + /*buf = taosDecodeFixedI64(buf, &pTopic->committedOffset);*/ + /*buf = taosDecodeFixedI64(buf, &pTopic->currentOffset);*/ return buf; } @@ -194,8 +196,8 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } for (int j = 0; j < TQ_BUFFER_SIZE; j++) { pTopic->buffer.output[j].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); } @@ -243,7 +245,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and - // rsponse to user + // response to user break; } pHead = pTopic->pReadhandle->pHead; @@ -268,7 +270,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { taosArrayPush(pRes, pDataBlock); rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.rspOffset = fetchOffset; - pTopic->currentOffset = fetchOffset; rsp.numOfTopics = 1; rsp.pBlockData = pRes; @@ -312,158 +313,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } -#if 0 -int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) { - SMqConsumeReq* pReq = pMsg->pCont; - int64_t reqId = pReq->reqId; - int64_t consumerId = pReq->consumerId; - int64_t fetchOffset = pReq->offset; - int64_t blockingTime = pReq->blockingTime; - - SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; - - /*printf("vg %d get consume req\n", pReq->head.vgId);*/ - - STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - if (pConsumer == NULL) { - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = -1; - rpcSendResponse(pMsg); - return 0; - } - int sz = taosArrayGetSize(pConsumer->topics); - - for (int i = 0; i < sz; i++) { - STqTopic* pTopic = taosArrayGet(pConsumer->topics, i); - // TODO: support multiple topic in one req - if (strcmp(pTopic->topicName, pReq->topic) != 0) { - ASSERT(false); - continue; - } - - if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { - pTopic->committedOffset = pReq->offset; - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; - } - - if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { - pTopic->committedOffset = pReq->offset - 1; - } - - rsp.committedOffset = pTopic->committedOffset; - rsp.reqOffset = pReq->offset; - rsp.skipLogNum = 0; - - if (fetchOffset <= pTopic->committedOffset) { - fetchOffset = pTopic->committedOffset + 1; - } - /*printf("vg %d fetch Offset %ld\n", pReq->head.vgId, fetchOffset);*/ - int8_t pos; - int8_t skip = 0; - SWalHead* pHead; - while (1) { - pos = fetchOffset % TQ_BUFFER_SIZE; - skip = atomic_val_compare_exchange_8(&pTopic->buffer.output[pos].status, 0, 1); - if (skip == 1) { - // do nothing - break; - } - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - printf("read offset %ld\n", fetchOffset); - // check err - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - skip = 1; - break; - } - // read until find TDMT_VND_SUBMIT - pHead = pTopic->pReadhandle->pHead; - if (pHead->head.msgType == TDMT_VND_SUBMIT) { - } - rsp.skipLogNum++; - if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { - printf("read offset %ld\n", fetchOffset); - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - skip = 1; - break; - } - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - fetchOffset++; - } - if (skip == 1) continue; - SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; - qTaskInfo_t task = pTopic->buffer.output[pos].task; - - printf("current fetch offset %ld\n", fetchOffset); - qSetStreamInput(task, pCont); - - // SArray - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - while (1) { - SSDataBlock* pDataBlock; - uint64_t ts; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - break; - } - if (pDataBlock != NULL) { - taosArrayPush(pRes, pDataBlock); - } else { - break; - } - } - // TODO copy - rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; - rsp.rspOffset = fetchOffset; - pTopic->currentOffset = fetchOffset; - - atomic_store_8(&pTopic->buffer.output[pos].status, 0); - - if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - fetchOffset++; - continue; - } else { - rsp.numOfTopics++; - } - - rsp.pBlockData = pRes; - -#if 0 - pTopic->buffer.output[pos].dst = pRes; - if (pTopic->buffer.firstOffset == -1 || pReq->offset < pTopic->buffer.firstOffset) { - pTopic->buffer.firstOffset = pReq->offset; - } - if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { - pTopic->buffer.lastOffset = pReq->offset; - } -#endif - } - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - pMsg->code = -1; - return -1; - } - void* abuf = buf; - tEncodeSMqConsumeRsp(&abuf, &rsp); - - if (rsp.pBlockData) { - taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); - rsp.pBlockData = NULL; - } - - pMsg->pCont = buf; - pMsg->contLen = tlen; - pMsg->code = 0; - rpcSendResponse(pMsg); - return 0; -} -#endif - int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; tDecodeSMqMVRebReq(msg, &req); @@ -505,8 +354,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->logicalPlan = req.logicalPlan; pTopic->physicalPlan = req.physicalPlan; pTopic->qmsg = req.qmsg; - pTopic->committedOffset = -1; - pTopic->currentOffset = -1; + /*pTopic->committedOffset = -1;*/ + /*pTopic->currentOffset = -1;*/ pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; @@ -516,8 +365,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { } for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pMeta}; + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); } diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index c3947da459..81eb09f48f 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -59,7 +59,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // todo: change the interface here int64_t ver; taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver); - if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { + if (tqPushMsg(pVnode->pTq, ptr, pMsg->msgType, ver) < 0) { // TODO: handle error } -- GitLab