diff --git a/example/src/tstream.c b/example/src/tstream.c index a606024cf190b92d241c69affbc48ac6ba05a394..62d94b041d5b304e2d4f449436fcf9896a97e395 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -19,9 +19,6 @@ #include #include "taos.h" -static int running = 1; -static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } - int32_t init_env() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -91,124 +88,6 @@ int32_t create_stream() { return 0; } -void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { - printf("commit %d\n", resp); -} - -tmq_t* build_consumer() { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg2"); - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print); - tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); - return tmq; -} - -tmq_list_t* build_topic_list() { - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_stb_topic_1"); - return topic_list; -} - -void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - tmq_resp_err_t err; - - if ((err = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); - printf("subscribe err\n"); - return; - } - /*int32_t cnt = 0;*/ - /*clock_t startTime = clock();*/ - while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); - if (tmqmessage) { - /*cnt++;*/ - msg_process(tmqmessage); - tmq_message_destroy(tmqmessage); - /*} else {*/ - /*break;*/ - } - } - /*clock_t endTime = clock();*/ - /*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/ - - err = tmq_consumer_close(tmq); - if (err) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - -void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - static const int MIN_COMMIT_COUNT = 1; - - int msg_count = 0; - tmq_resp_err_t err; - - if ((err = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); - return; - } - - while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000); - if (tmqmessage) { - msg_process(tmqmessage); - tmq_message_destroy(tmqmessage); - - if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); - } - } - - err = tmq_consumer_close(tmq); - if (err) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - -void perf_loop(tmq_t* tmq, tmq_list_t* topics) { - tmq_resp_err_t err; - - if ((err = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); - printf("subscribe err\n"); - return; - } - int32_t batchCnt = 0; - int32_t skipLogNum = 0; - clock_t startTime = clock(); - while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); - if (tmqmessage) { - batchCnt++; - skipLogNum += tmqGetSkipLogNum(tmqmessage); - /*msg_process(tmqmessage);*/ - tmq_message_destroy(tmqmessage); - } else { - break; - } - } - clock_t endTime = clock(); - printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum, - (double)(endTime - startTime) / CLOCKS_PER_SEC); - - err = tmq_consumer_close(tmq); - if (err) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - int main(int argc, char* argv[]) { int code; if (argc > 1) { diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h deleted file mode 100644 index 6391eaffea70088b389a0e24834e151f8d075dd4..0000000000000000000000000000000000000000 --- a/source/dnode/vnode/inc/tq.h +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TQ_H_ -#define _TQ_H_ - -#include "executor.h" -#include "meta.h" -#include "taoserror.h" -#include "tcommon.h" -#include "tmallocator.h" -#include "tmsg.h" -#include "trpc.h" -#include "ttimer.h" -#include "tutil.h" -#include "vnode.h" -#include "wal.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct STQ STQ; - -// memory allocator provided by vnode -typedef struct { - SMemAllocatorFactory* pAllocatorFactory; - SMemAllocator* pAllocator; -} STqMemRef; - -// init once -int tqInit(); -void tqCleanUp(); - -// open in each vnode -STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); -void tqClose(STQ*); - -// required by vnode -int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); -int tqCommit(STQ*); - -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessSetConnReq(STQ* pTq, char* msg); -int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); - -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); - -#ifdef __cplusplus -} -#endif - -#endif /*_TQ_H_*/ diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 9b21cf92ce83a161e56031cf8569f1337f3a180c..4d4bb12a21fdbed28c6dbc7380bd9ae491a77595 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -18,8 +18,8 @@ #include "meta.h" #include "tlog.h" -#include "tq.h" #include "tqPush.h" +#include "vnd.h" #ifdef __cplusplus extern "C" { @@ -153,6 +153,11 @@ typedef struct { FTqDelete pDeleter; } STqMetaStore; +typedef struct { + SMemAllocatorFactory* pAllocatorFactory; + SMemAllocator* pAllocator; +} STqMemRef; + struct STQ { // the collection of groups // the handle of meta kvstore diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index c911f95e4aa4fe28afec3e46178d973ca1e1eec9..ed9aad927771d482c519aac8cc9218f67518c607 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -23,7 +23,6 @@ #include "tlist.h" #include "tlockfree.h" #include "tmacro.h" -#include "tq.h" #include "wal.h" #include "vnode.h" @@ -34,6 +33,8 @@ extern "C" { #endif +typedef struct STQ STQ; + typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; @@ -171,6 +172,25 @@ void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); void vmaFree(SVMemAllocator* pVMA, void* ptr); bool vmaIsFull(SVMemAllocator* pVMA); +// init once +int tqInit(); +void tqCleanUp(); + +// open in each vnode +STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); +void tqClose(STQ*); + +// required by vnode +int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); +int tqCommit(STQ*); + +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); + +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index d7af04c462d8068a33ac83b3760a585dae55b3d5..ef755b10ece4be22bd15e242efe06f3737fff692 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -14,6 +14,7 @@ */ #include "vnodeQuery.h" +#include "executor.h" #include "vnd.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index d3769b8a30a03bea6aca639cb740f9fe0b840c8c..5443194efd8e90de015ad10a1355cf4782def8de 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -13,12 +13,11 @@ * along with this program. If not, see . */ -#include "tq.h" #include "vnd.h" void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SNodeMsg *pMsg; - SRpcMsg *pRpc; + SRpcMsg *pRpc; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); diff --git a/source/dnode/vnode/test/tqSerializerTest.cpp b/source/dnode/vnode/test/tqSerializerTest.cpp deleted file mode 100644 index 0d76322c17d79ebca9c0164e1762844c5021c4fe..0000000000000000000000000000000000000000 --- a/source/dnode/vnode/test/tqSerializerTest.cpp +++ /dev/null @@ -1,13 +0,0 @@ -#include -#include -#include -#include - -#include "tq.h" - -using namespace std; - -TEST(TqSerializerTest, basicTest) { - TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle)); - -} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index edacfc339f50396e7be4e9af02952ddf85e6a2bf..19100d7560b6eb3df1519f479b827294891865b9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -16,7 +16,7 @@ #include "executor.h" #include "executorimpl.h" #include "planner.h" -#include "tq.h" +#include "vnode.h" static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { ASSERT(pOperator != NULL);