未验证 提交 3f824e20 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #21355 from taosdata/mark/tmq

fix:error in pHandle lock
...@@ -90,6 +90,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 ...@@ -90,6 +90,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
*/ */
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor // todo refactor
......
...@@ -87,6 +87,7 @@ struct tmq_t { ...@@ -87,6 +87,7 @@ struct tmq_t {
void* commitCbUserParam; void* commitCbUserParam;
// status // status
SRWLatch lock;
int8_t status; int8_t status;
int32_t epoch; int32_t epoch;
#if 0 #if 0
...@@ -156,6 +157,7 @@ typedef struct { ...@@ -156,6 +157,7 @@ typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
int32_t epoch; // epoch can be used to guard the vgHandle int32_t epoch; // epoch can be used to guard the vgHandle
int32_t vgId; int32_t vgId;
char topicName[TSDB_TOPIC_FNAME_LEN];
SMqClientVg* vgHandle; SMqClientVg* vgHandle;
SMqClientTopic* topicHandle; SMqClientTopic* topicHandle;
uint64_t reqId; uint64_t reqId;
...@@ -168,8 +170,8 @@ typedef struct { ...@@ -168,8 +170,8 @@ typedef struct {
} SMqPollRspWrapper; } SMqPollRspWrapper;
typedef struct { typedef struct {
int64_t refId; // int64_t refId;
int32_t epoch; // int32_t epoch;
tsem_t rspSem; tsem_t rspSem;
int32_t rspErr; int32_t rspErr;
} SMqSubscribeCbParam; } SMqSubscribeCbParam;
...@@ -184,8 +186,9 @@ typedef struct { ...@@ -184,8 +186,9 @@ typedef struct {
typedef struct { typedef struct {
int64_t refId; int64_t refId;
int32_t epoch; int32_t epoch;
SMqClientVg* pVg; char topicName[TSDB_TOPIC_FNAME_LEN];
SMqClientTopic* pTopic; // SMqClientVg* pVg;
// SMqClientTopic* pTopic;
int32_t vgId; int32_t vgId;
uint64_t requestId; // request id for debug purpose uint64_t requestId; // request id for debug purpose
} SMqPollCbParam; } SMqPollCbParam;
...@@ -709,8 +712,8 @@ static void generateTimedTask(int64_t refId, int32_t type) { ...@@ -709,8 +712,8 @@ static void generateTimedTask(int64_t refId, int32_t type) {
*pTaskType = type; *pTaskType = type;
taosWriteQitem(tmq->delayedTask, pTaskType); taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
} }
taosReleaseRef(tmqMgmt.rsetId, refId);
} }
void tmqAssignAskEpTask(void* param, void* tmrId) { void tmqAssignAskEpTask(void* param, void* tmrId) {
...@@ -1037,6 +1040,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -1037,6 +1040,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->commitCb = conf->commitCb; pTmq->commitCb = conf->commitCb;
pTmq->commitCbUserParam = conf->commitCbUserParam; pTmq->commitCbUserParam = conf->commitCbUserParam;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
taosInitRWLatch(&pTmq->lock);
pTmq->hbBgEnable = conf->hbBgEnable; pTmq->hbBgEnable = conf->hbBgEnable;
...@@ -1085,7 +1089,7 @@ _failed: ...@@ -1085,7 +1089,7 @@ _failed:
} }
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const int32_t MAX_RETRY_COUNT = 120 * 60; // let's wait for 2 mins at most
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
...@@ -1140,8 +1144,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1140,8 +1144,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SMqSubscribeCbParam param = { SMqSubscribeCbParam param = {
.rspErr = 0, .rspErr = 0,
.refId = tmq->refId, // .refId = tmq->refId,
.epoch = tmq->epoch, // .epoch = tmq->epoch,
}; };
if (tsem_init(&param.rspSem, 0, 0) != 0) { if (tsem_init(&param.rspSem, 0, 0) != 0) {
...@@ -1182,7 +1186,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1182,7 +1186,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) { if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
code = TSDB_CODE_TSC_INTERNAL_ERROR; code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL; goto FAIL;
} }
...@@ -1217,12 +1221,40 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para ...@@ -1217,12 +1221,40 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
conf->commitCbUserParam = param; conf->commitCbUserParam = param;
} }
static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId){
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for(int i = 0; i < topicNumCur; i++){
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if(strcmp(pTopicCur->topicName, topicName) == 0){
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
if(pVgCur->vgId == vgId){
return pVgCur;
}
}
}
}
return NULL;
}
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for(int i = 0; i < topicNumCur; i++){
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if(strcmp(pTopicCur->topicName, topicName) == 0){
return pTopicCur;
}
}
return NULL;
}
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
int64_t refId = pParam->refId; int64_t refId = pParam->refId;
SMqClientVg* pVg = pParam->pVg; // SMqClientVg* pVg = pParam->pVg;
SMqClientTopic* pTopic = pParam->pTopic; // SMqClientTopic* pTopic = pParam->pTopic;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) { if (tmq == NULL) {
...@@ -1237,15 +1269,13 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1237,15 +1269,13 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t vgId = pParam->vgId; int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId; uint64_t requestId = pParam->requestId;
taosMemoryFree(pParam);
if (code != 0) { if (code != 0) {
if (pMsg->pData) taosMemoryFree(pMsg->pData); if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet); if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
// in case of consumer mismatch, wait for 500ms and retry // in case of consumer mismatch, wait for 500ms and retry
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
taosMsleep(500); // taosMsleep(500);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER", tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
tmq->consumerId); tmq->consumerId);
...@@ -1259,8 +1289,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1259,8 +1289,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
} else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert // } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert
taosMsleep(500); // taosMsleep(5);
} else{ } else{
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId, tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
vgId, epoch, tstrerror(code), requestId); vgId, epoch, tstrerror(code), requestId);
...@@ -1282,6 +1312,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1282,6 +1312,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return 0; return 0;
} }
...@@ -1303,11 +1335,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1303,11 +1335,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
pRspWrapper->tmqRspType = rspType; pRspWrapper->tmqRspType = rspType;
pRspWrapper->vgHandle = pVg; // pRspWrapper->vgHandle = pVg;
pRspWrapper->topicHandle = pTopic; // pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId; pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet; pRspWrapper->pEpset = pMsg->pEpSet;
pRspWrapper->vgId = pVg->vgId; pRspWrapper->vgId = vgId;
strcpy(pRspWrapper->topicName, pParam->topicName);
pMsg->pEpSet = NULL; pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) { if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
...@@ -1346,16 +1379,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1346,16 +1379,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pParam);
return 0; return 0;
CREATE_MSG_FAIL: CREATE_MSG_FAIL:
if (epoch == tmq->epoch) { if (epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
if(pVg){
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosWUnLockLatch(&tmq->lock);
} }
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pParam);
return -1; return -1;
} }
...@@ -1472,11 +1512,13 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) ...@@ -1472,11 +1512,13 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
taosHashCleanup(pVgOffsetHashMap); taosHashCleanup(pVgOffsetHashMap);
taosWLockLatch(&tmq->lock);
// destroy current buffered existed topics info // destroy current buffered existed topics info
if (tmq->clientTopics) { if (tmq->clientTopics) {
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo); taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
} }
tmq->clientTopics = newTopics; tmq->clientTopics = newTopics;
taosWUnLockLatch(&tmq->lock);
int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY; int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
atomic_store_8(&tmq->status, flag); atomic_store_8(&tmq->status, flag);
...@@ -1492,7 +1534,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1492,7 +1534,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
if (tmq == NULL) { if (tmq == NULL) {
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
pParam->pUserFn(tmq, terrno, NULL, pParam->pParam); // pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
...@@ -1652,8 +1694,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p ...@@ -1652,8 +1694,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam->refId = pTmq->refId; pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch; pParam->epoch = pTmq->epoch;
pParam->pVg = pVg; // pVg may be released,fix it // pParam->pVg = pVg; // pVg may be released,fix it
pParam->pTopic = pTopic; // pParam->pTopic = pTopic;
strcpy(pParam->topicName, pTopic->topicName);
pParam->vgId = pVg->vgId; pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId; pParam->requestId = req.reqId;
...@@ -1688,6 +1731,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p ...@@ -1688,6 +1731,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
// broadcast the poll request to all related vnodes // broadcast the poll request to all related vnodes
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
return 0;
}
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
...@@ -1779,8 +1825,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1779,8 +1825,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
if (pDataRsp->head.epoch == consumerEpoch) { if (pDataRsp->head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
return NULL;
}
// update the epset // update the epset
if (pollRspWrapper->pEpset != NULL) { if (pollRspWrapper->pEpset != NULL) {
SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset); SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
...@@ -1829,7 +1881,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1829,7 +1881,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
return NULL;
}
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
} }
...@@ -1849,7 +1908,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1849,7 +1908,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pollRspWrapper->vgHandle = pVg;
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
return NULL;
}
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
} }
......
...@@ -75,7 +75,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -75,7 +75,7 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, terrstr(code)); dGError("vgId:%d, msg:%p failed to query since %s", pVnode->vgId, pMsg, tstrerror(code));
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
......
...@@ -297,13 +297,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -297,13 +297,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
if (offset.val.type == TMQ_OFFSET__LOG) { if (offset.val.type == TMQ_OFFSET__LOG) {
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) { if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) {
taosWUnLockLatch(&pTq->lock);
return -1; return -1;
} }
taosWUnLockLatch(&pTq->lock);
} }
return 0; return 0;
...@@ -337,6 +334,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { ...@@ -337,6 +334,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0}; SMqPollReq req = {0};
int code = 0;
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -347,38 +345,45 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -347,38 +345,45 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t reqEpoch = req.epoch; int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset; STqOffsetVal reqOffset = req.reqOffset;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = NULL;
taosWLockLatch(&pTq->lock); while (1) {
// 1. find handle taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); // 1. find handle
if (pHandle == NULL) { pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); if (pHandle == NULL) {
terrno = TSDB_CODE_INVALID_MSG; tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
taosWUnLockLatch(&pTq->lock); terrno = TSDB_CODE_INVALID_MSG;
return -1; taosWUnLockLatch(&pTq->lock);
} return -1;
}
while (tqIsHandleExec(pHandle)) { // 2. check re-balance status
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey); if (pHandle->consumerId != consumerId) {
taosMsleep(5); tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
} consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosWUnLockLatch(&pTq->lock);
return -1;
}
// 2. check re-balance status bool exec = tqIsHandleExec(pHandle);
if (pHandle->consumerId != consumerId) { if(!exec) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqSetHandleExec(pHandle);
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
break;
}
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
return -1;
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
taosMsleep(10);
} }
tqSetHandleExec(pHandle);
taosWUnLockLatch(&pTq->lock);
// 3. update the epoch value // 3. update the epoch value
int32_t savedEpoch = pHandle->epoch; if (pHandle->epoch < reqEpoch) {
if (savedEpoch < reqEpoch) { tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch);
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
reqEpoch);
pHandle->epoch = reqEpoch; pHandle->epoch = reqEpoch;
} }
...@@ -387,8 +392,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -387,8 +392,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
int code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
tqSetHandleIdle(pHandle); tqSetHandleIdle(pHandle);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle);
return code; return code;
} }
...@@ -403,8 +410,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -403,8 +410,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) { if (pHandle) {
while (tqIsHandleExec(pHandle)) { while (tqIsHandleExec(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle);
taosMsleep(5); taosMsleep(10);
} }
if (pHandle->pRef) { if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
...@@ -471,7 +478,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -471,7 +478,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId); req.oldConsumerId, req.newConsumerId);
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
...@@ -554,37 +560,35 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -554,37 +560,35 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end; goto end;
} else { } else {
while (tqIsHandleExec(pHandle)) { taosWLockLatch(&pTq->lock);
tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
if (pHandle->consumerId == req.newConsumerId) { // do nothing if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
} else { } else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId); req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
} }
// atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task // kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task; if(tqIsHandleExec(pHandle)) {
if (pTaskInfo != NULL) { qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); if (pTaskInfo != NULL) {
} qKillTask(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { }
qStreamCloseTsdbReader(pTaskInfo);
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
} }
// remove if it has been register in the push manager, and return one empty block to consumer // remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
taosWUnLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
} }
end: end:
taosWUnLockLatch(&pTq->lock);
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return ret; return ret;
} }
......
...@@ -352,9 +352,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -352,9 +352,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
} }
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
taosWLockLatch(&pTq->lock);
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
taosWUnLockLatch(&pTq->lock);
} }
end: end:
......
...@@ -78,7 +78,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { ...@@ -78,7 +78,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
// todo remove this // todo remove this
if (offset.val.type == TMQ_OFFSET__LOG) { if (offset.val.type == TMQ_OFFSET__LOG) {
taosWLockLatch(&pStore->pTq->lock);
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) { if (pHandle) {
if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) { if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) {
...@@ -86,7 +85,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { ...@@ -86,7 +85,6 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
// offset.val.version); // offset.val.version);
} }
} }
taosWUnLockLatch(&pStore->pTq->lock);
} }
taosMemoryFree(pMemBuf); taosMemoryFree(pMemBuf);
......
...@@ -73,8 +73,11 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { ...@@ -73,8 +73,11 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
STqHandle *pHandle = (STqHandle*)handle; STqHandle *pHandle = (STqHandle*)handle;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if(taosHashGetSize(pTq->pPushMgr) <= 0) {
return 0;
}
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) { if(pHandle->msg != NULL) {
tqPushDataRsp(pTq, pHandle); tqPushDataRsp(pTq, pHandle);
......
...@@ -1035,7 +1035,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -1035,7 +1035,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
// update the table list for each consumer handle // update the table list for each consumer handle
taosWLockLatch(&pTq->lock);
while (1) { while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter); pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) { if (pIter == NULL) {
...@@ -1092,7 +1091,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -1092,7 +1091,6 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
} }
} }
taosWUnLockLatch(&pTq->lock);
// update the table list handle for each stream scanner/wal reader // update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);
......
...@@ -84,8 +84,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs ...@@ -84,8 +84,10 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
qStreamSetOpen(task); qStreamSetOpen(task);
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId); tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) { code = qExecTask(task, &pDataBlock, &ts);
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr()); if (code != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
terrno = code;
return -1; return -1;
} }
...@@ -128,8 +130,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta ...@@ -128,8 +130,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
tqDebug("tmqsnap task start to execute"); tqDebug("tmqsnap task start to execute");
if (qExecTask(task, &pDataBlock, &ts) < 0) { int code = qExecTask(task, &pDataBlock, &ts);
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); if (code != 0) {
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
terrno = code;
return -1; return -1;
} }
......
...@@ -180,6 +180,11 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { ...@@ -180,6 +180,11 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot); doSetTaskId(pTaskInfo->pRoot);
} }
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
// SExecTaskInfo* pTaskInfo = tinfo;
// pTaskInfo->code = code;
//}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
......
...@@ -1632,37 +1632,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1632,37 +1632,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("start to exec queue scan, %s", id); qDebug("start to exec queue scan, %s", id);
#if 0 if (isTaskKilled(pTaskInfo)) {
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg.msgStr == NULL) {
SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
return NULL;
}
}
blockDataCleanup(pInfo->pRes);
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextBlockImpl(pInfo->tqReader)) {
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, true);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
}
}
pInfo->tqReader->msg = (SPackedData){0};
pTaskInfo->streamInfo.submit = (SPackedData){0};
return NULL; return NULL;
} }
#endif
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册