提交 6a3c935b 编写于 作者: H Haojun Liao

fix(stream): fix error in fill history scan.

上级 0dd93301
......@@ -221,7 +221,7 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
......
......@@ -233,28 +233,6 @@ typedef struct SStoreSnapshotFn {
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
} SStoreSnapshotFn;
/**
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderReleaseLock(SMetaReader *pReader);
void metaReaderClear(SMetaReader *pReader);
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
*/
typedef struct SStoreMeta {
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor
......
......@@ -557,18 +557,19 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
// common
int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
// source level
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey);
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask);
int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask);
// agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
......
......@@ -25,7 +25,7 @@
#define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode;
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
......@@ -101,7 +101,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
return 0;
}
int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) {
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) {
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) {
......@@ -202,7 +202,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
}
// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream) {
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, int32_t fillHistory) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
......@@ -218,15 +218,15 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
continue;
}
mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup);
mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, fillHistory);
sdbRelease(pSdb, pVgroup);
}
return 0;
}
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->conf.fillHistory, 0, pTaskList);
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory) {
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -246,14 +246,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
return terrno;
}
if (fillHistory) { // todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = taosGetTimestampMs();
}
// todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs();
mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask);
mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, pTask);
} else {
mndSetSinkTaskInfo(pStream, pTask);
}
......@@ -318,11 +319,12 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
(*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId;
(*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId;
mDebug("s-task:0x%x related history task:0x%x", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId);
}
}
static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
bool hasExtraSink) {
static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
bool hasExtraSink) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = addNewTaskList(pStream->tasks);
......@@ -368,8 +370,8 @@ static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPl
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, pStream->conf.fillHistory,
hasExtraSink);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
pStream->conf.fillHistory, hasExtraSink);
setHTasksId(pTaskList, pHTaskList);
}
......@@ -390,6 +392,13 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return -1;
}
// todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs();
mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pDownstreamTask);
if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) {
......@@ -399,38 +408,38 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return setEpToDownstreamTask(pTask, pDownstreamTask);
}
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, int32_t fillHistory,
SStreamTask** pAggTask) {
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pStream->conf.fillHistory,
pStream->conf.triggerParam, pTaskList);
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList);
if (*pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
// dispatch
if (mndAddDispatcherForInnerTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) {
if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) {
return -1;
}
return 0;
}
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) {
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SStreamTask** pAggTask,
SStreamTask** pHAggTask) {
SArray* pAggTaskList = addNewTaskList(pStream->tasks);
SSdb* pSdb = pMnode->pSdb;
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
SNodeListNode* pInnerNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
SSubplan* plan = (SSubplan*)nodesListGetNode(pInnerNode->pNodeList, 0);
if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
SStreamTask* pAggTask = NULL;
SArray* pSinkNodeList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
*pAggTask = NULL;
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, &pAggTask);
int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, 0, pAggTask);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
......@@ -448,16 +457,17 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
}
if (pSnode != NULL) {
code = mndAssignStreamTaskToSnode(pMnode, pAggTask, plan, pSnode);
code = mndAssignStreamTaskToSnode(pMnode, *pAggTask, plan, pSnode);
} else {
code = mndAssignStreamTaskToVgroup(pMnode, pAggTask, plan, pVgroup);
code = mndAssignStreamTaskToVgroup(pMnode, *pAggTask, plan, pVgroup);
}
if (pStream->conf.fillHistory) {
SArray* pHAggTaskList = addNewTaskList(pStream->pHTasksList);
SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL);
SStreamTask* pHAggTask = NULL;
code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, &pHAggTask);
*pHAggTask = NULL;
code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory, pHAggTask);
if (code != TSDB_CODE_SUCCESS) {
if (pSnode != NULL) {
sdbRelease(pSdb, pSnode);
......@@ -468,9 +478,9 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
}
if (pSnode != NULL) {
code = mndAssignStreamTaskToSnode(pMnode, pHAggTask, plan, pSnode);
code = mndAssignStreamTaskToSnode(pMnode, *pHAggTask, plan, pSnode);
} else {
code = mndAssignStreamTaskToVgroup(pMnode, pHAggTask, plan, pVgroup);
code = mndAssignStreamTaskToVgroup(pMnode, *pHAggTask, plan, pVgroup);
}
setHTasksId(pAggTaskList, pHAggTaskList);
......@@ -515,7 +525,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
continue;
}
int32_t code = doAddSourceTask(pSourceTaskList, pStream->conf.fillHistory, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup);
int32_t code = doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
terrno = code;
......@@ -523,7 +533,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
if (pStream->conf.fillHistory) {
code = doAddSourceTask(pHSourceTaskList, 0, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup);
code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup);
sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
......@@ -537,15 +547,15 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
return TSDB_CODE_SUCCESS;
}
static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList) {
static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList, int32_t fillHistory) {
SArray* pSinkTaskList = addNewTaskList(pTasksList);
if (pStream->fixedSinkVgId == 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream) < 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) {
// TODO free
return -1;
}
} else {
if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, fillHistory) < 0) {
// TODO free
return -1;
}
......@@ -578,7 +588,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
hasExtraSink = true;
SArray* pSinkTaskList = NULL;
int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList);
int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -586,7 +596,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
// check for fill history
if (pStream->conf.fillHistory) {
SArray* pHSinkTaskList = NULL;
code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList);
code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pStream->conf.fillHistory);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -598,16 +608,18 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
pStream->totalLevel = numOfPlanLevel + hasExtraSink;
if (numOfPlanLevel > 1) {
SStreamTask* pInnerTask;
int32_t code = addAggTask(pStream, pMnode, pPlan);
SStreamTask* pAggTask = NULL;
SStreamTask* pHAggTask = NULL;
int32_t code = addAggTask(pStream, pMnode, pPlan, &pAggTask, &pHAggTask);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pInnerTask, NULL);
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask);
} else if (numOfPlanLevel == 1) {
return addSourceTasksForSingleLevelStream(pMnode, pPlan, pStream, hasExtraSink);
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink);
}
return 0;
......
......@@ -101,7 +101,7 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
// meta
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI);
void metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI);
void metaReaderReleaseLock(SMetaReader *pReader);
void metaReaderClear(SMetaReader *pReader);
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
......@@ -125,8 +125,6 @@ int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey,
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen);
int64_t metaGetTbNum(SMeta *pMeta);
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
// tsdb
......
......@@ -175,7 +175,8 @@ void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
int64_t metaGetTbNum(SMeta *pMeta);
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
......
......@@ -17,13 +17,13 @@
#include "osMemory.h"
#include "tencode.h"
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) {
void metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
metaReaderInit(pReader, pMeta, flags);
metaReaderDoInit(pReader, pMeta, flags);
pReader->pAPI = pAPI;
}
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
memset(pReader, 0, sizeof(*pReader));
pReader->pMeta = pMeta;
pReader->flags = flags;
......@@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -159,7 +159,7 @@ int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
metaReaderDoInit(&mr, (SMeta *)meta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -174,7 +174,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
SMetaReader *pReader = &mr;
......@@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
code = metaGetTableEntryByName(&mr, tbName);
if (code == 0) *tbType = mr.me.type;
......@@ -222,7 +222,7 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
}
SVnode* pVnodeObj = pVnode;
metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
metaReaderDoInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
tdbTbcOpen(pVnodeObj->pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
......@@ -760,7 +760,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
}
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
STSma *pTSma = NULL;
......@@ -815,7 +815,7 @@ _err:
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma *pTSma = NULL;
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaReaderGetTableEntryByUid(&mr, indexUid) < 0) {
metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
metaReaderClear(&mr);
......
......@@ -37,7 +37,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
// validate req
// save smaIndex
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
#if 1
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
......
......@@ -709,7 +709,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
}
// validate req
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) {
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
......
......@@ -896,7 +896,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
metaReaderInit(&mr, SMA_META(pSma), 0);
metaReaderDoInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
code = terrno;
......@@ -1116,7 +1116,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
}
int64_t nRsmaTables = 0;
metaReaderInit(&mr, SMA_META(pSma), 0);
metaReaderDoInit(&mr, SMA_META(pSma), 0);
if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
......
......@@ -812,11 +812,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta;
pTask->chkInfo.version = ver;
pTask->chkInfo.currentVer = ver;
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
// expand executor
pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL;
pTask->status.taskStatus = /*(pTask->fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
......@@ -919,16 +923,12 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64
") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d",
pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus,
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%d, rsp status %d",
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, pTask->status.taskStatus, rsp.status);
} else {
rsp.status = 0;
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") %d at node %d, check req from task:0x%x at node %d, rsp status %d",
taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId,
rsp.status);
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
SEncoder encoder;
......@@ -969,12 +969,12 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
}
tDecoderClear(&decoder);
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d",
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d (vgId:%d) check req from task:0x%x (vgId:%d), status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId,
tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pTq->pStreamMeta->vgId);
return -1;
}
......@@ -1027,13 +1027,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWUnLockLatch(&pStreamMeta->lock);
// 3. for fill history task, do nothing. wait for the main task to start it
// 3. It's an fill history task, do nothing. wait for the main task to start it
if (pTask->fillHistory) {
tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
} else {
// calculate the correct start time window, and start the handle the history data for the main task.
if (pTask->historyTaskId.taskId != 0) {
// launch the history fill stream task
streamTaskStartHistoryTask(pTask, sversion);
// launch current task
SHistoryDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
}
streamTaskCheckDownstreamTasks(pTask);
}
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
......@@ -1043,20 +1057,24 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
}
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
int32_t code;
int32_t code = TSDB_CODE_SUCCESS;
char* msg = pMsg->pCont;
int32_t msgLen = pMsg->contLen;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
pMeta->vgId, pReq->taskId);
return -1;
}
// check param
int64_t fillVer1 = pTask->chkInfo.version;
if (fillVer1 <= 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
......@@ -1068,14 +1086,27 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
// todo transfer the executor status, and then destroy this stream task
if (pTask->fillHistory) {
// todo transfer the executor status, and then destroy this stream task
} else {
// todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of
// history scan. The current version of chkInfo.current is not updated during the history scan
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
", window:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
#if 0
// build msg to launch next step
......@@ -1162,7 +1193,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
// dispatch recover finish req to all related downstream task
code = streamDispatchRecoverFinishReq(pTask);
code = streamDispatchRecoverFinishMsg(pTask);
if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1;
......
......@@ -114,7 +114,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
}
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
metaReaderClear(&mr);
......
......@@ -48,7 +48,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
metaReaderDoInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
......
......@@ -311,7 +311,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.uid = pTableSinkInfo->uid;
} else {
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
metaReaderDoInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
......
......@@ -1431,7 +1431,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
tb_uid_t suid = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
metaReaderClear(&mr); // table not esist
return 0;
......
......@@ -5452,7 +5452,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
......@@ -5584,57 +5584,3 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/
// opt perf, do NOT create so many readers
int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) {
SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC,
.startVersion = -1, .endVersion = -1};
cond.twindows.skey = INT64_MIN;
cond.twindows.ekey = INT64_MAX;
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols);
if (cond.colList == NULL || cond.pSlotList == NULL) {
// todo
}
cond.colList[0].colId = 1;
cond.colList[0].slotId = 0;
cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
cond.pSlotList[0] = 0;
STableKeyInfo* pTableKeyInfo = pTableList;
STsdbReader* pReader = NULL;
SSDataBlock* pBlock = createDataBlock();
SColumnInfoData data = {0};
data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE};
blockDataAppendColInfo(pBlock, &data);
int64_t key = INT64_MIN;
for(int32_t i = 0; i < numOfTables; ++i) {
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bool hasData = false;
code = tsdbNextDataBlock(pReader, &hasData);
if (!hasData || code != TSDB_CODE_SUCCESS) {
continue;
}
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
int64_t k = *(int64_t*)pCol->pData;
if (key < k) {
key = k;
}
tsdbReaderClose(pReader);
}
return 0;
}
......@@ -204,7 +204,7 @@ void initStateStoreAPI(SStateStore* pStore) {
}
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
pMetaReader->initReader = _metaReaderInit;
pMetaReader->initReader = metaReaderInit;
pMetaReader->clearReader = metaReaderClear;
pMetaReader->getTableEntryByUid = metaReaderGetTableEntryByUid;
......
......@@ -62,7 +62,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
}
// query meta
metaReaderInit(&mer1, pVnode->pMeta, 0);
metaReaderDoInit(&mer1, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
code = terrno;
......@@ -79,7 +79,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
schemaTag = mer1.me.stbEntry.schemaTag;
metaRsp.suid = mer1.me.uid;
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
metaReaderInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
strcpy(metaRsp.stbName, mer2.me.name);
......@@ -175,7 +175,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
}
// query meta
metaReaderInit(&mer1, pVnode->pMeta, 0);
metaReaderDoInit(&mer1, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) {
code = terrno;
......@@ -188,7 +188,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
code = TSDB_CODE_VND_HASH_MISMATCH;
goto _exit;
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
metaReaderInit(&mer2, pVnode->pMeta, 0);
metaReaderDoInit(&mer2, pVnode->pMeta, 0);
if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
strcpy(cfgRsp.stbName, mer2.me.name);
......
......@@ -564,24 +564,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
case TDMT_STREAM_RECOVER_FINISH:
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RECOVER_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
......@@ -1651,7 +1633,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
tDecodeSBatchDeleteReq(&decoder, &deleteReq);
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
for (int32_t i = 0; i < sz; i++) {
......
......@@ -64,8 +64,10 @@ typedef struct {
int8_t recoverStep;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHisotryeKey1;
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
// int64_t fillHistoryVer1;
// int64_t fillHisotryeKey1;
int64_t fillHistoryVer2;
SStreamState* pState;
int64_t dataVersion;
......
......@@ -869,11 +869,11 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
}
}
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey) {
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.fillHistoryVer1 = ver;
pTaskInfo->streamInfo.fillHisotryeKey1 = ekey;
pTaskInfo->streamInfo.fillHistoryVer = *pVerRange;
pTaskInfo->streamInfo.fillHistoryWindow = *pWindow;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
return 0;
}
......
......@@ -1785,14 +1785,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->base.cond.startVersion = 0;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64 ", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, id);
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer;
pTSInfo->base.cond.twindows = pTaskInfo->streamInfo.fillHistoryWindow;
qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
} else {
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer + 1;
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion,
pTSInfo->base.cond.endVersion, id);
......@@ -2085,8 +2088,11 @@ FETCH_NEXT_BLOCK:
return pInfo->pCreateTbRes;
}
// todo apply time window range filter
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
......
......@@ -48,7 +48,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
......
......@@ -164,7 +164,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
}
buf = NULL;
qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr,
qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
}
code = 0;
......@@ -238,14 +238,14 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECK;
qDebug("s-task:%s dispatch check msg to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr,
qDebug("s-task:%s dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg);
return 0;
}
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
......@@ -283,8 +283,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg);
qDebug("s-task:%s dispatch recover finish msg to downstream taskId:0x%x node %d: recover finish msg", pTask->id.idStr,
pReq->taskId, vgId);
qDebug("s-task:%s dispatch recover finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
return 0;
}
......
......@@ -15,16 +15,30 @@
#include "streamInc.h"
#include "ttimer.h"
#include "wal.h"
const char* streamGetTaskStatusStr(int32_t status) {
switch(status) {
case TASK_STATUS__NORMAL: return "normal";
case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream";
case TASK_STATUS__RECOVER_PREPARE: return "scan-history-prepare";
case TASK_STATUS__RECOVER1: return "scan-history-data";
default:return "";
}
}
int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);
qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->nodeId);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus);
SVersionRange* pRange = &pTask->dataRange.range;
qDebug("s-task:%s set task status:%s and start to recover, ver:%" PRId64 "-%" PRId64, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->dataRange.range.minVer,
pTask->dataRange.range.maxVer);
streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, pTask->dataRange.range.maxVer, pTask->dataRange.window.ekey);
streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
SStreamRecoverStep1Req req;
streamBuildSourceRecover1Req(pTask, &req);
......@@ -43,12 +57,12 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
}
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
streamSetStatusNormal(pTask);
streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
// sink nodes has no specified operation for fill history
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
streamSetStatusNormal(pTask);
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
}
return 0;
......@@ -56,8 +70,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
// check status
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
qDebug("s-task:%s in fill history stage, ver:%"PRId64" ekey:%"PRId64, pTask->id.idStr, pTask->dataRange.range.maxVer,
pTask->dataRange.window.ekey);
qDebug("s-task:%s in fill history stage, ver:%" PRId64 "-%"PRId64" window:%" PRId64"-%"PRId64, pTask->id.idStr,
pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer, pTask->dataRange.window.skey,
pTask->dataRange.window.ekey);
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
......@@ -74,7 +89,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId;
qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
......@@ -90,7 +105,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
......@@ -113,7 +128,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
.childId = pRsp->childId,
};
qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId,
qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
......@@ -139,9 +154,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status);
const char* id = pTask->id.idStr;
if (pRsp->status == 1) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
......@@ -167,21 +180,25 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs);
qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", id, numOfReqs);
streamTaskLaunchRecover(pTask);
} else {
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left);
}
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (pRsp->reqId != pTask->checkReqId) {
return -1;
}
qDebug("s-task:%s fixed downstream tasks is ready, now enter into recover stage", id);
streamTaskLaunchRecover(pTask);
} else {
ASSERT(0);
}
} else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
pRsp->downstreamNodeId);
taosMsleep(100);
streamRecheckOneDownstream(pTask, pRsp);
......@@ -206,9 +223,9 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) {
}
// source
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey) {
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
void* exec = pTask->exec.pExecutor;
return qStreamSourceRecoverStep1(exec, ver, ekey);
return qStreamSourceRecoverStep1(exec, pVerRange, pWindow);
}
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) {
......@@ -246,23 +263,23 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
return code;
}
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) {
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId };
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr,
qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
req.taskId = pTask->fixedEpDispatcher.taskId;
streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo);
for (int32_t i = 0; i < vgSz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId;
streamDispatchOneRecoverFinishReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
}
return 0;
......@@ -271,7 +288,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) {
// agg
int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo);
qDebug("s-task:%s wait for %d upstreams", pTask->id.idStr, pTask->recoverWaitingUpstream);
qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete fill history procedure", pTask->id.idStr,
pTask->recoverWaitingUpstream);
return 0;
}
......@@ -303,8 +321,8 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
pHTask->dataRange.range.minVer = 0;
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
qDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64
" verrange:%" PRId64 " - %" PRId64,
qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 "-%" PRId64
" verrange:%" PRId64 "-%" PRId64,
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
......@@ -321,7 +339,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
pMeta->vgId, pTask->historyTaskId.taskId);
taosTmrReset(tryLaunchHistoryTask, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
taosTmrReset(tryLaunchHistoryTask, 100, pTask, streamEnv.timer, &pTask->timer);
return;
}
......@@ -357,6 +375,42 @@ int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) {
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
qDebug("s-task:%s set start wal scan start ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
ASSERT(walReaderGetCurrentVer(pTask->exec.pWalReader) == -1);
// walReaderSeekVer(pTask->exec.pWalReader, sversion);
// pTask->chkInfo.currentVer = sversion;
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
return 0;
}
// restore param
int32_t code = streamRestoreParam(pTask);
if (code < 0) {
return -1;
}
// dispatch recover finish req to all related downstream task
code = streamDispatchRecoverFinishMsg(pTask);
if (code < 0) {
return -1;
}
// set status normal
qDebug("s-task:%s set the status to be normal, and start wal scan", pTask->id.idStr);
code = streamSetStatusNormal(pTask);
if (code < 0) {
return -1;
}
streamMetaSaveTask(pMeta, pTask);
return 0;
}
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
......
......@@ -17,7 +17,7 @@
#include "tstream.h"
#include "wal.h"
static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
......@@ -45,110 +45,10 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
mndAddToTaskset(pTaskList, pTask);
addToTaskset(pTaskList, pTask);
return pTask;
}
SStreamTask* streamTaskClone(SStreamTask* pTask) {
SStreamTask* pDst = taosMemoryCalloc(1, sizeof(SStreamTask));
/* pDst->
SStreamId id;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
int8_t fillHistory; // fill history
int64_t ekey; // end ts key
int64_t endVer; // end version
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
STaskDispatcherShuffle shuffleDispatcher;
STaskSinkTb tbSink;
STaskSinkSma smaSink;
STaskSinkFetch fetchSink;
};
int8_t inputStatus;
int8_t outputStatus;
SStreamQueue* inputQueue;
SStreamQueue* outputQueue;
// trigger
int8_t triggerStatus;
int64_t triggerParam;
void* timer;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta;
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue);
}
if (pTask->outputQueue) {
streamQueueClose(pTask->outputQueue);
}
if (pTask->exec.qmsg) {
taosMemoryFree(pTask->exec.qmsg);
}
if (pTask->exec.pExecutor) {
qDestroyTask(pTask->exec.pExecutor);
pTask->exec.pExecutor = NULL;
}
if (pTask->exec.pWalReader != NULL) {
walCloseReader(pTask->exec.pWalReader);
}
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
}
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL;
}
if (pTask->pState) {
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
}
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);
}
taosMemoryFree(pTask);*/
return NULL;
}
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册