未验证 提交 af0f3981 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14960 from taosdata/feature/stream

feat(stream): recover from failure
......@@ -57,7 +57,7 @@ enum {
// STREAM_INPUT__TABLE_SCAN,
STREAM_INPUT__TQ_SCAN,
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__TRIGGER,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
};
......@@ -155,10 +155,10 @@ typedef struct SQueryTableDataCond {
int32_t numOfCols;
SColumnInfo* colList;
int32_t type; // data block load type:
// int32_t numOfTWindows;
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
// int32_t numOfTWindows;
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
} SQueryTableDataCond;
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
......
......@@ -1969,7 +1969,7 @@ typedef struct SVCreateTbReq {
int8_t type;
union {
struct {
char* name; // super table name
char* name; // super table name
tb_uid_t suid;
SArray* tagName;
uint8_t* pTag;
......@@ -2439,9 +2439,6 @@ typedef struct {
int8_t igNotExists;
} SMDropStreamReq;
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
typedef struct {
int8_t reserved;
} SMDropStreamRsp;
......@@ -2456,6 +2453,27 @@ typedef struct {
int8_t reserved;
} SVDropStreamTaskRsp;
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
} SMRecoverStreamReq;
typedef struct {
int8_t reserved;
} SMRecoverStreamRsp;
typedef struct {
int64_t recoverObjUid;
int32_t taskId;
int32_t hasCheckPoint;
} SMVStreamGatherInfoReq;
int32_t tSerializeSMRecoverStreamReq(void* buf, int32_t bufLen, const SMRecoverStreamReq* pReq);
int32_t tDeserializeSMRecoverStreamReq(void* buf, int32_t bufLen, SMRecoverStreamReq* pReq);
typedef struct {
int64_t leftForVer;
int32_t vgId;
......@@ -2878,7 +2896,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
}
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);
......
......@@ -131,6 +131,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "alter-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RECOVER_STREAM, "recover-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
......
......@@ -192,6 +192,8 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer);
#ifdef __cplusplus
}
#endif
......
......@@ -34,6 +34,10 @@ typedef struct SStreamTask SStreamTask;
enum {
TASK_STATUS__NORMAL = 0,
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__PREPARE_RECOVER,
TASK_STATUS__RECOVERING,
};
enum {
......@@ -72,6 +76,7 @@ typedef struct {
int8_t type;
int32_t srcVgId;
int32_t childId;
int64_t sourceVer;
SArray* blocks; // SArray<SSDataBlock*>
......@@ -222,6 +227,8 @@ typedef struct {
int32_t nodeId;
int32_t childId;
int32_t taskId;
int64_t checkpointVer;
int64_t processedVer;
SEpSet epSet;
} SStreamChildEpInfo;
......@@ -232,6 +239,7 @@ typedef struct SStreamTask {
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int8_t isStreamDistributed;
int16_t dispatchMsgType;
int8_t taskStatus;
......@@ -242,6 +250,13 @@ typedef struct SStreamTask {
int32_t nodeId;
SEpSet epSet;
// used for semi or single task,
// while final task should have processedVer for each child
int64_t recoverSnapVer;
int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......@@ -316,12 +331,12 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (pItem->type == STREAM_INPUT__TRIGGER) {
} else if (pItem->type == STREAM_INPUT__GET_RES) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
}
if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
......@@ -420,6 +435,36 @@ typedef struct {
int8_t inputStatus;
} SStreamTaskRecoverRsp;
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq);
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq);
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp);
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp);
typedef struct {
int64_t streamId;
int32_t taskId;
} SMStreamTaskRecoverReq;
typedef struct {
int64_t streamId;
int32_t taskId;
} SMStreamTaskRecoverRsp;
int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq);
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq);
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
typedef struct {
int64_t streamId;
} SPStreamTaskRecoverReq;
typedef struct {
int8_t reserved;
} SPStreamTaskRecoverRsp;
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
......
......@@ -4827,6 +4827,35 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *
return 0;
}
int32_t tSerializeSMRecoverStreamReq(void *buf, int32_t bufLen, const SMRecoverStreamReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMRecoverStreamReq(void *buf, int32_t bufLen, SMRecoverStreamReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast);
......@@ -4949,8 +4978,8 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1;
int32_t len = taosArrayGetSize(pReq->ctb.tagName);
if (tEncodeI32(pCoder, len) < 0) return -1;
for (int32_t i = 0; i < len; i++){
char* name = taosArrayGet(pReq->ctb.tagName, i);
for (int32_t i = 0; i < len; i++) {
char *name = taosArrayGet(pReq->ctb.tagName, i);
if (tEncodeCStr(pCoder, name) < 0) return -1;
}
} else if (pReq->type == TSDB_NORMAL_TABLE) {
......@@ -4986,9 +5015,9 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
int32_t len = 0;
if (tDecodeI32(pCoder, &len) < 0) return -1;
pReq->ctb.tagName = taosArrayInit(len, TSDB_COL_NAME_LEN);
if(pReq->ctb.tagName == NULL) return -1;
for (int32_t i = 0; i < len; i++){
char name[TSDB_COL_NAME_LEN] = {0};
if (pReq->ctb.tagName == NULL) return -1;
for (int32_t i = 0; i < len; i++) {
char name[TSDB_COL_NAME_LEN] = {0};
char *tmp = NULL;
if (tDecodeCStr(pCoder, &tmp) < 0) return -1;
strcpy(name, tmp);
......
......@@ -559,6 +559,7 @@ typedef struct {
// info
int64_t uid;
int8_t status;
int8_t isDistributed;
// config
int8_t igExpired;
int8_t trigger;
......@@ -586,6 +587,23 @@ typedef struct {
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
typedef struct {
char streamName[TSDB_STREAM_FNAME_LEN];
int64_t uid;
int64_t streamUid;
SArray* childInfo; // SArray<SStreamChildEpInfo>
} SStreamCheckpointObj;
#if 0
typedef struct {
int64_t uid;
int64_t streamId;
int8_t isDistributed;
int8_t status;
int8_t stage;
} SStreamRecoverObj;
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -27,6 +27,7 @@ typedef enum {
TRANS_STOP_FUNC_TEST = 2,
TRANS_START_FUNC_MQ_REB = 3,
TRANS_STOP_FUNC_MQ_REB = 4,
TRANS_FUNC_RECOVER_STREAM_STEP_NEXT = 5,
} ETrnFunc;
typedef enum {
......
......@@ -27,6 +27,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->isDistributed) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
......@@ -72,6 +73,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->isDistributed) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
......
......@@ -319,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
pStream->isDistributed = totLevel == 2;
bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
......
......@@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
/*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
......@@ -55,6 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
......@@ -672,6 +673,69 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMRecoverStreamReq recoverReq = {0};
if (tDeserializeSMRecoverStreamReq(pReq->pCont, pReq->contLen, &recoverReq) < 0) {
ASSERT(0);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
pStream = mndAcquireStream(pMnode, recoverReq.name);
if (pStream == NULL) {
if (recoverReq.igNotExists) {
mDebug("stream:%s, not exist, ignore not exist is set", recoverReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to recover since %s", recoverReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mDebug("trans:%d, used to drop stream:%s", pTrans->id, recoverReq.name);
// broadcast to recover all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to recover task since %s", recoverReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
// update stream status
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare recover stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
......
......@@ -137,17 +137,18 @@ typedef enum {
SDB_USER = 7,
SDB_AUTH = 8,
SDB_ACCT = 9,
SDB_STREAM = 10,
SDB_OFFSET = 11,
SDB_SUBSCRIBE = 12,
SDB_CONSUMER = 13,
SDB_TOPIC = 14,
SDB_VGROUP = 15,
SDB_SMA = 16,
SDB_STB = 17,
SDB_DB = 18,
SDB_FUNC = 19,
SDB_MAX = 20
SDB_STREAM_CK = 10,
SDB_STREAM = 11,
SDB_OFFSET = 12,
SDB_SUBSCRIBE = 13,
SDB_CONSUMER = 14,
SDB_TOPIC = 15,
SDB_VGROUP = 16,
SDB_SMA = 17,
SDB_STB = 18,
SDB_DB = 19,
SDB_FUNC = 20,
SDB_MAX = 21
} ESdbType;
typedef struct SSdbRaw {
......@@ -308,7 +309,7 @@ void sdbRelease(SSdb *pSdb, void *pObj);
* @return void* The next iterator of the table.
*/
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status) ;
void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status);
/**
* @brief Cancel a traversal
......
......@@ -139,6 +139,12 @@ typedef struct STaskIdInfo {
char* str;
} STaskIdInfo;
enum {
STREAM_RECOVER_STEP__NONE = 0,
STREAM_RECOVER_STEP__PREPARE,
STREAM_RECOVER_STEP__SCAN,
};
typedef struct {
//TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq
......@@ -147,6 +153,10 @@ typedef struct {
SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond;
int64_t lastScanUid;
int8_t recoverStep;
SQueryTableDataCond tableCond;
int64_t recoverStartVer;
int64_t recoverEndVer;
} SStreamTaskInfo;
typedef struct SExecTaskInfo {
......
......@@ -261,6 +261,15 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
}
#endif
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
pTaskInfo->streamInfo.recoverStartVer = startVer;
pTaskInfo->streamInfo.recoverEndVer = endVer;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;
return 0;
}
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
......
......@@ -516,10 +516,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbReaderClose(pInfo->dataReader);
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
GET_TASKID(pTaskInfo));
if (code != 0) {
// TODO
}
}
SSDataBlock* result = doTableScanGroup(pOperator);
......@@ -871,6 +875,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
}
resetTableScanInfo(pInfo->pTableScanOp->info, &win);
pInfo->pTableScanOp->status = OP_OPENED;
return true;
}
......@@ -1193,8 +1198,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
}
ASSERT(pInfo->pRes->pDataBlock != NULL);
// currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
......@@ -1259,6 +1262,24 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return NULL;
}
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1;
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
}
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) {
return pBlock;
}
// TODO fill in bloom filter
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
return NULL;
}
size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
......@@ -1551,6 +1572,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
goto _error;
}
taosArrayDestroy(tableIdList);
memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond));
}
// create the pseduo columns info
......
......@@ -57,7 +57,7 @@ void streamTriggerByTimer(void* param, void* tmrId) {
if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM);
if (trigger == NULL) return;
trigger->type = STREAM_INPUT__TRIGGER;
trigger->type = STREAM_INPUT__GET_RES;
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (trigger->pBlock == NULL) {
taosFreeQitem(trigger);
......@@ -183,8 +183,11 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
#if 0
if (pTask->execType != TASK_EXEC__NONE) {
streamExec(pTask, pTask->pMsgCb);
#endif
streamExec(pTask, pTask->pMsgCb);
#if 0
} else {
ASSERT(pTask->sinkType != TASK_SINK__NONE);
while (1) {
......@@ -195,11 +198,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
}
}
}
#endif
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
streamDispatch(pTask, pTask->pMsgCb);
}
......
......@@ -112,7 +112,7 @@ int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
void streamFreeQitem(SStreamQueueItem* data) {
int8_t type = data->type;
if (type == STREAM_INPUT__TRIGGER) {
if (type == STREAM_INPUT__GET_RES) {
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
......
......@@ -20,7 +20,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
// set input
SStreamQueueItem* pItem = (SStreamQueueItem*)data;
if (pItem->type == STREAM_INPUT__TRIGGER) {
if (pItem->type == STREAM_INPUT__GET_RES) {
SStreamTrigger* pTrigger = (SStreamTrigger*)data;
qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
......@@ -73,6 +73,15 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
return 0;
}
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t childId = pBlock->childId;
int64_t ver = pBlock->sourceVer;
SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId);
pChildInfo->processedVer = ver;
return 0;
}
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
int32_t cnt = 0;
void* data = NULL;
......@@ -84,14 +93,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
if (data == NULL) {
data = qItem;
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
}
streamQueueProcessSuccess(pTask->inputQueue);
continue;
} else {
if (streamAppendQueueItem(data, qItem) < 0) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
cnt++;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
streamQueueProcessSuccess(pTask->inputQueue);
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
taosFreeQitem(qItem);
......@@ -106,6 +118,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (data == NULL) return pRes;
if (pTask->execType == TASK_EXEC__NONE) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, data);
return pRes;
}
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId);
......@@ -125,6 +143,11 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
taosFreeQitem(qRes);
return NULL;
}
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes = taosArrayInit(0, sizeof(SSDataBlock));
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamInc.h"
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
if (pTask->taskStatus != TASK_STATUS__FAIL) {
return 0;
}
if (pTask->isStreamDistributed) {
if (pTask->isDataScan) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
} else if (pTask->execType != TASK_EXEC__NONE) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
bool hasCheckpoint = false;
int32_t childSz = taosArrayGetSize(pTask->childEpInfo);
for (int32_t i = 0; i < childSz; i++) {
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
if (pEpInfo->checkpointVer == -1) {
hasCheckpoint = true;
break;
}
}
if (hasCheckpoint) {
// load from checkpoint
} else {
// recover child
}
}
} else {
if (pTask->isDataScan) {
if (pTask->checkpointVer != -1) {
// load from checkpoint
} else {
// reset stream query task info
// TODO get snapshot ver
pTask->recoverSnapVer = -1;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
pTask->taskStatus = TASK_STATUS__RECOVERING;
}
}
}
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
streamProcessRunReq(pTask);
}
return 0;
}
......@@ -34,6 +34,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
return 0;
}
......@@ -42,6 +43,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册