提交 5c8c1eb8 编写于 作者: J jiajingbin

Merge branch 'main' of https://github.com/taosdata/TDengine into main

...@@ -593,7 +593,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -593,7 +593,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
......
...@@ -153,7 +153,6 @@ struct SWalReader { ...@@ -153,7 +153,6 @@ struct SWalReader {
int64_t capacity; int64_t capacity;
TdThreadMutex mutex; TdThreadMutex mutex;
SWalFilterCond cond; SWalFilterCond cond;
// TODO remove it
SWalCkHead *pHead; SWalCkHead *pHead;
}; };
...@@ -207,10 +206,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64 ...@@ -207,10 +206,9 @@ void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset); void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
// only for tq usage // only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); int32_t walFetchHead(SWalReader *pRead, int64_t ver);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
void walRefFirstVer(SWal *, SWalRef *); void walRefFirstVer(SWal *, SWalRef *);
void walRefLastVer(SWal *, SWalRef *); void walRefLastVer(SWal *, SWalRef *);
......
...@@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data); ...@@ -127,7 +127,7 @@ void tqDestroyTqHandle(void* data);
// tqRead // tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
......
...@@ -1274,6 +1274,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1274,6 +1274,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->tsInfo.step2Start = taosGetTimestampMs(); pTask->tsInfo.step2Start = taosGetTimestampMs();
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
appendTranstateIntoInputQ(pTask); appendTranstateIntoInputQ(pTask);
streamTryExec(pTask); // exec directly
} else { } else {
STimeWindow* pWindow = &pTask->dataRange.window; STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
...@@ -1525,7 +1526,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1525,7 +1526,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return TSDB_CODE_SUCCESS;
} else { } else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId); tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
......
...@@ -184,70 +184,63 @@ end: ...@@ -184,70 +184,63 @@ end:
return tbSuid == realTbSuid; return tbSuid == realTbSuid;
} }
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) { int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
int32_t code = 0; int32_t code = -1;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset; int64_t offset = *fetchOffset;
int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
while (1) { wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64,
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { vgId, offset, lastVer, committedVer, appliedVer);
while (offset <= appliedVer) {
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
", no more log to return, reqId:0x%" PRIx64, ", no more log to return, reqId:0x%" PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
*fetchOffset = offset;
code = -1;
goto END; goto END;
} }
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId); pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId);
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) { if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
*fetchOffset = offset; code = walFetchBody(pHandle->pWalReader);
code = -1;
goto END;
}
*fetchOffset = offset;
code = 0;
goto END; goto END;
} else { } else {
if (pHandle->fetchMeta != WITH_DATA) { if (pHandle->fetchMeta != WITH_DATA) {
SWalCont* pHead = &((*ppCkHead)->head); SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) { if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader);
if (code < 0) { if (code < 0) {
*fetchOffset = offset;
code = -1;
goto END; goto END;
} }
pHead = &(pHandle->pWalReader->pHead->head);
if (isValValidForTable(pHandle, pHead)) { if (isValValidForTable(pHandle, pHead)) {
*fetchOffset = offset;
code = 0; code = 0;
goto END; goto END;
} else { } else {
offset++; offset++;
code = -1;
continue; continue;
} }
} }
} }
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); code = walSkipFetchBody(pHandle->pWalReader);
if (code < 0) { if (code < 0) {
*fetchOffset = offset;
code = -1;
goto END; goto END;
} }
offset++; offset++;
} }
code = -1;
} }
END: END:
taosThreadMutexUnlock(&pHandle->pWalReader->mutex); *fetchOffset = offset;
return code; return code;
} }
...@@ -339,8 +332,12 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con ...@@ -339,8 +332,12 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead)); void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead); int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem); code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -211,12 +211,13 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { ...@@ -211,12 +211,13 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int64_t maxVer = pTask->dataRange.range.maxVer;
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
if (!pTask->status.appendTranstateBlock) { if (!pTask->status.appendTranstateBlock) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, add transfer-state block into inputQ", ", not scan wal anymore, add transfer-state block into inputQ",
id, ver, pTask->dataRange.range.maxVer); id, ver, maxVer);
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
...@@ -224,7 +225,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { ...@@ -224,7 +225,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
/*int32_t code = */streamSchedExec(pTask); /*int32_t code = */streamSchedExec(pTask);
} else { } else {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
id, ver, pTask->dataRange.range.maxVer); id, ver, maxVer);
} }
} }
} }
......
...@@ -179,7 +179,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -179,7 +179,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg* pMsg, STqOffsetVal* offset) { SRpcMsg* pMsg, STqOffsetVal* offset) {
int code = 0; int code = 0;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SWalCkHead* pCkHead = NULL;
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0}; STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, *offset); tqInitTaosxRsp(&taosxRsp, *offset);
...@@ -216,14 +215,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -216,14 +215,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) { if (offset->type == TMQ_OFFSET__LOG) {
walReaderVerifyOffset(pHandle->pWalReader, offset); walReaderVerifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version; int64_t fetchVer = offset->version;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto end;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
int totalRows = 0; int totalRows = 0;
while (1) { while (1) {
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
...@@ -234,14 +226,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -234,14 +226,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
break; break;
} }
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
// setRequestVersion(&taosxRsp.reqOffset, offset->version); // setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
SWalCont* pHead = &pCkHead->head; SWalCont* pHead = &pHandle->pWalReader->pHead->head;
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
...@@ -291,7 +283,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -291,7 +283,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
end: end:
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code; return code;
} }
......
...@@ -757,11 +757,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ...@@ -757,11 +757,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id); qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
ASSERT(pTask->info.fillHistory == 1); ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask); code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return code;
} }
streamFreeQitem(pTask->msgInfo.pData);
return TSDB_CODE_SUCCESS;
} }
pTask->msgInfo.retryCount = 0; pTask->msgInfo.retryCount = 0;
......
...@@ -292,9 +292,20 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -292,9 +292,20 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo: destroy the fill-history task here qError(
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related "
pTask->streamTaskId.taskId); "fill-history task",
pTask->id.idStr, pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// 2. save to disk
taosWLockLatch(&pMeta->lock);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} else { } else {
qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr,
...@@ -334,9 +345,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -334,9 +345,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
} }
// todo check the output queue for fill-history task, and wait for it complete
// 1. expand the query time window for stream task of WAL scanner // 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
...@@ -390,15 +398,10 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -390,15 +398,10 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask); streamTaskFillHistoryFinished(pTask);
}
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
} }
return code; return code;
...@@ -522,6 +525,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -522,6 +525,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
} }
// agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
pBlock->srcVgId = pTask->pMeta->vgId; pBlock->srcVgId = pTask->pMeta->vgId;
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
...@@ -530,16 +534,21 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -530,16 +534,21 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
} else { } else {
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
} }
} else { // level == TASK_LEVEL__SINK
streamFreeQitem((SStreamQueueItem*)pBlock);
} }
} else { // non-dispatch task, do task state transfer directly } else { // non-dispatch task, do task state transfer directly
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
ASSERT(pTask->info.fillHistory == 1); if (level != TASK_LEVEL__SINK) {
code = streamTransferStateToStreamTask(pTask); qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
} else {
qDebug("s-task:%s sink task does not transfer state", id);
} }
} }
...@@ -642,16 +651,6 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { ...@@ -642,16 +651,6 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
pTask->status.taskStatus == TASK_STATUS__DROPPING); pTask->status.taskStatus == TASK_STATUS__DROPPING);
} }
int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
// 1. notify all downstream tasks to transfer executor state after handle all history blocks.
appendTranstateIntoInputQ(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t streamTryExec(SStreamTask* pTask) { int32_t streamTryExec(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required. // this function may be executed by multi-threads, so status check is required.
int8_t schedStatus = int8_t schedStatus =
...@@ -667,36 +666,14 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -667,36 +666,14 @@ int32_t streamTryExec(SStreamTask* pTask) {
} }
// todo the task should be commit here // todo the task should be commit here
// if (taosQueueEmpty(pTask->inputQueue->queue)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// fill-history WAL scan has completed qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
// if (pTask->status.transferState) { pTask->status.schedStatus);
// code = streamTransferStateToStreamTask(pTask);
// if (code != TSDB_CODE_SUCCESS) { if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamTaskShouldPause(&pTask->status))) {
// return code; streamSchedExec(pTask);
// } }
// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
// call this function (streamExecForAll) directly.
// code = streamExecForAll(pTask);
// if (code < 0) {
// do nothing
// }
// }
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
// } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
// }
} else { } else {
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
......
...@@ -400,11 +400,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { ...@@ -400,11 +400,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
} }
pTask->status.appendTranstateBlock = true; pTask->status.appendTranstateBlock = true;
qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
streamSchedExec(pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -16,10 +16,6 @@ ...@@ -16,10 +16,6 @@
#include "taoserror.h" #include "taoserror.h"
#include "walInt.h" #include "walInt.h"
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader)); SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader));
if (pReader == NULL) { if (pReader == NULL) {
...@@ -80,19 +76,19 @@ int32_t walNextValidMsg(SWalReader *pReader) { ...@@ -80,19 +76,19 @@ int32_t walNextValidMsg(SWalReader *pReader) {
return -1; return -1;
} }
while (fetchVer <= appliedVer) { while (fetchVer <= appliedVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) { if (walFetchHead(pReader, fetchVer) < 0) {
return -1; return -1;
} }
int32_t type = pReader->pHead->head.msgType; int32_t type = pReader->pHead->head.msgType;
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
(IS_META_MSG(type) && pReader->cond.scanMeta)) { (IS_META_MSG(type) && pReader->cond.scanMeta)) {
if (walFetchBodyNew(pReader) < 0) { if (walFetchBody(pReader) < 0) {
return -1; return -1;
} }
return 0; return 0;
} else { } else {
if (walSkipFetchBodyNew(pReader) < 0) { if (walSkipFetchBody(pReader) < 0) {
return -1; return -1;
} }
...@@ -254,104 +250,8 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) { ...@@ -254,104 +250,8 @@ int32_t walReaderSeekVer(SWalReader *pReader, int64_t ver) {
return 0; return 0;
} }
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen;
bool seeked = false;
wDebug("vgId:%d, wal starts to fetch head, index:%" PRId64, pRead->pWal->cfg.vgId, fetchVer);
if (pRead->curVersion != fetchVer) {
if (walReaderSeekVer(pRead, fetchVer) < 0) {
return -1;
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {
break;
} else if (contLen == 0 && !seeked) {
if(walReadSeekVerImpl(pRead, fetchVer) < 0){
return -1;
}
seeked = true;
continue;
} else {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
return -1;
}
}
// pRead->curInvalid = 0;
return 0;
}
static int32_t walFetchBodyNew(SWalReader *pReader) {
SWalCont *pReadHead = &pReader->pHead->head;
int64_t ver = pReadHead->version;
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver,
pReadHead->bodyLen);
if (pReader->capacity < pReadHead->bodyLen) {
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReader->pHead = ptr;
pReadHead = &pReader->pHead->head;
pReader->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pReader->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver, tstrerror(terrno));
} else {
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since file corrupted",
pReader->pWal->cfg.vgId, pReader->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
return -1;
}
if (walValidBodyCksum(pReader->pHead) != 0) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType);
pReader->curVersion = ver + 1;
return 0;
}
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t code;
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
// pRead->curInvalid = 1;
return -1;
}
pRead->curVersion++; int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
wDebug("vgId:%d, version advance to %" PRId64 ", skip fetch", pRead->pWal->cfg.vgId, pRead->curVersion);
return 0;
}
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code; int64_t code;
int64_t contLen; int64_t contLen;
bool seeked = false; bool seeked = false;
...@@ -369,15 +269,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -369,15 +269,13 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
if (pRead->curVersion != ver) { if (pRead->curVersion != ver) {
code = walReaderSeekVer(pRead, ver); code = walReaderSeekVer(pRead, ver);
if (code < 0) { if (code < 0) {
// pRead->curVersion = ver;
// pRead->curInvalid = 1;
return -1; return -1;
} }
seeked = true; seeked = true;
} }
while (1) { while (1) {
contLen = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead)); contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) { if (contLen == sizeof(SWalCkHead)) {
break; break;
} else if (contLen == 0 && !seeked) { } else if (contLen == 0 && !seeked) {
...@@ -392,12 +290,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -392,12 +290,11 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
// pRead->curInvalid = 1;
return -1; return -1;
} }
} }
code = walValidHeadCksum(pHead); code = walValidHeadCksum(pRead->pHead);
if (code != 0) { if (code != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver);
...@@ -405,32 +302,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -405,32 +302,27 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return -1; return -1;
} }
// pRead->curInvalid = 0;
return 0; return 0;
} }
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead) {
int64_t code;
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
", applied ver:%" PRId64, ", applied ver:%" PRId64,
pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); int64_t code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
// pRead->curInvalid = 1;
return -1; return -1;
} }
pRead->curVersion++; pRead->curVersion++;
return 0; return 0;
} }
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { int32_t walFetchBody(SWalReader *pRead) {
SWalCont *pReadHead = &((*ppHead)->head); SWalCont *pReadHead = &pRead->pHead->head;
int64_t ver = pReadHead->version; int64_t ver = pReadHead->version;
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
...@@ -439,13 +331,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -439,13 +331,13 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead->pWal->vers.appliedVer); pRead->pWal->vers.appliedVer);
if (pRead->capacity < pReadHead->bodyLen) { if (pRead->capacity < pReadHead->bodyLen) {
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(*ppHead, sizeof(SWalCkHead) + pReadHead->bodyLen); SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
*ppHead = ptr; pRead->pHead = ptr;
pReadHead = &((*ppHead)->head); pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen; pRead->capacity = pReadHead->bodyLen;
} }
...@@ -459,27 +351,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -459,27 +351,24 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
pRead->pWal->cfg.vgId, pReadHead->version, ver); pRead->pWal->cfg.vgId, pReadHead->version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
// pRead->curInvalid = 1;
return -1; return -1;
} }
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pReadHead->version, ver); pReadHead->version, ver);
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (walValidBodyCksum(*ppHead) != 0) { if (walValidBodyCksum(pRead->pHead) != 0) {
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver); ver);
// pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
pRead->curVersion = ver + 1; pRead->curVersion++;
return 0; return 0;
} }
......
...@@ -460,8 +460,7 @@ class TDTestCase: ...@@ -460,8 +460,7 @@ class TDTestCase:
#self.test_db("db2", 5, 10*24*3600, 2*1024) # 2M size #self.test_db("db2", 5, 10*24*3600, 2*1024) # 2M size
# period + size # period + size
self.test_db("db", checkTime = 5*60, wal_period = 60, wal_size_kb=10) self.test_db("db", checkTime = 3*60, wal_period = 60, wal_size_kb=500)
#self.test_db("db", checkTime = 3*60, wal_period = 0, wal_size_kb=0)
def stop(self): def stop(self):
......
...@@ -776,7 +776,7 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) { ...@@ -776,7 +776,7 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
if (field->bytes > shell.args.displayWidth) { if (field->bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width); return TMAX(shell.args.displayWidth, width);
} else { } else {
return TMAX(field->bytes, width); return TMAX(field->bytes + 2, width);
} }
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
...@@ -785,7 +785,7 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) { ...@@ -785,7 +785,7 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
if (bytes > shell.args.displayWidth) { if (bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width); return TMAX(shell.args.displayWidth, width);
} else { } else {
return TMAX(bytes, width); return TMAX(bytes + 2, width);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册