diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5fa39b6bb1145fed6d1316b1692dc901f0b5d698..f5f7218ab9f663b6142f7ce3aefb8f6b80d20509 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -90,6 +90,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 */ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); +//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code); + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); // todo refactor diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index b488af9ba18d4910c32f3a080235d102fd11e514..b57ecfa845910cade2f8c51a841d80b30e95fd28 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -87,6 +87,7 @@ struct tmq_t { void* commitCbUserParam; // status + SRWLatch lock; int8_t status; int32_t epoch; #if 0 @@ -156,6 +157,7 @@ typedef struct { int8_t tmqRspType; int32_t epoch; // epoch can be used to guard the vgHandle int32_t vgId; + char topicName[TSDB_TOPIC_FNAME_LEN]; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; uint64_t reqId; @@ -168,8 +170,8 @@ typedef struct { } SMqPollRspWrapper; typedef struct { - int64_t refId; - int32_t epoch; +// int64_t refId; +// int32_t epoch; tsem_t rspSem; int32_t rspErr; } SMqSubscribeCbParam; @@ -184,8 +186,9 @@ typedef struct { typedef struct { int64_t refId; int32_t epoch; - SMqClientVg* pVg; - SMqClientTopic* pTopic; + char topicName[TSDB_TOPIC_FNAME_LEN]; +// SMqClientVg* pVg; +// SMqClientTopic* pTopic; int32_t vgId; uint64_t requestId; // request id for debug purpose } SMqPollCbParam; @@ -709,8 +712,8 @@ static void generateTimedTask(int64_t refId, int32_t type) { *pTaskType = type; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); + taosReleaseRef(tmqMgmt.rsetId, refId); } - taosReleaseRef(tmqMgmt.rsetId, refId); } void tmqAssignAskEpTask(void* param, void* tmrId) { @@ -1037,6 +1040,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->commitCb = conf->commitCb; pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->resetOffsetCfg = conf->resetOffset; + taosInitRWLatch(&pTmq->lock); pTmq->hbBgEnable = conf->hbBgEnable; @@ -1085,7 +1089,7 @@ _failed: } int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { - const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most + const int32_t MAX_RETRY_COUNT = 120 * 60; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; @@ -1140,8 +1144,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SMqSubscribeCbParam param = { .rspErr = 0, - .refId = tmq->refId, - .epoch = tmq->epoch, +// .refId = tmq->refId, +// .epoch = tmq->epoch, }; if (tsem_init(¶m.rspSem, 0, 0) != 0) { @@ -1182,7 +1186,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { if (retryCnt++ > MAX_RETRY_COUNT) { tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); - code = TSDB_CODE_TSC_INTERNAL_ERROR; + code = TSDB_CODE_MND_CONSUMER_NOT_READY; goto FAIL; } @@ -1217,12 +1221,40 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para conf->commitCbUserParam = param; } +static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId){ + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for(int i = 0; i < topicNumCur; i++){ + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); + if(strcmp(pTopicCur->topicName, topicName) == 0){ + int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); + for (int32_t j = 0; j < vgNumCur; j++) { + SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); + if(pVgCur->vgId == vgId){ + return pVgCur; + } + } + } + } + return NULL; +} + +static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){ + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for(int i = 0; i < topicNumCur; i++){ + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); + if(strcmp(pTopicCur->topicName, topicName) == 0){ + return pTopicCur; + } + } + return NULL; +} + int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollCbParam* pParam = (SMqPollCbParam*)param; int64_t refId = pParam->refId; - SMqClientVg* pVg = pParam->pVg; - SMqClientTopic* pTopic = pParam->pTopic; +// SMqClientVg* pVg = pParam->pVg; +// SMqClientTopic* pTopic = pParam->pTopic; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { @@ -1237,15 +1269,13 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t vgId = pParam->vgId; uint64_t requestId = pParam->requestId; - taosMemoryFree(pParam); - if (code != 0) { if (pMsg->pData) taosMemoryFree(pMsg->pData); if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet); // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - taosMsleep(500); +// taosMsleep(500); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER", tmq->consumerId); @@ -1259,8 +1289,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; taosWriteQitem(tmq->mqueue, pRspWrapper); - } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert - taosMsleep(500); +// } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert +// taosMsleep(5); } else{ tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId, vgId, epoch, tstrerror(code), requestId); @@ -1282,6 +1312,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); + taosMemoryFree(pParam); + return 0; } @@ -1303,11 +1335,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } pRspWrapper->tmqRspType = rspType; - pRspWrapper->vgHandle = pVg; - pRspWrapper->topicHandle = pTopic; +// pRspWrapper->vgHandle = pVg; +// pRspWrapper->topicHandle = pTopic; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; - pRspWrapper->vgId = pVg->vgId; + pRspWrapper->vgId = vgId; + strcpy(pRspWrapper->topicName, pParam->topicName); pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_RSP) { @@ -1346,16 +1379,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); + taosMemoryFree(pParam); return 0; CREATE_MSG_FAIL: if (epoch == tmq->epoch) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); + if(pVg){ + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } + taosWUnLockLatch(&tmq->lock); } tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); + taosMemoryFree(pParam); return -1; } @@ -1472,11 +1512,13 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) taosHashCleanup(pVgOffsetHashMap); + taosWLockLatch(&tmq->lock); // destroy current buffered existed topics info if (tmq->clientTopics) { taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); } tmq->clientTopics = newTopics; + taosWUnLockLatch(&tmq->lock); int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY; atomic_store_8(&tmq->status, flag); @@ -1492,7 +1534,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { if (tmq == NULL) { terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - pParam->pUserFn(tmq, terrno, NULL, pParam->pParam); +// pParam->pUserFn(tmq, terrno, NULL, pParam->pParam); taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -1652,8 +1694,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam->refId = pTmq->refId; pParam->epoch = pTmq->epoch; - pParam->pVg = pVg; // pVg may be released,fix it - pParam->pTopic = pTopic; +// pParam->pVg = pVg; // pVg may be released,fix it +// pParam->pTopic = pTopic; + strcpy(pParam->topicName, pTopic->topicName); pParam->vgId = pVg->vgId; pParam->requestId = req.reqId; @@ -1688,6 +1731,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p // broadcast the poll request to all related vnodes static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { + if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){ + return 0; + } int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); @@ -1779,8 +1825,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; if (pDataRsp->head.epoch == consumerEpoch) { - SMqClientVg* pVg = pollRspWrapper->vgHandle; - + SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + pollRspWrapper->vgHandle = pVg; + pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); + if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ + tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + pollRspWrapper->topicName, pollRspWrapper->vgId); + return NULL; + } // update the epset if (pollRspWrapper->pEpset != NULL) { SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset); @@ -1829,7 +1881,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { - SMqClientVg* pVg = pollRspWrapper->vgHandle; + SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + pollRspWrapper->vgHandle = pVg; + pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); + if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ + tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + pollRspWrapper->topicName, pollRspWrapper->vgId); + return NULL; + } if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; } @@ -1849,7 +1908,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { - SMqClientVg* pVg = pollRspWrapper->vgHandle; + SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + pollRspWrapper->vgHandle = pVg; + pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); + if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){ + tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + pollRspWrapper->topicName, pollRspWrapper->vgId); + return NULL; + } if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index a318b9886ea699889b14b41169c521efc122c7a3..922a85c094a7713853854c1f7261b4974631f0ea 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -75,7 +75,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code)); + dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(code)); vmSendRsp(pMsg, code); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9730ad2877a248ad98a2648964203c419b3053a2..db03e97556b3bbe5a63ef7c2298ab4bdcc6ac6bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -297,13 +297,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } if (offset.val.type == TMQ_OFFSET__LOG) { - taosWLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) { - taosWUnLockLatch(&pTq->lock); return -1; } - taosWUnLockLatch(&pTq->lock); } return 0; @@ -337,6 +334,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; + int code = 0; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); terrno = TSDB_CODE_INVALID_MSG; @@ -347,38 +345,45 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t reqEpoch = req.epoch; STqOffsetVal reqOffset = req.reqOffset; int32_t vgId = TD_VID(pTq->pVnode); + STqHandle* pHandle = NULL; - taosWLockLatch(&pTq->lock); - // 1. find handle - STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle == NULL) { - tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); - terrno = TSDB_CODE_INVALID_MSG; - taosWUnLockLatch(&pTq->lock); - return -1; - } + while (1) { + taosWLockLatch(&pTq->lock); + // 1. find handle + pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle == NULL) { + tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_MSG; + taosWUnLockLatch(&pTq->lock); + return -1; + } - while (tqIsHandleExec(pHandle)) { - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey); - taosMsleep(5); - } + // 2. check re-balance status + if (pHandle->consumerId != consumerId) { + tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); + terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + taosWUnLockLatch(&pTq->lock); + return -1; + } - // 2. check re-balance status - if (pHandle->consumerId != consumerId) { - tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, - consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); - terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + bool exec = tqIsHandleExec(pHandle); + if(!exec) { + tqSetHandleExec(pHandle); +// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + taosWUnLockLatch(&pTq->lock); + break; + } taosWUnLockLatch(&pTq->lock); - return -1; + + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + taosMsleep(10); } - tqSetHandleExec(pHandle); - taosWUnLockLatch(&pTq->lock); // 3. update the epoch value - int32_t savedEpoch = pHandle->epoch; - if (savedEpoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, - reqEpoch); + if (pHandle->epoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch); pHandle->epoch = reqEpoch; } @@ -387,8 +392,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); - int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); + code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); tqSetHandleIdle(pHandle); + + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); return code; } @@ -403,8 +410,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { while (tqIsHandleExec(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); - taosMsleep(5); + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); + taosMsleep(10); } if (pHandle->pRef) { walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); @@ -471,7 +478,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); - taosWLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { @@ -554,37 +560,35 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } else { - while (tqIsHandleExec(pHandle)) { - tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); - taosMsleep(5); - } + taosWLockLatch(&pTq->lock); if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); - atomic_add_fetch_32(&pHandle->epoch, 1); - + tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, 0); } +// atomic_add_fetch_32(&pHandle->epoch, 1); + // kill executing task - qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - } - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pTaskInfo); + if(tqIsHandleExec(pHandle)) { + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { + qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); + } + +// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { +// qStreamCloseTsdbReader(pTaskInfo); +// } } // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); + taosWUnLockLatch(&pTq->lock); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); - goto end; } end: - taosWUnLockLatch(&pTq->lock); taosMemoryFree(req.qmsg); return ret; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 5654147b6df6d1518285e0c028e23db20997ff57..b3d21887c957073b644cb58e5a22174b5cd39e16 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -352,9 +352,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); - taosWLockLatch(&pTq->lock); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); - taosWUnLockLatch(&pTq->lock); } end: diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 377a5d18875c77b38d4ca6036a96cabfafa0ea3f..0a9905b5449a987c926622f26b28ca3e8e7200a2 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -78,7 +78,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { // todo remove this if (offset.val.type == TMQ_OFFSET__LOG) { - taosWLockLatch(&pStore->pTq->lock); STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) { @@ -86,7 +85,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { // offset.val.version); } } - taosWUnLockLatch(&pStore->pTq->lock); } taosMemoryFree(pMemBuf); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 50d94c5ed55510cf2a4ac4108df089ad6468fca2..42c60e0007d31c90f9903b19039c51ef8254b319 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -73,8 +73,11 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { STqHandle *pHandle = (STqHandle*)handle; int32_t vgId = TD_VID(pTq->pVnode); + if(taosHashGetSize(pTq->pPushMgr) <= 0) { + return 0; + } int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); - tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); if(pHandle->msg != NULL) { tqPushDataRsp(pTq, pHandle); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 09574a6b620518f4ca49e74fe91eab21af5d19eb..a26a749af316732158fee6189ea7e9c3620b0c73 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1035,7 +1035,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t vgId = TD_VID(pTq->pVnode); // update the table list for each consumer handle - taosWLockLatch(&pTq->lock); while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) { @@ -1092,7 +1091,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } } - taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader taosWLockLatch(&pTq->pStreamMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index e4b2fa8821689a4bbaf169d00737b49a21dd7e0f..2f470e2221ba1a62fe369c42d00787b623a1f7f3 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -84,8 +84,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs qStreamSetOpen(task); tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); - if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { - tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); + code = qExecTask(task, &pDataBlock, &ts); + if (code != TSDB_CODE_SUCCESS) { + tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code)); + terrno = code; return -1; } @@ -128,8 +130,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; tqDebug("tmqsnap task start to execute"); - if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + int code = qExecTask(task, &pDataBlock, &ts); + if (code != 0) { + tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code)); + terrno = code; return -1; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c4a56d78aebbe0f6a89e77473ba6791ed61c0212..a73deffa528c71999e036370b3a3b021b861f024 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -180,6 +180,11 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot); } +//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { +// SExecTaskInfo* pTaskInfo = tinfo; +// pTaskInfo->code = code; +//} + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f98419f768a8b4d29e8ee94d06630337cff37b9b..a001d99408a82d8e180d778409397658d0fbc8a0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1632,37 +1632,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("start to exec queue scan, %s", id); -#if 0 - if (pTaskInfo->streamInfo.submit.msgStr != NULL) { - if (pInfo->tqReader->msg.msgStr == NULL) { - SPackedData submit = pTaskInfo->streamInfo.submit; - if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { - qError("submit msg messed up when initing stream submit block %p", submit.msgStr); - return NULL; - } - } - - blockDataCleanup(pInfo->pRes); - SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - - while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); - if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { - continue; - } - - setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, true); - - if (pBlockInfo->rows > 0) { - return pInfo->pRes; - } - } - - pInfo->tqReader->msg = (SPackedData){0}; - pTaskInfo->streamInfo.submit = (SPackedData){0}; + if (isTaskKilled(pTaskInfo)) { return NULL; } -#endif if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);