未验证 提交 33d64b67 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17865 from taosdata/feature/stream

refactor(stream): stream deploy and state transfer
......@@ -241,6 +241,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "vnode-stream-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)
......
......@@ -46,7 +46,7 @@ enum {
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__RECOVER_DOWNSTREAM,
TASK_STATUS__WAIT_DOWNSTREAM,
TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2,
......@@ -332,7 +332,10 @@ typedef struct SStreamTask {
SStreamState* pState;
// do not serialize
int32_t recoverWaitingChild;
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
} SStreamTask;
......@@ -418,7 +421,10 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int8_t inputStatus;
} SStreamDispatchRsp;
......@@ -441,60 +447,40 @@ typedef struct {
} SStreamRetrieveRsp;
typedef struct {
SMsgHead msgHead;
int64_t streamId;
int32_t taskId;
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
typedef struct {
int64_t reqId;
int64_t streamId;
int32_t taskId;
int32_t upstreamNodeId;
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
} SStreamRecoverFinishReq;
} SStreamTaskCheckReq;
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
#if 0
typedef struct {
int64_t reqId;
int64_t streamId;
int32_t taskId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
} SStreamTaskRecoverReq;
typedef struct {
int64_t streamId;
int32_t rspTaskId;
int32_t reqTaskId;
int8_t inputStatus;
} SStreamTaskRecoverRsp;
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq);
int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq);
int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp);
int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp);
int32_t upstreamTaskId;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
int8_t status;
} SStreamTaskCheckRsp;
typedef struct {
int64_t streamId;
int32_t taskId;
} SMStreamTaskRecoverReq;
SMsgHead msgHead;
int64_t streamId;
int32_t taskId;
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
typedef struct {
int64_t streamId;
int32_t taskId;
} SMStreamTaskRecoverRsp;
int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq);
int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq);
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
int32_t childId;
} SStreamRecoverFinishReq;
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
#endif
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
typedef struct {
int64_t streamId;
......@@ -509,20 +495,18 @@ typedef struct {
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
int32_t tEncodeSStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstreamRsp* pRsp);
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t waitingRspCnt;
int32_t totReq;
SArray* info; // SArray<SArray<SStreamCheckpointInfo>*>
} SStreamRecoverStatus;
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
......@@ -533,7 +517,7 @@ int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
......@@ -544,6 +528,10 @@ int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
// recover and fill history
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq);
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
// common
int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
......
......@@ -612,9 +612,12 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x4100)
#define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x4101)
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)
#define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x5101)
#ifdef __cplusplus
}
......
......@@ -1756,7 +1756,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
tscDebug("consumer:%" PRId64 ", return rsp", tmq->consumerId);
tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId);
......
......@@ -58,7 +58,11 @@ static void smProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dTrace("msg:%p, get from snode-stream queue", pMsg);
int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pMsg);
if (code < 0) {
dGError("snd, msg:%p failed to process stream since %s", pMsg, terrstr(code));
if (pMsg) {
dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr(code));
} else {
dGError("snd, msg:%p failed to process stream empty msg since %s", pMsg, terrstr(code));
}
smSendRsp(pMsg, terrno);
}
......
......@@ -403,7 +403,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
......@@ -412,6 +411,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -86,7 +86,12 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr(code));
if (pMsg) {
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
terrstr(code));
} else {
dGError("vgId:%d, msg:%p failed to process stream empty msg since %s", pVnode->vgId, pMsg, terrstr(code));
}
vmSendRsp(pMsg, code);
}
......@@ -218,7 +223,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
return code;
}
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE); }
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE);
}
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
......
......@@ -157,7 +157,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER:
if (code != 0) {
if (terrno != 0) code = terrno;
dGTrace("msg:%p, failed to process since %s", pMsg, terrstr());
if (pMsg) {
dGTrace("msg:%p, failed to process %s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
} else {
dGTrace("msg:%p, failed to process empty msg since %s", pMsg, terrstr());
}
if (IsReq(pRpc)) {
SRpcMsg rsp = {.code = code, .info = pRpc->info};
......
......@@ -705,7 +705,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-stream");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
......
......@@ -612,14 +612,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndReleaseSubscribe(pMnode, pSub);
}
// TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
if (mndDoRebalance(pMnode, &rebInput, &rebOutput) < 0) {
mError("mq rebalance internal error");
}
// if add more consumer to balanced subscribe,
// possibly no vg is changed
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
// TODO replace assert with error check
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
}
......
......@@ -231,10 +231,10 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
return 0;
} else {
return -1;
......
......@@ -158,7 +158,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid);
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
......@@ -240,7 +240,7 @@ bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
void vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
// sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
......
......@@ -150,19 +150,19 @@ typedef struct {
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
// tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
// tq
int tqInit();
......@@ -183,13 +183,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
// int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
// int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -896,6 +896,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->startVer = ver;
// expand executor
if (pTask->fillHistory) {
pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
......@@ -911,9 +915,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
if (pTask->fillHistory) {
pTask->taskStatus = TASK_STATUS__RECOVER_PREPARE;
}
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
......@@ -947,11 +948,90 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
streamSetupTrigger(pTask);
tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
pTask->selfChildId);
tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", TD_VID(pTq->pVnode), pTask->taskId,
pTask->selfChildId, pTask->taskLevel);
return 0;
}
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeSStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
rsp.status = 1;
} else {
rsp.status = 0;
}
tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) {
ASSERT(0);
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeSStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {
.code = 0,
.pCont = buf,
.contLen = sizeof(SMsgHead) + len,
.info = pMsg->info,
};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
tDecoderClear(&decoder);
return -1;
}
tDecoderClear(&decoder);
tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, rsp.upstreamTaskId);
if (pTask == NULL) {
return -1;
}
return streamProcessTaskCheckRsp(pTask, &rsp, version);
}
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
int32_t code;
#if 0
......@@ -982,37 +1062,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, version);
SStreamRecoverStep1Req req;
streamBuildSourceRecover1Req(pTask, &req);
int32_t len = sizeof(SStreamRecoverStep1Req);
void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) {
return -1;
}
memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = {
.contLen = len,
.pCont = serializedReq,
.msgType = TDMT_VND_STREAM_RECOVER_STEP1,
};
if (tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &rpcMsg) < 0) {
/*ASSERT(0);*/
}
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
// do nothing
}
streamTaskCheckDownstream(pTask, version);
}
return 0;
......@@ -1268,7 +1318,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE) {
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("skip push task %d, task status %d", pTask->taskId, pTask->taskStatus);
continue;
}
......@@ -1335,10 +1385,11 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
tqDebug("recv dispatch rsp, code: %x", pMsg->code);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
return 0;
} else {
return -1;
......@@ -1379,12 +1430,12 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
STQ* pTq = pVnode->pTq;
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
STQ* pTq = pVnode->pTq;
SMsgHead* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamDispatchReq req;
SDecoder decoder;
......@@ -1407,16 +1458,45 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
streamProcessDispatchReq(pTask, &req, &rsp, false);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
return;
return 0;
}
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
FAIL:
if (pMsg->info.handle == NULL) return;
if (pMsg->info.handle == NULL) return -1;
SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (pRspHead == NULL) {
SRpcMsg rsp = {
.code = TSDB_CODE_OUT_OF_MEMORY,
.info = pMsg->info,
};
tqDebug("send dispatch error rsp, code: %x", code);
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
return -1;
}
pRspHead->vgId = htonl(req.upstreamNodeId);
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
pRsp->streamId = htobe64(req.streamId);
pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
pRsp->downstreamNodeId = htonl(pVnode->config.vgId);
pRsp->downstreamTaskId = htonl(req.taskId);
pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;
SRpcMsg rsp = {
.code = code,
.info = pMsg->info,
.contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp),
.pCont = pRspHead,
};
tqDebug("send dispatch error rsp, code: %x", code);
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
return -1;
}
......@@ -229,8 +229,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break;
/* TQ */
case TDMT_VND_TMQ_SUBSCRIBE:
if (tqProcessSubscribeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessSubscribeReq(pVnode->pTq, version, pReq, len) < 0) {
goto _err;
}
break;
......@@ -240,26 +239,22 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
}
break;
case TDMT_VND_TMQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessOffsetCommitReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_TMQ_ADD_CHECKINFO:
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
goto _err;
}
break;
case TDMT_VND_TMQ_DEL_CHECKINFO:
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
goto _err;
}
break;
case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
if (tqProcessTaskDeployReq(pVnode->pTq, version, pReq, len) < 0) {
goto _err;
}
} break;
......@@ -273,6 +268,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err;
}
} break;
case TDMT_STREAM_TASK_CHECK_RSP: {
if (tqProcessStreamTaskCheckRsp(pVnode->pTq, version, pReq, len) < 0) {
goto _err;
}
} break;
case TDMT_VND_ALTER_CONFIRM:
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
break;
......@@ -390,10 +390,12 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
#if 0
#if 1
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
#endif
case TDMT_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE:
......
......@@ -1888,11 +1888,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
pTSInfo->cond.startVersion = -1;
pTSInfo->cond.startVersion = 0;
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion,
pTSInfo->cond.endVersion);
} else {
pTSInfo->cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->cond.startVersion,
pTSInfo->cond.endVersion);
}
/*resetTableScanInfo(pTSInfo, pWin);*/
......@@ -1909,11 +1913,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pBlock != NULL) {
calBlockTbName(&pInfo->tbnameCalSup, pBlock);
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
return pBlock;
}
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pTSInfo->cond.startVersion = 0;
tsdbReaderClose(pTSInfo->dataReader);
pTSInfo->dataReader = NULL;
pTSInfo->cond.startVersion = -1;
pTSInfo->cond.endVersion = -1;
return NULL;
......
......@@ -43,6 +43,8 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
......
......@@ -135,8 +135,11 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
pCont->inputStatus = status;
pCont->streamId = pReq->streamId;
pCont->taskId = pReq->upstreamTaskId;
pCont->streamId = htobe64(pReq->streamId);
pCont->upstreamNodeId = htonl(pReq->upstreamNodeId);
pCont->upstreamTaskId = htonl(pReq->upstreamTaskId);
pCont->downstreamNodeId = htonl(pTask->nodeId);
pCont->downstreamTaskId = htonl(pTask->taskId);
pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
......@@ -203,10 +206,10 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
return 0;
}
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
qDebug("task %d receive dispatch rsp", pTask->taskId);
qDebug("task %d receive dispatch rsp, code: %x", pTask->taskId, code);
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
......
......@@ -210,6 +210,46 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
return 0;
}
int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeSStreamTaskCheckReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
return -1;
}
((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) {
goto FAIL;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = TDMT_STREAM_TASK_CHECK;
qDebug("dispatch from task %d to task %d node %d: check msg", pTask->taskId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg);
return 0;
FAIL:
if (buf) rpcFreeCont(buf);
return code;
}
int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
......@@ -243,7 +283,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
tmsgSendReq(pEpSet, &msg);
code = 0;
qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->taskId, pReq->taskId, vgId);
return 0;
FAIL:
if (buf) rpcFreeCont(buf);
......@@ -279,7 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p
msg.pCont = buf;
msg.msgType = pTask->dispatchMsgType;
qDebug("dispatch from task %d to task %d node %d", pTask->taskId, pReq->taskId, vgId);
qDebug("dispatch from task %d to task %d node %d: data msg", pTask->taskId, pReq->taskId, vgId);
tmsgSendReq(pEpSet, &msg);
......
......@@ -202,83 +202,83 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t batchCnt = 1;
void* data = NULL;
void* input = NULL;
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
break;
}
if (data == NULL) {
data = qItem;
if (input == NULL) {
input = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
void* newRet;
if ((newRet = streamMergeQueueItem(data, qItem)) == NULL) {
if ((newRet = streamMergeQueueItem(input, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
batchCnt++;
data = newRet;
input = newRet;
streamQueueProcessSuccess(pTask->inputQueue);
}
}
}
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (data) streamFreeQitem(data);
if (input) streamFreeQitem(input);
return 0;
}
if (data == NULL) {
if (input == NULL) {
break;
}
if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, data);
ASSERT(((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_BLOCK);
streamTaskOutput(pTask, input);
continue;
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt);
streamTaskExecImpl(pTask, data, pRes);
streamTaskExecImpl(pTask, input, pRes);
qDebug("stream task %d exec end", pTask->taskId);
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(data);
streamFreeQitem(input);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver;
} else if (((SStreamQueueItem*)data)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;
} else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)input;
qRes->childId = pTask->selfChildId;
qRes->sourceVer = pMerged->ver;
}
if (streamTaskOutput(pTask, qRes) < 0) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(data);
streamFreeQitem(input);
taosFreeQitem(qRes);
return -1;
}
} else {
taosArrayDestroy(pRes);
}
streamFreeQitem(data);
streamFreeQitem(input);
}
return 0;
}
......
......@@ -609,6 +609,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory")
......
......@@ -235,6 +235,8 @@
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/fillHistoryBasic1.sim
./test.sh -f tsim/stream/fillHistoryBasic2.sim
./test.sh -f tsim/stream/fillHistoryBasic3.sim
./test.sh -f tsim/stream/distributeInterval0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeSession0.sim
......
......@@ -26,7 +26,7 @@ sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000
sleep 5000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
......@@ -139,7 +139,7 @@ if $data35 != 3 then
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
sleep 1000
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
......@@ -256,7 +256,7 @@ if $data35 != 3 then
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......@@ -286,7 +286,7 @@ if $data15 != 13 then
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......@@ -318,7 +318,7 @@ endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......@@ -350,7 +350,7 @@ endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
......@@ -380,7 +380,7 @@ if $data25 != 3 then
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
......@@ -410,7 +410,7 @@ if $data05 != 3 then
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......@@ -466,9 +466,6 @@ if $data25 != 3 then
endi
sql create database test2 vgroups 1;
sql select * from information_schema.ins_databases;
......@@ -484,7 +481,7 @@ sql insert into t1 values(1648791213004,4,2,3,4.1);
sql create stream stream2 trigger at_once fill_history 1 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sleep 1000
sleep 5000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
......@@ -597,7 +594,7 @@ if $data35 != 3 then
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
sleep 1000
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
......@@ -714,7 +711,7 @@ if $data35 != 3 then
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......@@ -744,7 +741,7 @@ if $data15 != 13 then
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......@@ -776,7 +773,7 @@ endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......@@ -808,7 +805,7 @@ endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
......@@ -838,7 +835,7 @@ if $data25 != 3 then
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
......@@ -868,7 +865,7 @@ if $data05 != 3 then
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
print ===== step1
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ===== step2
sql drop stream if exists stream_t1;
sql drop database if exists test;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
sql insert into ts1 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL);
sql insert into ts1 values(1648791223002,2,2,3,1.1);
sql insert into ts1 values(1648791233003,3,2,3,2.1);
sql insert into ts2 values(1648791243004,4,2,43,73.1);
sql insert into ts1 values(1648791213002,24,22,23,4.1);
sql insert into ts1 values(1648791243005,4,20,3,3.1);
sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts3 values(1648791223002,2,2,3,1.1);
sql insert into ts4 values(1648791233003,3,2,3,2.1);
sql insert into ts3 values(1648791243004,4,2,43,73.1);
sql insert into ts4 values(1648791213002,24,22,23,4.1);
sql insert into ts3 values(1648791243005,4,20,3,3.1);
sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ;
sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ;
sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1);
sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ;
sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ;
sql create stream stream_t1 trigger at_once fill_history 1 watermark 1d into streamtST1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s);
sleep 1000
$loop_count = 0
loop1:
sleep 300
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 8 then
print =====data01=$data01
goto loop1
endi
if $data02 != 6 then
print =====data02=$data02
goto loop1
endi
if $data03 != 52 then
print ======data03=$data03
goto loop1
endi
if $data04 != 52 then
print ======data04=$data04
goto loop1
endi
if $data05 != 13 then
print ======data05=$data05
goto loop1
endi
# row 1
if $data11 != 6 then
print =====data11=$data11
goto loop1
endi
if $data12 != 6 then
print =====data12=$data12
goto loop1
endi
if $data13 != 92 then
print ======$data13
return -1
endi
if $data14 != 22 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 4 then
print =====data21=$data21
goto loop1
endi
if $data22 != 4 then
print =====data22=$data22
goto loop1
endi
if $data23 != 32 then
print ======$data23
return -1
endi
if $data24 != 12 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 30 then
print =====data31=$data31
goto loop1
endi
if $data32 != 30 then
print =====data32=$data32
goto loop1
endi
if $data33 != 180 then
print ======$data33
return -1
endi
if $data34 != 42 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
sql create database test1 vgroups 4;
sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3);
sql create stream stream_t2 trigger at_once fill_history 1 watermark 20s into streamtST1 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
$loop_count = 0
loop2:
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
#rows 1
if $data11 != 2 then
print =====data11=$data11
goto loop2
endi
#max,min selectivity
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t3 trigger at_once into streamtST3 as select ts, min(a) c6, a, b, c, ta, tb, tc from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sleep 50
sql insert into ts1 values(1648791222001,2,2,3);
sleep 50
sql insert into ts2 values(1648791211000,1,2,3);
sleep 50
sql insert into ts2 values(1648791222001,2,2,3);
sleep 50
$loop_count = 0
loop3:
sql select * from streamtST3;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data02 != 1 then
print =====data02=$data02
goto loop3
endi
# row 1
if $data12 != 2 then
print =====data12=$data12
goto loop3
endi
system sh/stop_dnodes.sh
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create database test vgroups 1;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
sql create stream streams2 trigger at_once fill_history 1 into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st partition by a interval(10s);
sleep 3000
$loop_count = 0
loop7:
sleep 50
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop7
endi
if $data02 != NULL then
print =====data02=$data02
goto loop7
endi
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791213000,1,2,3,1.0);
$loop_count = 0
loop8:
sleep 50
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop8
endi
if $data02 != 1 then
print =====data02=$data02
goto loop8
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
$loop_count = 0
loop9:
sleep 50
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop9
endi
if $data02 != 2 then
print =====data02=$data02
goto loop9
endi
sql insert into t1 values(1648791213000,2,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213002,1,2,3,1.0);
sql insert into t2 values(1648791213000,2,2,3,1.0);
sql insert into t2 values(1648791213001,2,2,3,1.0);
sql insert into t2 values(1648791213002,2,2,3,1.0);
sql insert into t2 values(1648791213002,1,2,3,1.0);
$loop_count = 0
loop10:
sleep 50
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop10
endi
if $data02 != 1 then
print =====data02=$data02
goto loop10
endi
if $data11 != 4 thenloop4
print =====data11=$data11
goto loop10
endi
if $data12 != 2 then
print =====data12=$data12
goto loop10
endi
sql insert into t1 values(1648791223000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.0);
sql insert into t1 values(1648791223002,3,2,3,1.0);
sql insert into t1 values(1648791223003,3,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
sql insert into t2 values(1648791223000,1,2,3,1.0);
sql insert into t2 values(1648791223001,1,2,3,1.0);
sql insert into t2 values(1648791223002,3,2,3,1.0);
sql insert into t2 values(1648791223003,3,2,3,1.0);
sql insert into t2 values(1648791213001,1,2,3,1.0) (1648791223001,2,2,3,1.0) (1648791223003,1,2,3,1.0);
$loop_count = 0
loop11:
sleep 50
sql select * from test.streamt2 order by c1, c2, c3;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop11
endi
if $data02 != 2 then
print =====data02=$data02
goto loop11
endi
if $data11 != 4 then
print =====data11=$data11
goto loop11
endi
if $data12 != 1 then
print =====data12=$data12
goto loop11
endi
if $data21 != 2 then
print =====data21=$data21
goto loop11
endi
if $data22 != 2 then
print =====data22=$data22
goto loop11
endi
if $data31 != 2 then
print =====data31=$data31
goto loop11
endi
if $data32 != 3 then
print =====data32=$data32
goto loop11
endi
if $data41 != 4 then
print =====data41=$data41
goto loop11
endi
if $data42 != 1 then
print =====data42=$data42
goto loop11
endi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册