未验证 提交 366cd187 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21314 from taosdata/fix/liaohj_main

fix(stream):extract delete msg from wal.
......@@ -135,7 +135,6 @@ typedef struct {
typedef struct {
int8_t type;
int64_t ver;
int32_t* dataRef;
SSDataBlock* pBlock;
} SStreamRefDataBlock;
......
......@@ -135,6 +135,7 @@ typedef struct {
// int8_t scanUncommited;
int8_t scanNotApplied;
int8_t scanMeta;
int8_t deleteMsg;
int8_t enableRef;
} SWalFilterCond;
......@@ -193,9 +194,10 @@ SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader* pReader);
int64_t walReaderGetValidFirstVer(const SWalReader* pReader);
// only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
......
......@@ -264,7 +264,7 @@ int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
int32_t tqNextBlockInWal(STqReader* pReader);
bool tqNextBlockImpl(STqReader *pReader);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
......
......@@ -187,8 +187,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqStreamTasksScanWal(STQ* pTq);
// tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
#ifdef __cplusplus
......
......@@ -213,7 +213,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -664,7 +664,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
SWalFilterCond cond = {.deleteMsg = 1};
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
}
streamSetupTrigger(pTask);
......@@ -868,7 +869,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
memcpy(serializedReq, &req, len);
// dispatch msg
tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
tqDebug("s-task:%s start to recover blocking stage", pTask->id.idStr);
SRpcMsg rpcMsg = {
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
......@@ -892,6 +893,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1;
}
qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
walReaderSeekVer(pTask->exec.pWalReader, sversion);
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
......@@ -958,7 +962,60 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock) {
SDecoder* pCoder = &(SDecoder){0};
SDeleteRes* pRes = &(SDeleteRes){0};
*pRefBlock = NULL;
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tDecoderInit(pCoder, (uint8_t*)pData, len);
tDecodeDeleteRes(pCoder, pRes);
tDecoderClear(pCoder);
int32_t numOfTables = taosArrayGetSize(pRes->uidList);
if (numOfTables == 0 || pRes->affectedRows == 0) {
taosArrayDestroy(pRes->uidList);
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
blockDataEnsureCapacity(pDelBlock, numOfTables);
pDelBlock->info.rows = numOfTables;
pDelBlock->info.version = ver;
for (int32_t i = 0; i < numOfTables; i++) {
// start key column
SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataSetVal(pStartCol, i, (const char*)&pRes->skey, false); // end key column
SColumnInfoData* pEndCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
colDataSetVal(pEndCol, i, (const char*)&pRes->ekey, false);
// uid column
SColumnInfoData* pUidCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
int64_t* pUid = taosArrayGet(pRes->uidList, i);
colDataSetVal(pUidCol, i, (const char*)pUid, false);
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX), i);
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), i);
colDataSetNULL(taosArrayGet(pDelBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), i);
}
taosArrayDestroy(pRes->uidList);
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
if (pRefBlock == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pRefBlock)->type = STREAM_INPUT__REF_DATA_BLOCK;
(*pRefBlock)->pBlock = pDelBlock;
return TSDB_CODE_SUCCESS;
}
int32_t tqProcessDeleteDataReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
bool failed = false;
SDecoder* pCoder = &(SDecoder){0};
SDeleteRes* pRes = &(SDeleteRes){0};
......@@ -978,6 +1035,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
taosArrayDestroy(pRes->uidList);
return 0;
}
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
blockDataEnsureCapacity(pDelBlock, sz);
pDelBlock->info.rows = sz;
......@@ -1024,8 +1082,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
pRefBlock->pBlock = pDelBlock;
pRefBlock->dataRef = pRef;
atomic_add_fetch_32(pRefBlock->dataRef, 1);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
atomic_sub_fetch_32(pRef, 1);
......
......@@ -34,12 +34,12 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
return 0;
}
if (msgType == TDMT_VND_SUBMIT) {
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) {
tqStartStreamTasks(pTq);
}
if (msgType == TDMT_VND_DELETE) {
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
// tqProcessDeleteDataReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
}
}
......
......@@ -294,32 +294,51 @@ void tqCloseReader(STqReader* pReader) {
}
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
return -1;
}
tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id);
return 0;
}
int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) {
if (walNextValidMsg(pReader) < 0) {
return -1;
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
int32_t code = walNextValidMsg(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pHead->head.version;
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1;
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
return -1;
}
memcpy(data, pBody, len);
SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
*pItem = (SStreamQueueItem*)streamDataSubmitNew(data1, STREAM_INPUT__DATA_SUBMIT);
if (*pItem == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", id);
return terrno;
}
} else if (pReader->pHead->head.msgType == TDMT_VND_DELETE) {
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
} else {
ASSERT(0);
}
memcpy(data, pBody, len);
*pPackedData = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
return 0;
}
......
......@@ -107,42 +107,54 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = false;
// seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
SWal *pWal = pTask->exec.pWalReader->pWal;
if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) {
pTask->chkInfo.currentVer = pWal->vers.firstVer;
code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) {
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (pTask->chkInfo.currentVer < firstVer) {
pTask->chkInfo.currentVer = firstVer;
tqWarn("vgId:%d s-task:%s ver earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId,
pTask->id.idStr, firstVer, pTask->chkInfo.currentVer);
// todo need retry if failed
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
} else {
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
if (currentVer == -1) {
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
}
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// append the data for the stream
tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
// tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer);
SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
SStreamQueueItem* pItem = NULL;
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
// delete ignore
if (pItem == NULL) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
noNewDataInWal = false;
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
code = tqAddInputBlockNLaunchTask(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
......@@ -151,8 +163,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
tqError("s-task:%s append input queue failed, ver:%"PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
}
streamDataSubmitDestroy(p);
taosFreeQitem(p);
streamMetaReleaseTask(pStreamMeta, pTask);
}
......
......@@ -82,6 +82,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
qStreamSetOpen(task);
tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
......
......@@ -26,10 +26,10 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
return taosStrdup(buf);
}
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) {
tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver);
tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr);
return -1;
}
......
......@@ -1821,7 +1821,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan called");
qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
......@@ -1830,13 +1830,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->base.cond.startVersion = 0;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
} else {
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
}
......@@ -1941,7 +1941,6 @@ FETCH_NEXT_BLOCK:
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
if (pInfo->validBlockIndex >= total) {
doClearBufferedBlocks(pInfo);
/*pOperator->status = OP_EXEC_DONE;*/
return NULL;
}
......
......@@ -288,14 +288,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem);
if (pSubmitBlock == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
SStreamDataSubmit2* pSubmitBlock = (SStreamDataSubmit2*)pItem;
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
qDebug("s-task:%s submit enqueue %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr,
pItem, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
......
......@@ -194,13 +194,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosFreeQitem(pMerge);
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
int32_t ref = atomic_sub_fetch_32(pRefBlock->dataRef, 1);
ASSERT(ref >= 0);
if (ref == 0) {
blockDataDestroy(pRefBlock->pBlock);
taosMemoryFree(pRefBlock->dataRef);
}
blockDataDestroy(pRefBlock->pBlock);
taosFreeQitem(pRefBlock);
}
}
......@@ -60,7 +60,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;
......
......@@ -224,12 +224,19 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req*
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.pExecutor;
const char* id = pTask->id.idStr;
qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr);
int64_t st = taosGetTimestampMs();
qDebug("s-task:%s recover step2(blocking stage) started", id);
if (qStreamSourceRecoverStep2(exec, ver) < 0) {
}
return streamScanExec(pTask, 100);
int32_t code = streamScanExec(pTask, 100);
double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id, el);
return code;
}
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
......
......@@ -62,9 +62,6 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
void walCloseReader(SWalReader *pReader) {
taosCloseFile(&pReader->pIdxFile);
taosCloseFile(&pReader->pLogFile);
/*if (pReader->cond.enableRef) {*/
/*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/
/*}*/
taosMemoryFreeClear(pReader->pHead);
taosMemoryFree(pReader);
}
......@@ -74,22 +71,25 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
if(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
// taosMsleep(10);
}
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
int64_t endVer = TMIN(appliedVer, committedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1;
}
if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT ||
(IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) {
int32_t type = pReader->pHead->head.msgType;
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
if (walFetchBodyNew(pReader) < 0) {
return -1;
}
......@@ -98,13 +98,16 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if (walSkipFetchBodyNew(pReader) < 0) {
return -1;
}
fetchVer = pReader->curVersion;
}
}
return -1;
}
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0;
......@@ -206,7 +209,7 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
return 0;
}
int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
SWal *pWal = pReader->pWal;
if (ver == pReader->curVersion) {
wDebug("vgId:%d, wal index:%" PRId64 " match, no need to reset", pReader->pWal->cfg.vgId, ver);
......@@ -236,7 +239,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) {
if (walReaderSeekVer(pRead, fetchVer) < 0) {
return -1;
}
seeked = true;
......@@ -278,6 +281,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReader->pHead = ptr;
pReadHead = &pReader->pHead->head;
pReader->capacity = pReadHead->bodyLen;
......@@ -293,14 +297,11 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
// pRead->curInvalid = 1;
return -1;
}
if (walValidBodyCksum(pReader->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......@@ -342,7 +343,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
}
if (pRead->curVersion != ver) {
code = walReadSeekVer(pRead, ver);
code = walReaderSeekVer(pRead, ver);
if (code < 0) {
// pRead->curVersion = ver;
// pRead->curInvalid = 1;
......@@ -479,7 +480,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
taosThreadMutexLock(&pReader->mutex);
if (pReader->curVersion != ver) {
if (walReadSeekVer(pReader, ver) < 0) {
if (walReaderSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex);
return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册