diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 1677ec0d4ffd0aebbeb7f629a287255b6ccfeb15..cea40f4785be8e32a07fadf165a7407d56fddf8e 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -232,7 +232,7 @@ void blockDebugShowData(const SArray* dataBlocks); int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t uid, tb_uid_t suid); -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 71462a86d42d28ba1b1fd0b02c99eb75fd77146d..e277622c40b60a0e96e81ba35f3711f652a98a0b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -25,6 +25,8 @@ extern "C" { #ifndef _TSTREAM_H_ #define _TSTREAM_H_ +typedef struct SStreamTask SStreamTask; + enum { STREAM_TASK_STATUS__RUNNING = 1, STREAM_TASK_STATUS__STOP, @@ -69,7 +71,7 @@ typedef struct { SUseDbRsp dbInfo; } STaskDispatcherShuffle; -typedef void FTbSink(void* vnode, int64_t ver, const SArray* data); +typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); typedef struct { int64_t stbUid; @@ -119,7 +121,7 @@ enum { TASK_SINK__FETCH, }; -typedef struct { +struct SStreamTask { int64_t streamId; int32_t taskId; int8_t status; @@ -154,8 +156,7 @@ typedef struct { // application storage void* ahandle; - -} SStreamTask; +}; SStreamTask* tNewSStreamTask(int64_t streamId); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 1c099afc930fc1ed40d5a1a4d021e1869ef7fa1e..0d0bbb07be8255da57937c6ad38f8d3cb5cd8fda 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1596,7 +1596,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks return TSDB_CODE_SUCCESS; } -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) { +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, + int32_t vgId) { SSubmitReq* ret = NULL; // cal size @@ -1634,9 +1635,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo } // assign data - ret = taosMemoryCalloc(1, cap); + ret = taosMemoryCalloc(1, cap + 46); + ret = POINTER_SHIFT(ret, 46); + ret->header.vgId = vgId; ret->version = htonl(1); - ret->length = htonl(cap - sizeof(SSubmitReq)); + ret->length = sizeof(SSubmitReq); ret->numOfBlocks = htonl(sz); void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq)); @@ -1703,11 +1706,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo rowData = POINTER_SHIFT(rowData, rowLen); blkHead->dataLen += rowLen; } - int32_t len = blkHead->dataLen; - blkHead->dataLen = htonl(len); - blkHead = POINTER_SHIFT(blkHead, len); + int32_t dataLen = blkHead->dataLen; + blkHead->dataLen = htonl(dataLen); + + ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen); /*submitBlk = blkHead;*/ } + ret->length = htonl(ret->length); return ret; } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 4b42f97f53af81e4faab94df877b06cf04b5a346..fa5e1d78a0b8227a3886a13633359f755171703b 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -106,6 +106,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration +int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9329e7b15d613dcd6ebfed74b90d50f3325ae41f..8939aa23e0fdf69ef8e2bb6d411fa6e7beed77a4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -177,6 +177,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 3088c5dea40f92b5ef9a8b2b582ef8bfedc66d58..17c7563885becbdba4a589664fe9467bea3c7b53 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -135,6 +135,7 @@ static void *vmOpenVnodeFunc(void *param) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; + msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; @@ -147,7 +148,7 @@ static void *vmOpenVnodeFunc(void *param) { pThread->failed++; } else { vmOpenVnode(pMgmt, pCfg, pImpl); - //vnodeStart(pImpl); + // vnodeStart(pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index f3d7253e716de5037194298fb2970f007474e48b..06e24cb48aef945d5c893ad12cd9e575b744b6b5 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -357,6 +357,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT pMsg->rpcMsg = *pRpc; // if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0); switch (qtype) { + case WRITE_QUEUE: + dTrace("msg:%p, will be put into vnode-write queue", pMsg); + taosWriteQitem(pVnode->pWriteQ, pMsg); + break; case QUERY_QUEUE: dTrace("msg:%p, will be put into vnode-query queue", pMsg); taosWriteQitem(pVnode->pQueryQ, pMsg); @@ -387,6 +391,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT return code; } +int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE); +} + int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e769108d59b7eaacdf46099ad5707a81ae77f73a..b7378e44f2eb37b80d9cf33dea848256c8031216 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -884,11 +884,21 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { } } -void tqTableSink(void* vnode, int64_t ver, const SArray* data) { - // - SVnode* pVnode = (SVnode*)vnode; +void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { + const SArray* pRes = (const SArray*)data; + SVnode* pVnode = (SVnode*)vnode; + + ASSERT(pTask->tbSink.pTSchema); + SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, pVnode->config.vgId); + tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); // build write msg - // + SRpcMsg msg = { + .msgType = TDMT_VND_SUBMIT, + .pCont = pReq, + .contLen = ntohl(pReq->length), + }; + + ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5c609303ff760109ad935c8ec522bdf52899f6aa..ebde1c79970c70ba37c2b80f3153060d6493c6a3 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1,7 +1,7 @@ -#include "ttime.h" -#include "tdatablock.h" #include "executorimpl.h" #include "functionMgt.h" +#include "tdatablock.h" +#include "ttime.h" typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, @@ -545,7 +545,6 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { } } - static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep, int32_t order, bool timeWindowInterpo) { @@ -759,10 +758,10 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SIntervalAggOperatorInfo* pInfo = pOperator->info; - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -808,7 +807,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); int64_t gid = pBlock->info.groupId; @@ -932,7 +931,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -981,10 +980,10 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr } } static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { - for ( int i = 0; i < num; i++) { + for (int i = 0; i < num; i++) { if (type == STREAM_INVERT) { fmSetInvertFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); - } else if (type == STREAM_NORMAL){ + } else if (type == STREAM_NORMAL) { fmSetNormalFunc(pCtx[i].functionId, &(pCtx[i].fpSet)); } } @@ -992,7 +991,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -1038,7 +1037,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); - ASSERT(pInfo->binfo.pRes->info.rows > 0); + // TODO: remove for stream + /*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/ pOperator->status = OP_RES_TO_RETURN; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; @@ -1070,17 +1070,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; // pInfo->execModel = OPTR_EXEC_MODEL_STREAM; - pInfo->execModel = pTaskInfo->execModel; - pInfo->win = pTaskInfo->window; - pInfo->twAggSup = *pTwAggSupp; + pInfo->execModel = pTaskInfo->execModel; + pInfo->win = pTaskInfo->window; + pInfo->twAggSup = *pTwAggSupp; pInfo->primaryTsIndex = primaryTsSlotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -1099,14 +1099,14 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); - pOperator->name = "TimeIntervalAggOperator"; + pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); @@ -1118,7 +1118,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; - _error: +_error: destroyIntervalOperatorInfo(pInfo, numOfCols); taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -1131,7 +1131,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -1177,7 +1177,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr return pOperator; - _error: +_error: destroyIntervalOperatorInfo(pInfo, numOfCols); taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -1321,7 +1321,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { return pSliceInfo->binfo.pRes; } - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -1379,7 +1379,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: taosMemoryFree(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -1402,18 +1402,18 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - pInfo->twAggSup = *pTwAggSup; + pInfo->twAggSup = *pTwAggSup; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = tsSlotId; - pOperator->name = "StateWindowOperator"; + pInfo->tsSlotId = tsSlotId; + pOperator->name = "StateWindowOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExpr; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); @@ -1421,7 +1421,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: pTaskInfo->code = TSDB_CODE_SUCCESS; return NULL; } @@ -1432,8 +1432,8 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { } SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, - SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, + STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1453,18 +1453,18 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = tsSlotId; - pInfo->gap = gap; - pInfo->binfo.pRes = pResBlock; - pInfo->winSup.prevTs = INT64_MIN; - pInfo->reptScan = false; - pOperator->name = "SessionWindowAggOperator"; + pInfo->tsSlotId = tsSlotId; + pInfo->gap = gap; + pInfo->binfo.pRes = pResBlock; + pInfo->winSup.prevTs = INT64_MIN; + pInfo->reptScan = false; + pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); @@ -1473,7 +1473,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: if (pInfo != NULL) { destroySWindowOperatorInfo(pInfo, numOfCols); } @@ -1482,4 +1482,4 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; -} \ No newline at end of file +} diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 3b49dca800265852dc4ac8bf04d810043a77deda..237f673a47ad12580c7a0394bb6e8673ef3d4199 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -150,12 +150,12 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in pRes = (SArray*)input; } + if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0; + // sink if (pTask->sinkType == TASK_SINK__TABLE) { /*blockDebugShowData(pRes);*/ - ASSERT(pTask->tbSink.pTSchema); - SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid); - tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); //