未验证 提交 6e39bec4 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20089 from taosdata/fix/TD-22671

fix:remove useless logic in tmq
......@@ -61,7 +61,7 @@ static int32_t init_env() {
printf("create database\n");
pRes = taos_query(pConn, "drop topic topicname");
if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
......
......@@ -126,7 +126,7 @@ typedef struct SWal {
typedef struct {
int64_t refId;
int64_t refVer;
int64_t refFile;
// int64_t refFile;
SWal *pWal;
} SWalRef;
......
......@@ -297,11 +297,8 @@ void tqCloseReader(STqReader* pReader) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
ASSERT(pReader->pWalReader->curInvalid);
ASSERT(pReader->pWalReader->curVersion == ver);
return -1;
}
ASSERT(pReader->pWalReader->curVersion == ver);
return 0;
}
......@@ -362,11 +359,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg;
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
if (pReader->pBlock == NULL) break;
}
// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
// while (true) {
// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
// pReader->msgIter.len, pReader->msgIter.uid);
// if (pReader->pBlock == NULL) break;
// }
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
pReader->ver = ver;
......
......@@ -1047,18 +1047,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0);
}
#endif
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
return -1;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid;
......
......@@ -1618,7 +1618,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
} else {
return NULL;
}
......
......@@ -96,8 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if (walSkipFetchBodyNew(pReader) < 0) {
return -1;
}
fetchVer++;
ASSERT(fetchVer == pReader->curVersion);
fetchVer = pReader->curVersion;
}
}
pReader->curStopped = 1;
......@@ -144,7 +143,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
}
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN];
char fnameStr[WAL_FILE_LEN] = {0};
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
......@@ -300,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
return -1;
}
if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1;
......
......@@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) {
}
pRef->refId = tGenIdPI64();
pRef->refVer = -1;
pRef->refFile = -1;
// pRef->refFile = -1;
pRef->pWal = pWal;
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
return pRef;
......@@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
pRef->refVer = ver;
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer;
// SWalFileInfo tmpInfo;
// tmpInfo.firstVer = ver;
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
// ASSERT(pRet != NULL);
// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
}
......@@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
#if 1
void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1;
pRef->refFile = -1;
// pRef->refFile = -1;
}
#endif
......@@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
}
}
taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetFirstVer(pWal);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
pRef->refVer = ver;
// bsearch in fileSet
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer;
// SWalFileInfo tmpInfo;
// tmpInfo.firstVer = ver;
// SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
// ASSERT(pRet != NULL);
// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
return pRef;
}
......@@ -119,7 +117,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer;
// pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex);
return pRef;
......
......@@ -132,7 +132,8 @@ int tsem_wait(tsem_t *psem) {
int tsem_timewait(tsem_t *psem, int64_t milis) {
if (psem == NULL || *psem == NULL) return -1;
dispatch_semaphore_wait(*psem, milis * 1000 * 1000);
dispatch_time_t time = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(milis * USEC_PER_SEC));
dispatch_semaphore_wait(*psem, time);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册