提交 7616a283 编写于 作者: wmmhello's avatar wmmhello

fix:error in TD-23218 & remove useless logic

上级 5b75ee43
......@@ -216,7 +216,6 @@ typedef struct SSDataBlock {
enum {
FETCH_TYPE__DATA = 1,
FETCH_TYPE__META,
FETCH_TYPE__SEP,
FETCH_TYPE__NONE,
};
......
......@@ -196,12 +196,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
//
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo);
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo);
......
......@@ -146,8 +146,8 @@ typedef struct {
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int8_t curInvalid;
int8_t curStopped;
// int8_t curInvalid;
// int8_t curStopped;
TdThreadMutex mutex;
SWalFilterCond cond;
// TODO remove it
......
......@@ -756,6 +756,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
......@@ -230,12 +230,7 @@ typedef struct SSnapContext {
} SSnapContext;
typedef struct STqReader {
// const SSubmitReq *pMsg;
// SSubmitBlk *pBlock;
// SSubmitMsgIter msgIter;
// SSubmitBlkIter blkIter;
int64_t ver;
// int64_t ver;
SPackedData msg2;
int8_t setMsg;
......@@ -264,8 +259,13 @@ int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
<<<<<<< Updated upstream
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char* id);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
=======
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
void tqNextBlock(STqReader *pReader, SFetchRet *ret);
>>>>>>> Stashed changes
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
......
......@@ -211,6 +211,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
return 0;
}
<<<<<<< Updated upstream
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
int32_t len = 0;
int32_t code = 0;
......@@ -242,6 +243,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
.code = 0,
};
tmsgSendRsp(&rsp);
=======
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
>>>>>>> Stashed changes
char buf1[80] = {0};
char buf2[80] = {0};
......@@ -253,6 +260,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0;
}
<<<<<<< Updated upstream
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
int32_t len = 0;
int32_t code = 0;
......@@ -284,6 +292,10 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
.code = 0,
};
tmsgSendRsp(&rsp);
=======
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
>>>>>>> Stashed changes
char buf1[80] = {0};
char buf2[80] = {0};
......@@ -435,6 +447,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
<<<<<<< Updated upstream
// update epoch if need
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
while (savedEpoch < reqEpoch) {
......@@ -552,16 +565,62 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
// for taosx
=======
static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
int32_t vgId = TD_VID(pTq->pVnode);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->pushLock);
int code = tqScanData(pTq, pHandle, &dataRsp, offset);
if(code != 0) {
tDeleteSMqDataRsp(&dataRsp);
return TSDB_CODE_TMQ_CONSUMER_ERROR;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
taosWUnLockLatch(&pTq->pushLock);
return code;
}
taosWUnLockLatch(&pTq->pushLock);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
static int32_t processSubDbOrTable(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SWalCkHead* pCkHead = NULL;
>>>>>>> Stashed changes
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, &req);
<<<<<<< Updated upstream
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) {
=======
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
>>>>>>> Stashed changes
return -1;
}
if (metaRsp.metaRspLen > 0) {
<<<<<<< Updated upstream
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
code = -1;
}
......@@ -569,17 +628,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
",version:%" PRId64,
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.version);
=======
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.ts);
>>>>>>> Stashed changes
taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version);
if (taosxRsp.blockNum > 0) {
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
return code;
<<<<<<< Updated upstream
} else {
fetchOffsetNew = taosxRsp.rspOffset;
}
......@@ -592,6 +663,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = fetchOffsetNew.version + 1;
=======
}
} else {
int64_t fetchVer = offset->version + 1;
>>>>>>> Stashed changes
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
......@@ -601,28 +677,45 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) {
<<<<<<< Updated upstream
savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > reqEpoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch);
=======
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
>>>>>>> Stashed changes
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
<<<<<<< Updated upstream
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
=======
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
>>>>>>> Stashed changes
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
}
SWalCont* pHead = &pCkHead->head;
<<<<<<< Updated upstream
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
=======
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", pRequest->consumerId,
pRequest->epoch, vgId, fetchVer, pHead->msgType);
>>>>>>> Stashed changes
if (pHead->msgType == TDMT_VND_SUBMIT) {
SPackedData submit = {
......@@ -631,8 +724,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
<<<<<<< Updated upstream
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
req.subKey);
=======
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);
>>>>>>> Stashed changes
return -1;
}
if (taosxRsp.blockNum > 0 /* threshold */) {
......@@ -648,8 +746,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
} else {
/*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType;
......@@ -674,6 +770,89 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
<<<<<<< Updated upstream
=======
static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1;
STqOffsetVal offset = {0};
STqOffsetVal reqOffset = pRequest->reqOffset;
// 1. reset the offset if needed
if (reqOffset.type > 0) {
offset = reqOffset;
} else { // handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
}
// empty block returned, quit
if (blockReturned) {
return 0;
}
}
// this is a normal subscription requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return processSubColumn(pTq, pHandle, pRequest, pMsg, &offset);
}
// todo handle the case where re-balance occurs.
// for taosx
return processSubDbOrTable(pTq, pHandle, pRequest, pMsg, &offset);
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int64_t consumerId = req.consumerId;
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;
int32_t vgId = TD_VID(pTq->pVnode);
// 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// 2. check re-balance status
taosRLockLatch(&pTq->pushLock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->pushLock);
return -1;
}
taosRUnLockLatch(&pTq->pushLock);
taosWLockLatch(&pTq->pushLock);
// 3. update the epoch value
int32_t savedEpoch = pHandle->epoch;
if (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
pHandle->epoch = reqEpoch;
}
taosWUnLockLatch(&pTq->pushLock);
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
return extractDataForMq(pTq, pHandle, &req, pMsg);
}
>>>>>>> Stashed changes
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
......
......@@ -65,18 +65,8 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
qTaskInfo_t task = pExec->task;
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset;
return 0;
} else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
pRsp->rspOffset = *pOffset;
return 0;
}
}
tqError("prepare scan failed, return");
return -1;
}
int32_t rowCnt = 0;
......@@ -103,20 +93,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
}
}
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) {
return -1;
}
if (pRsp->rspOffset.type == 0) {
tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts,
pRsp->rspOffset.uid, pRsp->rspOffset.version);
return -1;
}
if(pRsp->withTbName || pRsp->withSchema){
tqError("get column should not with meta:%d,%d", pRsp->withTbName, pRsp->withSchema);
return -1;
}
qStreamExtractOffset(task, &pRsp->rspOffset);
return 0;
}
......@@ -125,18 +102,8 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
qTaskInfo_t task = pExec->task;
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset;
return 0;
} else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
pRsp->rspOffset = *pOffset;
return 0;
}
}
tqDebug("tqScanTaosx prepare scan failed, return");
return -1;
}
int32_t rowCnt = 0;
......@@ -183,9 +150,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
}
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (qStreamExtractPrepareUid(task) != 0) {
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
break;
......@@ -198,28 +162,17 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
tqDebug("tmqsnap task exec change to get data");
continue;
}
*pMetaRsp = *tmp;
tqDebug("tmqsnap task exec exited, get meta");
tqDebug("task exec exited");
break;
}
qStreamExtractOffset(task, &pRsp->rspOffset);
if (pRsp->rspOffset.type == 0) {
tqError("expected rsp offset: type %d %" PRId64 " %" PRId64 " %" PRId64, pRsp->rspOffset.type, pRsp->rspOffset.ts,
pRsp->rspOffset.uid, pRsp->rspOffset.version);
return -1;
}
return 0;
}
......@@ -232,14 +185,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader;
/*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlock2(pReader)) {
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL;
......
此差异已折叠。
......@@ -127,10 +127,10 @@ enum {
typedef struct {
// TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
// STqOffsetVal prepareStatus; // for tmq
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned;
// int8_t returned;
int64_t snapshotVer;
// const SSubmitReq* pReq;
......
......@@ -995,15 +995,9 @@ SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
return &pTaskInfo->streamInfo.metaRsp;
}
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.prepareStatus.uid;
}
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
return 0;
memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));
}
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
......@@ -1052,21 +1046,19 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
pTaskInfo->streamInfo.prepareStatus = *pOffset;
pTaskInfo->streamInfo.returned = 0;
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0;
}
pTaskInfo->streamInfo.currentOffset = *pOffset;
if (subType == TOPIC_SUB_TYPE__COLUMN) {
pOperator->status = OP_OPENED;
// TODO add more check
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if(pOperator->numOfDownstream != 1){
qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream);
return -1;
return TSDB_CODE_TMQ_CONSUMER_ERROR;
}
pOperator = pOperator->pDownstream[0];
}
......
......@@ -2774,9 +2774,18 @@ void qStreamCloseTsdbReader(void* task) {
if (task == NULL) return;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
SOperatorInfo* pOp = pTaskInfo->pRoot;
<<<<<<< Updated upstream
qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
pTaskInfo->streamInfo.lastStatus.ts);
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
=======
qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid,
pTaskInfo->streamInfo.currentOffset.ts);
// todo refactor, other thread may already use this read to extract data.
pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0};
>>>>>>> Stashed changes
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
......
......@@ -673,9 +673,9 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
// todo refactor
/*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/
/*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
// pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
// pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.id.uid != 0);
return pBlock;
......@@ -853,9 +853,11 @@ static void destroyTableScanOperatorInfo(void* param) {
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
int32_t code = 0;
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
......@@ -863,8 +865,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
int32_t code =
extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -1567,17 +1568,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
/*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/
/*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/
SPackedData submit = pTaskInfo->streamInfo.submit;
if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
pInfo->tqReader->msg2 = (SPackedData){0};
pInfo->tqReader->setMsg = 0;
ASSERT(0);
return NULL;
}
}
......@@ -1605,13 +1599,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return NULL;
}
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion);
pTaskInfo->streamInfo.returned = 1;
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
return pResult;
<<<<<<< Updated upstream
} else {
if (!pTaskInfo->streamInfo.returned) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
......@@ -1626,45 +1621,35 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} else {
return NULL;
}
=======
}
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
return NULL;
>>>>>>> Stashed changes
}
}
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) {
SFetchRet ret = {0};
if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
// if the end is reached, terrno is 0
if (terrno != 0) {
qError("failed to get next log block since %s", terrstr());
}
}
tqNextBlock(pInfo->tqReader, &ret);
pTaskInfo->streamInfo.currentOffset = ret.offset;
if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &ret.data, true);
if (pInfo->pRes->info.rows > 0) {
pOperator->status = OP_EXEC_RECV;
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
} else if (ret.fetchType == FETCH_TYPE__META) {
qError("unexpected ret.fetchType:%d", ret.fetchType);
continue;
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE ||
(ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("queue scan log return null, offset %s", formatBuf);
pOperator->status = OP_OPENED;
return NULL;
}
}
} else {
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type);
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
return NULL;
}
}
......@@ -2094,7 +2079,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(pInfo->dataReader);
......@@ -2107,28 +2092,22 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
}
qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey);
return pBlock;
}
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal");
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->sContext->snapVersion);
} else {
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, mtInfo.uid, INT64_MIN);
qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
}
tDeleteSSchemaWrapper(mtInfo.schema);
qDebug("tmqsnap stream scan tsdb return null");
return NULL;
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
} else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext* sContext = pInfo->sContext;
void* data = NULL;
int32_t dataLen = 0;
......@@ -2141,15 +2120,11 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
}
if (!sContext->queryMetaOrData) { // change to get data next poll request
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, 0, INT64_MIN);
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.currentOffset;
} else {
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.currentOffset;
pTaskInfo->streamInfo.metaRsp.resMsgType = type;
pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
pTaskInfo->streamInfo.metaRsp.metaRsp = data;
......@@ -2351,7 +2326,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->base.dataReader = NULL;
pTaskInfo->streamInfo.lastStatus.uid = -1;
}
if (pHandle->initTqReader) {
......
......@@ -33,7 +33,6 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
pReader->pLogFile = NULL;
pReader->curVersion = -1;
pReader->curFileFirstVer = -1;
pReader->curInvalid = 1;
pReader->capacity = 0;
if (cond) {
pReader->cond = *cond;
......@@ -81,7 +80,6 @@ int32_t walNextValidMsg(SWalReader *pReader) {
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64 ", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
pReader->curStopped = 0;
while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1;
......@@ -99,7 +97,6 @@ int32_t walNextValidMsg(SWalReader *pReader) {
fetchVer = pReader->curVersion;
}
}
pReader->curStopped = 1;
return -1;
}
......@@ -196,17 +193,16 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
return -1;
}
wDebug("vgId:%d, wal version reset from %" PRId64 "(invalid:%d) to %" PRId64, pReader->pWal->cfg.vgId,
pReader->curVersion, pReader->curInvalid, ver);
wDebug("vgId:%d, wal version reset from %" PRId64 " to %" PRId64, pReader->pWal->cfg.vgId,
pReader->curVersion, ver);
pReader->curVersion = ver;
pReader->curInvalid = 0;
return 0;
}
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
SWal *pWal = pReader->pWal;
if (!pReader->curInvalid && ver == pReader->curVersion) {
if (ver == pReader->curVersion) {
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
return 0;
}
......@@ -238,7 +234,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) {
// pRead->curVersion = fetchVer;
// pRead->curInvalid = 1;
......@@ -344,7 +340,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1;
}
if (pRead->curInvalid || pRead->curVersion != ver) {
if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) {
// pRead->curVersion = ver;
......@@ -479,7 +475,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
taosThreadMutexLock(&pReader->mutex);
if (pReader->curInvalid || pReader->curVersion != ver) {
if (pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex);
......@@ -575,7 +571,6 @@ void walReadReset(SWalReader *pReader) {
taosThreadMutexLock(&pReader->mutex);
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
pReader->curInvalid = 1;
pReader->curFileFirstVer = -1;
pReader->curVersion = -1;
taosThreadMutexUnlock(&pReader->mutex);
......
......@@ -627,6 +627,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册