提交 a2107992 编写于 作者: L Liu Jicong

refactor(stream)

上级 0fd4f3b5
...@@ -176,7 +176,7 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); ...@@ -176,7 +176,7 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
......
...@@ -161,6 +161,7 @@ typedef struct { ...@@ -161,6 +161,7 @@ typedef struct {
int64_t curFileFirstVer; int64_t curFileFirstVer;
int64_t curVersion; int64_t curVersion;
int64_t capacity; int64_t capacity;
int8_t curInvalid;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond; SWalFilterCond cond;
SWalCkHead *pHead; SWalCkHead *pHead;
......
...@@ -270,6 +270,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -270,6 +270,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
// 2.reset offset if needed // 2.reset offset if needed
if (reqOffset.type > 0) { if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset; fetchOffsetNew = reqOffset;
...@@ -293,40 +296,24 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -293,40 +296,24 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&fetchOffsetNew, walGetFirstVer(pTq->pVnode->pWal));
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
tqOffsetResetToLog(&fetchOffsetNew, walGetLastVer(pTq->pVnode->pWal)); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey, tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey,
fetchOffsetNew.version); dataRsp.rspOffset.version);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
dataRsp.rspOffset = fetchOffsetNew;
code = 0;
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
} }
taosArrayDestroy(dataRsp.blockDataLen); goto OVER;
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
if (dataRsp.withSchema) {
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (dataRsp.withTbName) {
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
}
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed", tqError("tmq poll: subkey %s, no offset committed for consumer %ld in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1; code = -1;
goto OVER;
} }
} }
} }
// 3.query // 3.query
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
fetchOffsetNew.version++; fetchOffsetNew.version++;
if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) { if (tqScanLog(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
...@@ -335,7 +322,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -335,7 +322,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto OVER; goto OVER;
} }
if (dataRsp.blockNum == 0) { if (dataRsp.blockNum == 0) {
// TODO add to async task // TODO add to async task pool
/*dataRsp.rspOffset.version--;*/ /*dataRsp.rspOffset.version--;*/
} }
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
...@@ -348,7 +335,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -348,7 +335,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
return -1; code = -1;
goto OVER;
} }
walSetReaderCapacity(pHandle->pWalReader, 2048); walSetReaderCapacity(pHandle->pWalReader, 2048);
......
...@@ -62,7 +62,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -62,7 +62,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int64_t tqScanLog(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
qTaskInfo_t task = pExec->execCol.task[0]; qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan1(task, pOffset) < 0) { if (qStreamPrepareScan(task, pOffset) < 0) {
pRsp->rspOffset = *pOffset; pRsp->rspOffset = *pOffset;
pRsp->rspOffset.version--; pRsp->rspOffset.version--;
return 0; return 0;
...@@ -110,10 +110,6 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -110,10 +110,6 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId]; qTaskInfo_t task = pExec->execCol.task[workerId];
/*if (qStreamScanSnapshot(task) < 0) {*/
/*ASSERT(0);*/
/*}*/
if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) { if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
ASSERT(0); ASSERT(0);
} }
......
...@@ -104,8 +104,13 @@ void tqCloseReader(STqReader* pReader) { ...@@ -104,8 +104,13 @@ void tqCloseReader(STqReader* pReader) {
} }
int32_t tqSeekVer(STqReader* pReader, int64_t ver) { int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
// if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
return walReadSeekVer(pReader->pWalReader, ver); ASSERT(pReader->pWalReader->curInvalid);
ASSERT(pReader->pWalReader->curVersion == ver);
return -1;
}
ASSERT(pReader->pWalReader->curVersion == ver);
return 0;
} }
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
...@@ -114,9 +119,11 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -114,9 +119,11 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (1) { while (1) {
if (!fromProcessedMsg) { if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) { if (walNextValidMsg(pReader->pWalReader) < 0) {
pReader->ver = pReader->pWalReader->curVersion;
ret->offset.type = TMQ_OFFSET__LOG; ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver; ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
ASSERT(ret->offset.version >= 0);
return -1; return -1;
} }
void* body = pReader->pWalReader->pHead->head.body; void* body = pReader->pWalReader->pHead->head.body;
...@@ -131,19 +138,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -131,19 +138,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
} }
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
// TODO mem free
memset(&ret->data, 0, sizeof(SSDataBlock)); memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock(&ret->data, pReader); int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
if (code != 0 || ret->data.info.rows == 0) { if (code != 0 || ret->data.info.rows == 0) {
ASSERT(0); ASSERT(0);
continue; continue;
#if 0
if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE;
return 0;
} else {
break;
}
#endif
} }
ret->fetchType = FETCH_TYPE__DATA; ret->fetchType = FETCH_TYPE__DATA;
return 0; return 0;
...@@ -152,7 +152,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -152,7 +152,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
if (fromProcessedMsg) { if (fromProcessedMsg) {
ret->offset.type = TMQ_OFFSET__LOG; ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver; ret->offset.version = pReader->ver;
ASSERT(pReader->ver != -1); ASSERT(pReader->ver >= 0);
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
return 0; return 0;
} }
......
...@@ -280,7 +280,7 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { ...@@ -280,7 +280,7 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
return 0; return 0;
} }
int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
...@@ -293,8 +293,55 @@ int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { ...@@ -293,8 +293,55 @@ int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) { if (pOffset->type == TMQ_OFFSET__LOG) {
return -1; if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
return -1;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
pInfo->blockType = STREAM_INPUT__TABLE_SCAN;
int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
}
}
if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||
pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
bool found = false;
for (int32_t i = 0; i < tableSz; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) {
found = true;
pTableScanInfo->currentTable = i;
}
}
// TODO after dropping table, table may be not found
ASSERT(found);
tsdbSetTableId(pTableScanInfo->dataReader, uid);
int64_t oldSkey = pTableScanInfo->cond.twindows[0].skey;
pTableScanInfo->cond.twindows[0].skey = ts + 1;
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->cond.twindows[0].skey = oldSkey;
pTableScanInfo->scanTimes = 0;
pTableScanInfo->curTWinIdx = 0;
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pTableScanInfo->currentTable, tableSz);
} else {
// switch to log
}
} else {
ASSERT(0);
} }
return 0; return 0;
} else { } else {
......
...@@ -1234,17 +1234,21 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1234,17 +1234,21 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.metaBlk = ret.meta; pTaskInfo->streamInfo.metaBlk = ret.meta;
return NULL; return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) { } else if (ret.fetchType == FETCH_TYPE__NONE) {
if (ret.offset.version == -1) { /*if (ret.offset.version == -1) {*/
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; /*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/
pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1; /*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
} else { /*} else {*/
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
} ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
/*}*/
return NULL; return NULL;
} else { } else {
ASSERT(0); ASSERT(0);
} }
} }
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL;
} }
size_t total = taosArrayGetSize(pInfo->pBlockLists); size_t total = taosArrayGetSize(pInfo->pBlockLists);
...@@ -1448,6 +1452,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1448,6 +1452,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) { } else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
ASSERT(0);
// check reader last status // check reader last status
// if not match, reset status // if not match, reset status
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
......
...@@ -30,8 +30,9 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { ...@@ -30,8 +30,9 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
pRead->pWal = pWal; pRead->pWal = pWal;
pRead->pIdxFile = NULL; pRead->pIdxFile = NULL;
pRead->pLogFile = NULL; pRead->pLogFile = NULL;
pRead->curVersion = -1; pRead->curVersion = -5;
pRead->curFileFirstVer = -1; pRead->curFileFirstVer = -1;
pRead->curInvalid = 1;
pRead->capacity = 0; pRead->capacity = 0;
if (cond) if (cond)
pRead->cond = *cond; pRead->cond = *cond;
...@@ -149,10 +150,14 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { ...@@ -149,10 +150,14 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal; SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) { if (!pRead->curInvalid && ver == pRead->curVersion) {
wDebug("wal version %ld match, no need to reset", ver); wDebug("wal version %ld match, no need to reset", ver);
return 0; return 0;
} }
pRead->curInvalid = 1;
pRead->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer); wError("invalid version: % " PRId64 ", first ver %ld, last ver %ld", ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
...@@ -189,9 +194,11 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity ...@@ -189,9 +194,11 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen; int64_t contLen;
if (pRead->curVersion != fetchVer) { if (pRead->curInvalid || pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReadSeekVer(pRead, fetchVer) < 0) {
ASSERT(0); ASSERT(0);
pRead->curVersion = fetchVer;
pRead->curInvalid = 1;
return -1; return -1;
} }
} }
...@@ -203,7 +210,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -203,7 +210,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
ASSERT(0); ASSERT(0);
pRead->curVersion = -1; pRead->curInvalid = 1;
return -1; return -1;
} }
return 0; return 0;
...@@ -234,14 +241,14 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -234,14 +241,14 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
pRead->pHead->head.version, ver); pRead->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
pRead->curVersion = -1; pRead->curInvalid = 1;
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver); wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
pRead->curVersion = -1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -249,7 +256,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -249,7 +256,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
if (walValidBodyCksum(pRead->pHead) != 0) { if (walValidBodyCksum(pRead->pHead) != 0) {
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver); wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
return -1; return -1;
...@@ -268,7 +275,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { ...@@ -268,7 +275,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1; pRead->curInvalid = 1;
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
...@@ -287,7 +294,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -287,7 +294,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1; return -1;
} }
if (pRead->curVersion != ver) { if (pRead->curInvalid || pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReadSeekVer(pRead, ver);
if (code < 0) return -1; if (code < 0) return -1;
} }
...@@ -318,7 +325,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { ...@@ -318,7 +325,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1; pRead->curInvalid = 1;
return -1; return -1;
} }
...@@ -349,14 +356,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -349,14 +356,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver); wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
pRead->curVersion = -1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (walValidBodyCksum(*ppHead) != 0) { if (walValidBodyCksum(*ppHead) != 0) {
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver); wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -373,8 +380,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -373,8 +380,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return -1; return -1;
} }
// TODO: check wal life if (pRead->curInvalid || pRead->curVersion != ver) {
if (pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) { if (walReadSeekVer(pRead, ver) < 0) {
wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr()); wError("unexpected wal log version: % " PRId64 ", since %s", ver, terrstr());
return -1; return -1;
...@@ -432,7 +438,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -432,7 +438,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
if (pRead->pHead->head.version != ver) { if (pRead->pHead->head.version != ver) {
wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, wError("unexpected wal log version: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version,
ver); ver);
pRead->curVersion = -1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -440,7 +446,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -440,7 +446,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
code = walValidBodyCksum(pRead->pHead); code = walValidBodyCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver); wError("unexpected wal log version: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册