提交 54a54693 编写于 作者: wmmhello's avatar wmmhello

fix:optimize version logic in tmq and remove useless code

上级 b28c0ad9
...@@ -2605,7 +2605,7 @@ typedef struct { ...@@ -2605,7 +2605,7 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType; int8_t subType;
int8_t withMeta; int8_t withMeta;
char* qmsg; char* qmsg; //SubPlanToString
int64_t suid; int64_t suid;
} SMqRebVgReq; } SMqRebVgReq;
......
...@@ -528,7 +528,7 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer ...@@ -528,7 +528,7 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
char* qmsg; char* qmsg; //SubPlanToString
SEpSet epSet; SEpSet epSet;
} SMqVgEp; } SMqVgEp;
......
...@@ -67,7 +67,7 @@ typedef struct { ...@@ -67,7 +67,7 @@ typedef struct {
// tqExec // tqExec
typedef struct { typedef struct {
char* qmsg; char* qmsg; // SubPlanToString
} STqExecCol; } STqExecCol;
typedef struct { typedef struct {
......
...@@ -568,7 +568,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -568,7 +568,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->pushLock);
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
#if 1
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version) { dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
...@@ -588,7 +587,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -588,7 +587,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
} }
#endif
taosWUnLockLatch(&pTq->pushLock); taosWUnLockLatch(&pTq->pushLock);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
...@@ -605,10 +603,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -605,10 +603,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
// for taosx // for taosx
ASSERT(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0}; STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, &req); tqInitTaosxRsp(&taosxRsp, &req);
......
...@@ -309,7 +309,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -309,7 +309,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
if (!fromProcessedMsg) { if (!fromProcessedMsg) {
if (walNextValidMsg(pReader->pWalReader) < 0) { if (walNextValidMsg(pReader->pWalReader) < 0) {
pReader->ver = pReader->ver =
pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped); pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
// pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
ret->offset.type = TMQ_OFFSET__LOG; ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver; ret->offset.version = pReader->ver;
ret->fetchType = FETCH_TYPE__NONE; ret->fetchType = FETCH_TYPE__NONE;
...@@ -318,18 +319,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -318,18 +319,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
return -1; return -1;
} }
void* body = pReader->pWalReader->pHead->head.body; void* body = pReader->pWalReader->pHead->head.body;
#if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter
ret->fetchType = FETCH_TYPE__META;
ret->meta = pReader->pWalReader->pHead->head.body;
return 0;
} else {
#endif
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version); tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
#if 0
}
#endif
} }
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
......
...@@ -1647,8 +1647,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1647,8 +1647,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
} else if (ret.fetchType == FETCH_TYPE__NONE || } else if (ret.fetchType == FETCH_TYPE__NONE ||
(ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset); tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("queue scan log return null, offset %s", formatBuf); qDebug("queue scan log return null, offset %s", formatBuf);
...@@ -1656,16 +1654,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1656,16 +1654,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
} }
#if 0
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
return pResult;
}
qDebug("stream scan tsdb return null");
return NULL;
#endif
} else { } else {
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type);
return NULL; return NULL;
......
...@@ -201,6 +201,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { ...@@ -201,6 +201,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
pReader->curVersion, pReader->curInvalid, ver); pReader->curVersion, pReader->curInvalid, ver);
pReader->curVersion = ver; pReader->curVersion = ver;
pReader->curInvalid = 0;
return 0; return 0;
} }
...@@ -211,8 +212,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { ...@@ -211,8 +212,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
return 0; return 0;
} }
pReader->curInvalid = 1; // pReader->curInvalid = 1;
pReader->curVersion = ver; // pReader->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
...@@ -220,8 +221,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { ...@@ -220,8 +221,8 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { // if (ver < pWal->vers.snapshotVer) {
} // }
if (walReadSeekVerImpl(pReader, ver) < 0) { if (walReadSeekVerImpl(pReader, ver) < 0) {
return -1; return -1;
...@@ -240,8 +241,8 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -240,8 +241,8 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (pRead->curInvalid || pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReadSeekVer(pRead, fetchVer) < 0) {
pRead->curVersion = fetchVer; // pRead->curVersion = fetchVer;
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
seeked = true; seeked = true;
...@@ -260,11 +261,11 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -260,11 +261,11 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
} }
pRead->curInvalid = 0; // pRead->curInvalid = 0;
return 0; return 0;
} }
...@@ -295,13 +296,13 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -295,13 +296,13 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver); pRead->pWal->cfg.vgId, pRead->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
if (walValidBodyCksum(pRead->pHead) != 0) { if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1; // pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -320,7 +321,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { ...@@ -320,7 +321,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
...@@ -348,8 +349,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -348,8 +349,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
if (pRead->curInvalid || pRead->curVersion != ver) { if (pRead->curInvalid || pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver); code = walReadSeekVer(pRead, ver);
if (code < 0) { if (code < 0) {
pRead->curVersion = ver; // pRead->curVersion = ver;
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
seeked = true; seeked = true;
...@@ -369,7 +370,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -369,7 +370,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
} }
...@@ -382,7 +383,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -382,7 +383,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1; return -1;
} }
pRead->curInvalid = 0; // pRead->curInvalid = 0;
return 0; return 0;
} }
...@@ -400,7 +401,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { ...@@ -400,7 +401,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
...@@ -439,14 +440,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -439,14 +440,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead->pWal->cfg.vgId, pReadHead->version, ver); pRead->pWal->cfg.vgId, pReadHead->version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
pRead->curInvalid = 1; // pRead->curInvalid = 1;
return -1; return -1;
} }
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pReadHead->version, ver); pReadHead->version, ver);
pRead->curInvalid = 1; // pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -454,7 +455,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -454,7 +455,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
if (walValidBodyCksum(*ppHead) != 0) { if (walValidBodyCksum(*ppHead) != 0) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver); ver);
pRead->curInvalid = 1; // pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -550,7 +551,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -550,7 +551,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
if (pReader->pHead->head.version != ver) { if (pReader->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
pReader->pHead->head.version, ver); pReader->pHead->head.version, ver);
pReader->curInvalid = 1; // pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex); taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
...@@ -563,7 +564,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -563,7 +564,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen); uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
uint32_t logCkSum = pReader->pHead->cksumBody; uint32_t logCkSum = pReader->pHead->cksumBody;
wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum); wError("checksum written into log:%u, checksum calculated:%u", logCkSum, readCkSum);
pReader->curInvalid = 1; // pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pReader->mutex); taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
...@@ -581,5 +582,6 @@ void walReadReset(SWalReader *pReader) { ...@@ -581,5 +582,6 @@ void walReadReset(SWalReader *pReader) {
taosCloseFile(&pReader->pLogFile); taosCloseFile(&pReader->pLogFile);
pReader->curInvalid = 1; pReader->curInvalid = 1;
pReader->curFileFirstVer = -1; pReader->curFileFirstVer = -1;
pReader->curVersion = -1;
taosThreadMutexUnlock(&pReader->mutex); taosThreadMutexUnlock(&pReader->mutex);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册