From 894ff930851c3e45333c6523a17b6e43258b303d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 20 Dec 2021 09:55:56 +0800 Subject: [PATCH] add mock interface --- include/dnode/vnode/tq/tq.h | 22 +++++++++++++++++----- source/dnode/vnode/tq/src/tq.c | 4 +++- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 6082105700..b8bfd72efa 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -95,14 +95,24 @@ typedef struct STqTopicVhandle { #define TQ_BUFFER_SIZE 8 +typedef struct STqExec { + void* runtimeEnv; + // return type will be SSDataBlock + void* (*exec)(void* runtimeEnv); + // inputData type will be submitblk + void* (*assign)(void* runtimeEnv, void* inputData); + char* (*serialize)(struct STqExec*); + struct STqExec* (*deserialize)(char*); +} STqExec; + typedef struct STqBufferItem { int64_t offset; // executors are identical but not concurrent // so there must be a copy in each item - void* executor; - int32_t status; - int64_t size; - void* content; + STqExec* executor; + int32_t status; + int64_t size; + void* content; } STqMsgItem; typedef struct STqTopic { @@ -248,10 +258,12 @@ typedef struct STQ { STqLogReader* tqLogReader; STqMemRef tqMemRef; STqMetaStore* tqMeta; + STqExec* tqExec; } STQ; // open in each vnode -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac, + STqExec* tqExec); void tqClose(STQ*); // void* will be replace by a msg type diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 88fa54cd8a..7beab8a983 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -35,7 +35,8 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr); const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac, + STqExec* tqExec) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { // TODO: memory error @@ -54,6 +55,7 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA // TODO: free STQ return NULL; } + pTq->tqExec = tqExec; return pTq; } -- GitLab