未验证 提交 610f9d19 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22188 from taosdata/fix/3_liaohj

fix(stream): set the correct step2 scan time window range.
......@@ -635,13 +635,15 @@ void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
// save to b-tree meta store
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
......
......@@ -160,7 +160,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask);
if (code < 0) {
taosWUnLockLatch(&pSnode->pMeta->lock);
return -1;
......@@ -179,7 +179,14 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId);
if (pTask == NULL) {
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
return 0;
}
streamMetaUnregisterTask(pSnode->pMeta, pReq->taskId);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
}
......
......@@ -172,7 +172,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq);
// tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
......
......@@ -1041,12 +1041,13 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 2.save task, use the newest commit version as the initial start version of stream task.
int32_t taskId = 0;
taosWLockLatch(&pStreamMeta->lock);
code = streamMetaAddDeployedTask(pStreamMeta, sversion, pTask);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask);
taskId = pTask->id.taskId;
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks);
tFreeStreamTask(pTask);
taosWUnLockLatch(&pStreamMeta->lock);
return -1;
}
......@@ -1136,7 +1137,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
pTask->streamTaskId.taskId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s scan-history-task set status to be dropping", id);
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pTask);
......@@ -1166,12 +1167,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
if (!streamTaskRecoverScanStep1Finished(pTask)) {
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id);
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
", do secondary scan-history data after halt the related stream task:%s",
id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id);
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING);
st = taosGetTimestampMs();
streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window);
streamSetParamForStreamScannerStep2(pTask, pRange, pWindow);
}
if (!streamTaskRecoverScanStep2Finished(pTask)) {
......@@ -1259,6 +1262,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
......@@ -1466,8 +1470,14 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
return 0;
}
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
streamMetaUnregisterTask(pTq->pStreamMeta, pReq->taskId);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
......
......@@ -336,6 +336,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) {
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
} else {
ASSERT(0);
}
......
......@@ -20,21 +20,6 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) {
tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr);
return -1;
}
if (streamSchedExec(pTask) < 0) {
tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
return -1;
}
return TSDB_CODE_SUCCESS;
}
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;
......
......@@ -3179,6 +3179,16 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pScanInfo == NULL) {
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file
doCleanupTableScanInfo(pScanInfo);
......
......@@ -1774,6 +1774,67 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
}
}
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
if (skey == INT64_MIN) {
return;
}
int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}
if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p);
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -1800,8 +1861,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} else {
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, id);
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
pTSInfo->base.cond.twindows.ekey, id);
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2;
}
......@@ -1920,6 +1983,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->pUpdateInfo) {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
}
blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
case STREAM_NORMAL:
......@@ -1942,7 +2006,9 @@ FETCH_NEXT_BLOCK:
} else {
pDelBlock = pBlock;
}
setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered");
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
......@@ -1950,6 +2016,7 @@ FETCH_NEXT_BLOCK:
}
goto FETCH_NEXT_BLOCK;
}
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
......@@ -2091,39 +2158,15 @@ FETCH_NEXT_BLOCK:
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
{ // do additional time window filter
STimeWindow* pWindow = &pStreamInfo->fillHistoryWindow;
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// apply additional time window filter
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows,
pInfo->pUpdateDataRes->info.rows);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {
break;
}
}
......
......@@ -65,7 +65,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
int8_t status = atomic_load_8(&pTask->triggerStatus);
qDebug("s-task:%s in scheduler timer, trigger status:%d", pTask->id.idStr, status);
qDebug("s-task:%s in scheduler, trigger status:%d, next:%dms", pTask->id.idStr, status, (int32_t)pTask->triggerParam);
if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
streamMetaReleaseTask(NULL, pTask);
......@@ -74,23 +74,22 @@ static void streamSchedByTimer(void* param, void* tmrId) {
}
if (status == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (trigger == NULL) {
SStreamTrigger* pTrigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (pTrigger == NULL) {
return;
}
trigger->type = STREAM_INPUT__GET_RES;
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (trigger->pBlock == NULL) {
taosFreeQitem(trigger);
pTrigger->type = STREAM_INPUT__GET_RES;
pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pTrigger->pBlock == NULL) {
taosFreeQitem(pTrigger);
return;
}
trigger->pBlock->info.type = STREAM_GET_ALL;
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
taosFreeQitem(trigger);
pTrigger->pBlock->info.type = STREAM_GET_ALL;
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTrigger) < 0) {
taosFreeQitem(pTrigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->schedTimer);
return;
}
......@@ -102,7 +101,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
}
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) {
if (pTask->triggerParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedTimer == NULL);
......@@ -399,6 +398,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
qDebug("s-task:%s new data arrived, active the trigger, trigerStatus:%d", pTask->id.idStr, pTask->triggerStatus);
}
return 0;
......
......@@ -166,6 +166,8 @@ int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubm
}
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
terrno = 0;
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
......@@ -181,7 +183,10 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
// todo handle error
if (pMerged == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
......@@ -189,6 +194,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosFreeQitem(pElem);
return (SStreamQueueItem*)pMerged;
} else {
qDebug("block type:%d not merged with existed blocks list, type:%d", pElem->type, dst->type);
return NULL;
}
}
......
......@@ -407,12 +407,14 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskResumeFromHalt(pStreamTask);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId;
// free it and remove it from disk meta-store
streamMetaUnregisterTask(pMeta, taskId);
// save to disk
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pTask);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
......@@ -464,7 +466,12 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
if (terrno == 0) {
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
} else {
qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
tstrerror(terrno));
}
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
......@@ -492,6 +499,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t batchSize = 0;
SStreamQueueItem* pInput = NULL;
if (streamTaskShouldStop(&pTask->status)) {
qDebug("s-task:%s stream task stopped, abort", id);
break;
}
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
......
......@@ -217,6 +217,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
tEncoderClear(&encoder);
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
return -1;
}
......@@ -224,8 +225,19 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn);
if (code != 0) {
qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno));
} else {
qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, taskId);
}
return code;
}
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
......@@ -233,6 +245,8 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask);
return -1;
......@@ -242,7 +256,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
tFreeStreamTask(pTask);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else {
return 0;
}
......@@ -281,6 +294,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
} else if (ref == 0) {
ASSERT(streamTaskShouldStop(&pTask->status));
qTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
tFreeStreamTask(pTask);
} else if (ref < 0) {
qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
......@@ -297,7 +311,7 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId)
}
}
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask* pTask = NULL;
// pre-delete operation
......@@ -309,7 +323,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
taosWUnLockLatch(&pMeta->lock);
return;
return 0;
}
taosWUnLockLatch(&pMeta->lock);
......@@ -339,27 +353,26 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
ASSERT(pTask->status.timerActive == 0);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
// remove the ref by timer
if (pTask->triggerParam != 0) {
taosTmrStop(pTask->schedTimer);
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaRemoveTask(pMeta, taskId);
streamMetaReleaseTask(pMeta, pTask);
} else {
qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
}
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t streamMetaBegin(SStreamMeta* pMeta) {
......@@ -404,7 +417,9 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
return -1;
}
......@@ -413,6 +428,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
SArray* pRecycleList = taosArrayInit(4, sizeof(int32_t));
tdbTbcMoveToFirst(pCur);
......@@ -422,6 +438,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosArrayDestroy(pRecycleList);
return -1;
}
......@@ -429,16 +446,29 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
// remove duplicate
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask);
taosArrayPush(pRecycleList, &taskId);
int32_t total = taosArrayGetSize(pRecycleList);
qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
continue;
}
// do duplicate task check.
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosMemoryFree(pTask);
tFreeStreamTask(pTask);
taosArrayDestroy(pRecycleList);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
} else {
tdbFree(pKey);
......@@ -452,7 +482,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
taosMemoryFree(pTask);
tFreeStreamTask(pTask);
taosArrayDestroy(pRecycleList);
return -1;
}
......@@ -462,8 +493,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
tdbFree(pKey);
tdbFree(pVal);
if (tdbTbcClose(pCur) < 0) {
taosArrayDestroy(pRecycleList);
return -1;
}
if (taosArrayGetSize(pRecycleList) > 0) {
for(int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
int32_t taskId = *(int32_t*) taosArrayGet(pRecycleList, i);
streamMetaRemoveTask(pMeta, taskId);
}
}
qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t) taosArrayGetSize(pMeta->pTaskList));
taosArrayDestroy(pRecycleList);
return 0;
}
......@@ -765,7 +765,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
} else {
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey + 1;
int64_t ekey = 0;
if (pRange->window.ekey < INT64_MAX) {
ekey = pRange->window.ekey + 1;
} else {
ekey = pRange->window.ekey;
}
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
......
......@@ -211,7 +211,7 @@ static void freeItem(void* p) {
}
void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr);
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask);
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/cfg.sh -n dnode1 -c keepColumnName -v 1
system sh/exec.sh -n dnode1 -s start
sleep 5000
sleep 1000
sql connect
print ========== interval\session\state window
......@@ -32,7 +30,6 @@ sql create stream streamd6 into streamt6 as select ca, _wstart,_wend, count(*),
sql alter local 'keepColumnName' '1'
sql CREATE STABLE `meters_test_data` (`ts` TIMESTAMP, `close` FLOAT, `parttime` TIMESTAMP, `parttime_str` VARCHAR(32)) TAGS (`id` VARCHAR(32));
sql_error create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str);
......@@ -58,17 +55,13 @@ sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count
sql alter local 'keepColumnName' '0'
sql create stream realtime_meters fill_history 1 into realtime_meters as select last(parttime),first(close),last(close) from meters_test_data partition by tbname state_window(parttime_str);
sql desc realtime_meters;
if $rows == 0 then
return -1
endi
sql create stream streamd7 into streamt7 as select _wstart, _wend, count(*), first(ca), last(ca) from t1 interval(10s);
sql create stream streamd7 into streamt7 as select _wstart t1, _wend t2, count(*), first(ca), last(ca) from t1 interval(10s);
sql desc streamt7;
if $rows == 0 then
return -1
endi
......@@ -76,12 +69,11 @@ endi
sql create stream streamd71 into streamt71 as select _wstart, _wend, count(*) as ca, first(ca), last(ca) as c2 from t1 interval(10s);
sql desc streamt71;
if $rows == 0 then
return -1
endi
sleep 3000
sleep 1000
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
......@@ -93,23 +85,19 @@ sql drop stream if exists streamd6;
sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s);
sql desc streamd10;
if $rows == 0 then
return -1
endi
sql_error create stream streamd11 into streamd11 as select _wstart, _wend, count(*), last(ca), last(ca) from t1 interval(10s);
sql create stream streamd12 into streamd12 as select _wstart, _wend, count(*), last(ca), last(cb) as c2 from t1 interval(10s);
sql desc streamd12;
if $rows == 0 then
return -1
endi
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print =============== check
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册