提交 d54ae0c8 编写于 作者: H Haojun Liao

fix(tmq): free reader after check the reader status.

上级 72ddb045
......@@ -79,8 +79,7 @@ typedef struct {
} STqExecDb;
typedef struct {
int8_t subType;
int8_t subType;
STqReader* pExecReader;
qTaskInfo_t task;
union {
......@@ -89,27 +88,19 @@ typedef struct {
STqExecDb execDb;
};
int32_t numOfCols; // number of out pout column, temporarily used
bool stop; // denote if needs to be stopped or not
} STqExecHandle;
typedef struct {
// info
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
int8_t fetchMeta;
int64_t snapshotVer;
SWalReader* pWalReader;
SWalRef* pRef;
// push
STqPushHandle pushHandle;
// exec
STqExecHandle execHandle;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId;
int32_t epoch;
int8_t fetchMeta;
int64_t snapshotVer;
SWalReader* pWalReader;
SWalRef* pRef;
STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
} STqHandle;
typedef struct {
......
......@@ -569,16 +569,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->pushLock);
if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
return -1;
}
// todo handle the case where re-balance occurs.
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
dataRsp.reqOffset.version == dataRsp.rspOffset.version && (pHandle->execHandle.stop != false)) {
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
if (pPushEntry != NULL) {
pPushEntry->pInfo = pMsg->info;
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
......@@ -591,17 +594,21 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr",
consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode));
// unlock
taosWUnLockLatch(&pTq->pushLock);
return 0;
}
}
taosWUnLockLatch(&pTq->pushLock);
taosWUnLockLatch(&pTq->pushLock);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1;
}
pHandle->execHandle.stop = false;
//NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
......@@ -610,6 +617,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return code;
}
// todo handle the case where re-balance occurs.
// for taosx
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
......@@ -894,11 +902,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
} else {
// TODO handle qmsg and exec modification
tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId);
tqInfo("vgId:%d switch consumer from Id:0x%"PRIx64" to Id:0x%"PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, -1);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
pHandle->execHandle.stop = true;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pHandle->execHandle.task);
}
......@@ -906,7 +917,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
}
// close handle
pHandle->execHandle.stop = false;
}
return 0;
......
......@@ -46,11 +46,13 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
metaReaderClear(&mr);
return -1;
}
for (int32_t i = 0; i < n; i++) {
char* tbName = taosStrdup(mr.me.name);
taosArrayPush(pRsp->blockTbName, &tbName);
......@@ -83,13 +85,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
if (qExecTask(task, &pDataBlock, &ts) < 0) {
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
return -1;
}
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
// current scan should be stopped asap, since the rebalance occurs.
if (pDataBlock == NULL) {
break;
}
......@@ -99,7 +104,15 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
rowCnt += pDataBlock->info.rows;
if (rowCnt >= 4096) break;
if (rowCnt >= 4096) {
break;
}
}
if (pExec->stop) {
tqDebug("vgId:%d, current vgroups has been transferred to other consumer, return results asap",
TD_VID(pTq->pVnode));
break;
}
}
......
......@@ -709,7 +709,6 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
......@@ -717,7 +716,6 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, rspCode);
qStopTaskOperators(pTaskInfo);
return TSDB_CODE_SUCCESS;
......
......@@ -2771,11 +2771,17 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
}
void qStreamCloseTsdbReader(void* task) {
if (task == NULL) return;
if (task == NULL) {
return;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
SOperatorInfo* pOp = pTaskInfo->pRoot;
qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
pTaskInfo->streamInfo.lastStatus.ts);
// todo refactor, other thread may already use this read to extract data.
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
......@@ -2783,8 +2789,19 @@ void qStreamCloseTsdbReader(void* task) {
SStreamScanInfo* pInfo = pDownstreamOp->info;
if (pInfo->pTableScanOp) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
setOperatorCompleted(pInfo->pTableScanOp);
while(pTaskInfo->owner != 0) {
taosMsleep(100);
qDebug("wait for the reader stopping");
}
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
// restore the status, todo refactor.
pInfo->pTableScanOp->status = OP_OPENED;
pTaskInfo->status = TASK_NOT_COMPLETED;
return;
}
}
......
......@@ -751,7 +751,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result) {
if (result || (pOperator->status == OP_EXEC_DONE)) {
return result;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册