提交 691cb08e 编写于 作者: H Haojun Liao

refactor: remove some unused attributes.

上级 09e84884
......@@ -79,16 +79,15 @@ typedef struct {
} STqExecDb;
typedef struct {
int8_t subType;
STqReader* pExecReader;
qTaskInfo_t task;
int8_t subType;
STqReader* pExecReader;
qTaskInfo_t task;
union {
STqExecCol execCol;
STqExecTb execTb;
STqExecDb execDb;
};
int32_t numOfCols; // number of out pout column, temporarily used
bool stop; // denote if needs to be stopped or not
int32_t numOfCols; // number of out pout column, temporarily used
} STqExecHandle;
typedef struct {
......@@ -101,7 +100,6 @@ typedef struct {
SWalRef* pRef;
STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
int8_t execStatus; // this handle is used to handle the poll requirement
} STqHandle;
typedef struct {
......
......@@ -534,17 +534,19 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffsetVal fetchOffsetNew;
STqOffsetVal offset = {0};
SWalCkHead* pCkHead = NULL;
int32_t vgId = TD_VID(pTq->pVnode);
STqOffsetVal reqOffset = pRequest->reqOffset;
uint64_t consumerId = pRequest->consumerId;
// 1. reset the offset if needed
if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset;
offset = reqOffset;
} else { // handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
code = extractResetOffsetVal(&fetchOffsetNew, pTq, pHandle, pRequest, pMsg, &blockReturned);
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
}
......@@ -562,7 +564,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
// lock
taosWLockLatch(&pTq->pushLock);
code = tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
......@@ -574,12 +576,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
taosWUnLockLatch(&pTq->pushLock);
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp);
pHandle->execHandle.stop = false;
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts);
tDeleteSMqDataRsp(&dataRsp);
return code;
......@@ -591,16 +593,16 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) {
if (offset.type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
return -1;
}
if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
",version:%" PRId64,
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
",version:%" PRId64,
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.version);
taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
......@@ -612,17 +614,17 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
tDeleteSTaosxRsp(&taosxRsp);
return code;
} else {
fetchOffsetNew = taosxRsp.rspOffset;
offset = taosxRsp.rspOffset;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64,
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version);
}
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1;
if (offset.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
......@@ -637,8 +639,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, pRequest->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch,
pRequest->epoch);
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break;
}
......@@ -655,7 +656,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
pRequest->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
pRequest->epoch, vgId, fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SPackedData submit = {
......@@ -664,7 +665,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
pRequest->subKey);
return -1;
}
......@@ -728,45 +729,35 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->execStatus, 0, 1);
// other thread has started to re-balance this handle, return empty block for this poll
if (oldVal != 0) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
int32_t code = tqSendDataRsp(pTq, pMsg, &req, &dataRsp); // here we return an empty block to client
tDeleteSMqDataRsp(&dataRsp);
return code;
}
// pHandle->execStatus == 1, and we are safe now
// keep the epoch in the first plance, this value may be change by other threads.
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
// 2. check rebalance status
// 2. check re-balance status
taosRLockLatch(&pTq->pushLock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
atomic_store_8(&pHandle->execStatus, 0); // reset the flag
taosRUnLockLatch(&pTq->pushLock);
return -1;
}
taosRUnLockLatch(&pTq->pushLock);
taosWLockLatch(&pTq->pushLock);
int32_t savedEpoch = pHandle->epoch;
// 3. update the epoch value
while (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
}
// while (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
pHandle->epoch = reqEpoch;
// savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
// }
taosWUnLockLatch(&pTq->pushLock);
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
int32_t code = extractDataForMq(pTq, pHandle, &req, pMsg);
req.epoch, pHandle->subKey, vgId, buf);
atomic_store_8(&pHandle->execStatus, 0); // restore the flag
return code;
return extractDataForMq(pTq, pHandle, &req, pMsg);
}
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
......@@ -953,7 +944,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
pHandle->execHandle.stop = true;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pHandle->execHandle.task);
}
......@@ -962,8 +952,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
}
pHandle->execHandle.stop = false;
}
return 0;
......
......@@ -108,12 +108,6 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
break;
}
}
if (pExec->stop) {
tqDebug("vgId:%d, current vgroups has been transferred to other consumer, return results asap",
TD_VID(pTq->pVnode));
break;
}
}
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册