提交 b28c0ad9 编写于 作者: wmmhello's avatar wmmhello

fix:remove useless logic in tmq

上级 6052018e
...@@ -61,7 +61,7 @@ static int32_t init_env() { ...@@ -61,7 +61,7 @@ static int32_t init_env() {
printf("create database\n"); printf("create database\n");
pRes = taos_query(pConn, "drop topic topicname"); pRes = taos_query(pConn, "drop topic topicname");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -126,7 +126,7 @@ typedef struct SWal { ...@@ -126,7 +126,7 @@ typedef struct SWal {
typedef struct { typedef struct {
int64_t refId; int64_t refId;
int64_t refVer; int64_t refVer;
int64_t refFile; // int64_t refFile;
SWal *pWal; SWal *pWal;
} SWalRef; } SWalRef;
......
...@@ -297,11 +297,8 @@ void tqCloseReader(STqReader* pReader) { ...@@ -297,11 +297,8 @@ void tqCloseReader(STqReader* pReader) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
ASSERT(pReader->pWalReader->curInvalid);
ASSERT(pReader->pWalReader->curVersion == ver);
return -1; return -1;
} }
ASSERT(pReader->pWalReader->curVersion == ver);
return 0; return 0;
} }
...@@ -362,11 +359,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -362,11 +359,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) { int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg; pReader->pMsg = pMsg;
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; // if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
while (true) { // while (true) {
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1; // if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
if (pReader->pBlock == NULL) break; // tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
} // pReader->msgIter.len, pReader->msgIter.uid);
// if (pReader->pBlock == NULL) break;
// }
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
pReader->ver = ver; pReader->ver = ver;
......
...@@ -1049,18 +1049,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1049,18 +1049,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0);
}
#endif
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
return -1; return -1;
} }
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid; int64_t uid = pOffset->uid;
......
...@@ -1618,7 +1618,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1618,7 +1618,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL; return NULL;
} }
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
} else { } else {
return NULL; return NULL;
} }
......
...@@ -96,8 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { ...@@ -96,8 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if (walSkipFetchBodyNew(pReader) < 0) { if (walSkipFetchBodyNew(pReader) < 0) {
return -1; return -1;
} }
fetchVer++; fetchVer = pReader->curVersion;
ASSERT(fetchVer == pReader->curVersion);
} }
} }
pReader->curStopped = 1; pReader->curStopped = 1;
...@@ -144,7 +143,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int ...@@ -144,7 +143,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
} }
static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN] = {0};
taosCloseFile(&pReader->pIdxFile); taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile); taosCloseFile(&pReader->pLogFile);
...@@ -300,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -300,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
return -1; return -1;
} }
if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver);
pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
if (walValidBodyCksum(pRead->pHead) != 0) { if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
......
...@@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) { ...@@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) {
} }
pRef->refId = tGenIdPI64(); pRef->refId = tGenIdPI64();
pRef->refVer = -1; pRef->refVer = -1;
pRef->refFile = -1; // pRef->refFile = -1;
pRef->pWal = pWal; pRef->pWal = pWal;
taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *)); taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *));
return pRef; return pRef;
...@@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { ...@@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
pRef->refVer = ver; pRef->refVer = ver;
// bsearch in fileSet // bsearch in fileSet
SWalFileInfo tmpInfo; // SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; // tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); // SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); // ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer; // pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
} }
...@@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { ...@@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
#if 1 #if 1
void walUnrefVer(SWalRef *pRef) { void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1; pRef->refId = -1;
pRef->refFile = -1; // pRef->refFile = -1;
} }
#endif #endif
...@@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { ...@@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
} }
} }
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
int64_t ver = walGetFirstVer(pWal); int64_t ver = walGetFirstVer(pWal);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
pRef->refVer = ver; pRef->refVer = ver;
// bsearch in fileSet // bsearch in fileSet
SWalFileInfo tmpInfo; // SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; // tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); // SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); // ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer; // pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);
return pRef; return pRef;
} }
...@@ -119,7 +117,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) { ...@@ -119,7 +117,7 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); ASSERT(pRet != NULL);
pRef->refFile = pRet->firstVer; // pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return pRef; return pRef;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册