未验证 提交 85dc25e5 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20160 from taosdata/fix/TD-22671

fix:optimize version logic in tmq and remove useless code
......@@ -2684,7 +2684,7 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType;
int8_t withMeta;
char* qmsg;
char* qmsg; //SubPlanToString
int64_t suid;
} SMqRebVgReq;
......
......@@ -542,7 +542,7 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
typedef struct {
int32_t vgId;
char* qmsg;
char* qmsg; //SubPlanToString
SEpSet epSet;
} SMqVgEp;
......
......@@ -67,7 +67,7 @@ typedef struct {
// tqExec
typedef struct {
char* qmsg;
char* qmsg; // SubPlanToString
} STqExecCol;
typedef struct {
......
......@@ -575,7 +575,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
#if 1
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
......@@ -597,7 +596,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
}
#endif
taosWUnLockLatch(&pTq->pushLock);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
......@@ -613,10 +611,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
// for taosx
/*A(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);*/
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, &req);
......
......@@ -307,10 +307,10 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (1) {
if (!fromProcessedMsg) {
SWalReader* pWalReader = pReader->pWalReader;
if (walNextValidMsg(pWalReader) < 0) {
pReader->ver = pWalReader->curVersion - (pWalReader->curInvalid | pWalReader->curStopped);
if (walNextValidMsg(pReader->pWalReader) < 0) {
pReader->ver =
pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
// pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE;
......@@ -318,25 +318,11 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
return -1;
}
void* body = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pWalReader->pHead->head.version;
tqDebug("tmq poll: extract submit msg from wal, version:%"PRId64" len:%d", ver, bodyLen);
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version;
#if 0
if (pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter
ret->fetchType = FETCH_TYPE__META;
ret->meta = pWalReader->pHead->head.body;
return 0;
} else {
#endif
tqReaderSetSubmitReq2(pReader, body, bodyLen, ver);
/*tqReaderSetDataMsg(pReader, body, pWalReader->pHead->head.version);*/
#if 0
}
#endif
}
while (tqNextDataBlock2(pReader)) {
......
......@@ -1651,8 +1651,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} else if (ret.fetchType == FETCH_TYPE__NONE ||
(ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("queue scan log return null, offset %s", formatBuf);
......@@ -1660,16 +1658,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return NULL;
}
}
#if 0
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
return pResult;
}
qDebug("stream scan tsdb return null");
return NULL;
#endif
} else {
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type);
return NULL;
......
......@@ -200,6 +200,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
pReader->curVersion, pReader->curInvalid, ver);
pReader->curVersion = ver;
pReader->curInvalid = 0;
return 0;
}
......@@ -210,8 +211,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
return 0;
}
pReader->curInvalid = 1;
pReader->curVersion = ver;
// pReader->curInvalid = 1;
// pReader->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
......@@ -219,8 +220,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
if (ver < pWal->vers.snapshotVer) {
}
// if (ver < pWal->vers.snapshotVer) {
// }
if (walReadSeekVerImpl(pReader, ver) < 0) {
return -1;
......@@ -239,8 +240,8 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if (pRead->curInvalid || pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) {
pRead->curVersion = fetchVer;
pRead->curInvalid = 1;
// pRead->curVersion = fetchVer;
// pRead->curInvalid = 1;
return -1;
}
seeked = true;
......@@ -259,11 +260,11 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curInvalid = 1;
// pRead->curInvalid = 1;
return -1;
}
}
pRead->curInvalid = 0;
// pRead->curInvalid = 0;
return 0;
}
......@@ -295,13 +296,14 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pReader->curInvalid = 1;
// pRead->curInvalid = 1;
return -1;
}
if (walValidBodyCksum(pReader->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
pReader->curInvalid = 1;
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......@@ -317,7 +319,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curInvalid = 1;
// pRead->curInvalid = 1;
return -1;
}
......@@ -345,8 +347,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
if (pRead->curInvalid || pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
if (code < 0) {
pRead->curVersion = ver;
pRead->curInvalid = 1;
// pRead->curVersion = ver;
// pRead->curInvalid = 1;
return -1;
}
seeked = true;
......@@ -366,7 +368,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curInvalid = 1;
// pRead->curInvalid = 1;
return -1;
}
}
......@@ -379,7 +381,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1;
}
pRead->curInvalid = 0;
// pRead->curInvalid = 0;
return 0;
}
......@@ -394,7 +396,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curInvalid = 1;
// pRead->curInvalid = 1;
return -1;
}
......@@ -433,14 +435,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead->pWal->cfg.vgId, pReadHead->version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curInvalid = 1;
// pRead->curInvalid = 1;
return -1;
}
if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pReadHead->version, ver);
pRead->curInvalid = 1;
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......@@ -448,7 +450,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
if (walValidBodyCksum(*ppHead) != 0) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1;
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......@@ -544,7 +546,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
if (pReader->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
pReader->pHead->head.version, ver);
pReader->curInvalid = 1;
// pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex);
return -1;
......@@ -557,7 +559,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
uint32_t logCkSum = pReader->pHead->cksumBody;
wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
pReader->curInvalid = 1;
// pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex);
return -1;
......@@ -575,5 +577,6 @@ void walReadReset(SWalReader *pReader) {
taosCloseFile(&pReader->pLogFile);
pReader->curInvalid = 1;
pReader->curFileFirstVer = -1;
pReader->curVersion = -1;
taosThreadMutexUnlock(&pReader->mutex);
}
......@@ -80,16 +80,16 @@ class TDTestCase:
tdLog.debug("del data ............ [OK]")
return
def threadFunctionForDeletaData(self, **paraDict):
def threadFunctionForDeletaData(self, paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.delData(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["startTs"],paraDict["endTs"],paraDict["ctbStartIdx"])
return
def asyncDeleteData(self, paraDict):
pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict)
pThread.start()
return pThread
# def asyncDeleteData(self, paraDict):
# pThread = threading.Thread(target=self.threadFunctionForDeletaData, kwargs=paraDict)
# pThread.start()
# return pThread
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
......@@ -340,7 +340,8 @@ class TDTestCase:
# del some data
rowsOfDelete = int(self.rowsPerTbl / 4 )
paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1
pDeleteThread = self.asyncDeleteData(paraDict)
# pDeleteThread = self.asyncDeleteData(paraDict)
self.threadFunctionForDeletaData(paraDict)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册