diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 7b0606512c91183383a1ff707a159721a32f0359..5ec5b1d58fdb742fe7d8b61e673885f2117b01ec 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -19,15 +19,14 @@ #include "tmallocator.h" // #include "sync.h" #include "tcoding.h" +#include "tdatablock.h" #include "tfs.h" #include "tlist.h" #include "tlockfree.h" #include "tmacro.h" -#include "wal.h" - #include "vnode.h" - #include "vnodeQuery.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -203,7 +202,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); // sma -void smaHandleRes(SVnode* pVnode, int64_t smaId, const SArray* data); +void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 55202335e05ff020ad8a3298cef0cd50fa162640..efc7ac80e930221804190fc17ed6a6d2bf20ba87 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -473,10 +473,16 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { } tCoderClear(&decoder); + // exec if (tqExpandTask(pTq, pTask, 4) < 0) { ASSERT(0); } + + // sink pTask->ahandle = pTq->pVnode; + if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.smaHandle = smaHandleRes; + } taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 98256e6b248f4a62c6a810f47e6004a0a926f967..9b1bb61d070a424b43bd7cd46253dd81f1a17d35 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -15,6 +15,11 @@ #include "vnd.h" +void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { + // TODO + blockDebugShowData(data); +} + void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SNodeMsg *pMsg; SRpcMsg *pRpc;