diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 57458ff8f71b510f7ad1c82c20af7ef6dc66858f..7267c72ec28834cda4944cb89bac2720f66d6817 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -534,7 +534,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL; if (index) { if (colField[*index].type != kv->type) { - uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key); + uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key); return TSDB_CODE_SML_INVALID_DATA; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9292be83e9a05caf41e0e12f8e05bc63cd8350f3..b5ae9116ef89798cdf3fde8ceb381b4b529d8769 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1377,7 +1377,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); for (int32_t j = 0; j < vgNumGet; j++) { @@ -1447,14 +1447,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur); + tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[80]; tFormatOffset(buf, 80, &pVgCur->currentOffset); - tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, + tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId, vgKey, buf); SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; @@ -1790,8 +1790,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - // update the local offset value only for the returned values. - pVg->currentOffset = pDataRsp->rspOffset; + if(pDataRsp->rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values. + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); char buf[80]; @@ -1801,12 +1802,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { " total:%" PRId64 " reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); + pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosFreeQitem(pollRspWrapper); } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; - + pVg->emptyBlockReceiveTs = 0; tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, @@ -1828,7 +1830,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1846,7 +1850,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 155ef7a62a6c6c7c058a59526ff345d01e7fbf41..8b14b7fbe724406c800d5e5d7699ffaf6ca1e3f8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5329,9 +5329,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); - SMsgHead *pHead = buf; - pHead->vgId = pReq->head.vgId; - pHead->contLen = pReq->head.contLen; +// SMsgHead *pHead = buf; +// pHead->vgId = pReq->head.vgId; +// pHead->contLen = pReq->head.contLen; SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); @@ -6839,10 +6839,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } @@ -6854,10 +6852,8 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d61eb3ec0390af04e97c2977131d8381c26b351a..fc724f2b45dc43c1a903833884d44f9f7d86e7ac 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -519,7 +519,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 65a2fa72a2f5ed53b6435b62caa757c0fb086ad5..ca71e17d7e07a234b20e0a98b82665f718402d0e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -449,7 +449,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); -#if 1 if (status == MQ_CONSUMER_STATUS__LOST_REBD) { mInfo("try to recover consumer:0x%" PRIx64, consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -463,7 +462,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } -#endif if (status != MQ_CONSUMER_STATUS__READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index acc0d29382a17a0fef955bdec489b50d4a7a2233..30b2fb74ca3251184b3d116db81046a3a9d81919 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -100,6 +100,7 @@ typedef struct { SWalRef* pRef; STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec + SRpcMsg* msg; } STqHandle; typedef struct { @@ -113,7 +114,7 @@ struct STQ { char* path; int64_t walLogLastVer; SRWLatch lock; - SHashObj* pPushMgr; // consumerId -> STqPushEntry + SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; @@ -146,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 416bc6cdc73a794b77eac85c724f6f21aec83a47..7668d45108b14a1d3e537b52ca8b52912484329a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -193,9 +193,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); -int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg); +int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); @@ -214,6 +213,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 36c35ab4157c50a1da244d6742b75b6946dbf904..14c917836566ae958165052f2f1f61ca3398ed7e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -71,18 +71,11 @@ static void destroyTqHandle(void* data) { walCloseReader(pData->pWalReader); tqCloseReader(pData->execHandle.pTqReader); } -} - -static void tqPushEntryFree(void* data) { - STqPushEntry* p = *(void**)data; - if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - tDeleteSMqDataRsp(p->pDataRsp); - } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) { - tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp); + if(pData->msg != NULL) { + rpcFreeCont(pData->msg->pCont); + taosMemoryFree(pData->msg); + pData->msg = NULL; } - - taosMemoryFree(p->pDataRsp); - taosMemoryFree(p); } static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { @@ -105,8 +98,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); taosInitRWLatch(&pTq->lock); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); + pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); @@ -228,17 +220,19 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData return 0; } -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; - doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) { + SMqDataRsp dataRsp = {0}; + dataRsp.head.consumerId = pHandle->consumerId; + dataRsp.head.epoch = pHandle->epoch; + dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP); char buf1[80] = {0}; char buf2[80] = {0}; - tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); - tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); + tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); + tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -382,13 +376,13 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); - - taosWLockLatch(&pTq->lock); - int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); - if (code != 0) { - tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); - } - taosWUnLockLatch(&pTq->lock); + int32_t code = 0; +// taosWLockLatch(&pTq->lock); +// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); +// if (code != 0) { +// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); +// } +// taosWUnLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { @@ -557,13 +551,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosWLockLatch(&pTq->lock); - atomic_store_32(&pHandle->epoch, -1); + atomic_store_32(&pHandle->epoch, 0); // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + tqUnregisterPushHandle(pTq, pHandle); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_add_fetch_32(&pHandle->epoch, 1); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pTaskInfo); @@ -1069,6 +1062,34 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + + taosWLockLatch(&pTq->lock); + if(taosHashGetSize(pTq->pPushMgr) > 0){ + void *pIter = taosHashIterate(pTq->pPushMgr, NULL); + while(pIter){ + STqHandle* pHandle = *(STqHandle**)pIter; + tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId); + if(ASSERT(pHandle->msg != NULL)){ + tqError("pHandle->msg should not be null"); + break; + }else{ + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + pIter = taosHashIterate(pTq->pPushMgr, pIter); + } + taosHashClear(pTq->pPushMgr); + } + // unlock + taosWUnLockLatch(&pTq->lock); + + return 0; +} + int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { #if 0 void* pIter = NULL; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 7a1a6b74548e285b82663ce2c5b473b825518531..68240195d786847a84983a0740af334c30b32b62 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -206,121 +206,60 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ } #endif -typedef struct { - void* pKey; - int64_t keyLen; -} SItem; - -static void recordPushedEntry(SArray* cachedKey, void* pIter); -static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq); - -static void freeItem(void* param) { - SItem* p = (SItem*)param; - taosMemoryFree(p->pKey); -} - -static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, - int32_t dataLen, SArray* pCachedKey) { - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - if (pRsp->reqOffset.version >= ver) { - tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, - pRsp->reqOffset.version, ver); - return; - } - - qTaskInfo_t pTaskInfo = pExec->task; - - // prepare scan mem data - SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver}; - - if (qStreamSetScanMemData(pTaskInfo, submit) != 0) { - return; - } - qStreamSetOpen(pTaskInfo); - // here start to scan submit block to extract the subscribed data - int32_t totalRows = 0; - - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) { - tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); - } - - if (pDataBlock == NULL) { - break; - } - - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - pRsp->blockNum++; - totalRows += pDataBlock->info.rows; - } - - tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum, - totalRows); - - if (pRsp->blockNum > 0) { - tqOffsetResetToLog(&pRsp->rspOffset, ver); - tqPushDataRsp(pTq, pPushEntry); - recordPushedEntry(pCachedKey, pIter); - } -} - int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); - int32_t len = msgLen - sizeof(SSubmitReq2Msg); - int32_t vgId = TD_VID(pTq->pVnode); +// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); +// int32_t len = msgLen - sizeof(SSubmitReq2Msg); +// int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { + tqProcessSubmitReqForSubscribe(pTq); // lock push mgr to avoid potential msg lost - taosWLockLatch(&pTq->lock); - - int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); - if (numOfRegisteredPush > 0) { - tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", - vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); - taosWUnLockLatch(&pTq->lock); - return -1; - } - - memcpy(data, pReq, len); - - SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); - void* pIter = NULL; - - while (1) { - pIter = taosHashIterate(pTq->pPushMgr, pIter); - if (pIter == NULL) { - break; - } - - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); - if (pHandle == NULL) { - tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, - pPushEntry->subKey); - continue; - } - - STqExecHandle* pExec = &pHandle->execHandle; - doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); - } - - doRemovePushedEntry(cachedKey, pTq); - taosArrayDestroyEx(cachedKey, freeItem); - taosMemoryFree(data); - } - - // unlock - taosWUnLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); +// +// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); +// if (numOfRegisteredPush > 0) { +// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", +// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); +// +// void* data = taosMemoryMalloc(len); +// if (data == NULL) { +// terrno = TSDB_CODE_OUT_OF_MEMORY; +// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); +// taosWUnLockLatch(&pTq->lock); +// return -1; +// } +// +// memcpy(data, pReq, len); +// +// SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); +// void* pIter = NULL; +// +// while (1) { +// pIter = taosHashIterate(pTq->pPushMgr, pIter); +// if (pIter == NULL) { +// break; +// } +// +// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; +// +// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); +// if (pHandle == NULL) { +// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, +// pPushEntry->subKey); +// continue; +// } +// +// STqExecHandle* pExec = &pHandle->execHandle; +// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); +// } +// +// doRemovePushedEntry(cachedKey, pTq); +// taosArrayDestroyEx(cachedKey, freeItem); +// taosMemoryFree(data); +// } +// +// // unlock +// taosWUnLockLatch(&pTq->lock); } tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); @@ -362,83 +301,39 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } -int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type) { - uint64_t consumerId = pRequest->consumerId; - int32_t vgId = TD_VID(pTq->pVnode); - STqHandle* pTqHandle = pHandle; - - STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); - if (pPushEntry == NULL) { - tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId, - (int32_t)sizeof(STqPushEntry)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pPushEntry->info = pRpcMsg->info; - memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); - if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp)); - } else if (type == TMQ_MSG_TYPE__POLL_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp)); +int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) { + int32_t vgId = TD_VID(pTq->pVnode); + STqHandle* pHandle = (STqHandle*) handle; + if(pHandle->msg == NULL){ + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); + }else{ + void *tmp = pHandle->msg->pCont; + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = tmp; } - SMqRspHead* pHead = &pPushEntry->pDataRsp->head; - pHead->consumerId = consumerId; - pHead->epoch = pRequest->epoch; - pHead->mqMsgType = type; - - taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*)); - - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", - consumerId, pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr)); + memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + pHandle->msg->contLen = pMsg->contLen; + int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } -int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { - int32_t vgId = TD_VID(pTq->pVnode); - STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); - - if (pEntry != NULL) { - uint64_t cId = (*pEntry)->pDataRsp->head.consumerId; - ASSERT(consumerId == cId); - - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId, - (*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1); +int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { + STqHandle *pHandle = (STqHandle*)handle; + int32_t vgId = TD_VID(pTq->pVnode); - if (rspConsumer) { // rsp the old consumer with empty block. - tqPushDataRsp(pTq, *pEntry); - } + int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); + tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + if(pHandle->msg != NULL) { + tqPushDataRsp(pTq, pHandle); - taosHashRemove(pTq->pPushMgr, pKey, keyLen); + rpcFreeCont(pHandle->msg->pCont); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; } - return 0; } - -void recordPushedEntry(SArray* cachedKey, void* pIter) { - size_t kLen = 0; - void* key = taosHashGetKey(pIter, &kLen); - SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen}; - taosArrayPush(cachedKey, &item); -} - -void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - int32_t numOfKeys = (int32_t)taosArrayGetSize(pCachedKeys); - - for (int32_t i = 0; i < numOfKeys; i++) { - SItem* pItem = taosArrayGet(pCachedKeys, i); - if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) { - tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*)pItem->pKey); - } - } - - if (numOfKeys > 0) { - tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr)); - } -} diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 017b479c1b8ca55aaace9c99bbf5420554b88370..d186c6387166b5fe7d31396e3cfab79f356a94ef 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -169,20 +169,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - // lock - taosWLockLatch(&pTq->lock); - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { goto end; } - // till now, all data has been transferred to consumer, new data needs to push client once arrived. +// till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + // lock + taosWLockLatch(&pTq->lock); + code = tqRegisterPushEntry(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); return code; } @@ -197,7 +197,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tFormatOffset(buf, 80, &dataRsp.rspOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - taosWUnLockLatch(&pTq->lock); +// taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } return code; @@ -211,6 +211,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); + qTaskInfo_t task = pHandle->execHandle.task; + if(qTaskIsExecuting(task)){ + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2e6d452e95335bd6e3cefb230fe22c03947ce153..251efeab3d1cf0f3ebe08ca90bc3d3165cb13050 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -448,7 +448,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ +// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } @@ -487,11 +487,16 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) { + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } + if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { + vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); + return 0; + } + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_SCH_QUERY: @@ -499,6 +504,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + case TDMT_VND_TMQ_CONSUME: + return tqProcessPollReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -508,17 +515,12 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || - pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && + pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } - if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { - vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); - return 0; - } - switch (pMsg->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: @@ -537,8 +539,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_TMQ_CONSUME: - return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index dc3ff3e6de14b632a2f4a76927dfbae823e1652a..6154e30938a1c5177017bdec00d525efa060be9d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -262,7 +262,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d", pReader->pWal->cfg.vgId, ver, + wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver, pReadHead->bodyLen); if (pReader->capacity < pReadHead->bodyLen) { diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index f2de219f4e740b090a87ebbb7e9ceb9cb1c6dcc2..d98a45f0d3c3873f16ddc0134c58eeea2e6555dd 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -232,7 +232,7 @@ void saveConfigToLogFile() { taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); } taosFprintfFile(g_fp, "\n"); - taosFprintfFile(g_fp, " expect rows: %" PRIx64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); + taosFprintfFile(g_fp, " expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); } char tmpString[128];