diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index c0de75c6dd067021f1088b3501910cef95ea3fe5..68caf9a9acde518be86c143168245e1d01a4a389 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG aa45ad4 + GIT_TAG 9cb965f SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 9d06dbac6dc3ba9d4dcafe6d8316b52e1b3daeca..4a9007acecaa679dc716c5665eea7f0cd1e34dbb 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -13,15 +13,9 @@ IF (TD_LINUX) #TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua) add_executable(tmq "") - add_executable(tmq_taosx "") add_executable(stream_demo "") add_executable(demoapi "") - target_sources(tmq_taosx - PRIVATE - "tmq_taosx.c" - ) - target_sources(tmq PRIVATE "tmq.c" @@ -41,10 +35,6 @@ IF (TD_LINUX) taos_static ) - target_link_libraries(tmq_taosx - taos_static - ) - target_link_libraries(stream_demo taos_static ) @@ -57,10 +47,6 @@ IF (TD_LINUX) PUBLIC "${TD_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) - target_include_directories(tmq_taosx - PUBLIC "${TD_SOURCE_DIR}/include/os" - PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" - ) target_include_directories(stream_demo PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" @@ -73,7 +59,6 @@ IF (TD_LINUX) ) SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) - SET_TARGET_PROPERTIES(tmq_taosx PROPERTIES OUTPUT_NAME tmq_taosx) SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo) SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi) ENDIF () diff --git a/examples/c/tmq_taosx.c b/examples/c/tmq_taosx.c deleted file mode 100644 index 491eda1ddba9db9a7f574d21fd072a57dadb9dd1..0000000000000000000000000000000000000000 --- a/examples/c/tmq_taosx.c +++ /dev/null @@ -1,489 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include -#include -#include "taos.h" - -static int running = 1; - -static TAOS* use_db(){ - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return NULL; - } - - TAOS_RES* pRes = taos_query(pConn, "use db_taosx"); - if (taos_errno(pRes) != 0) { - printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes)); - return NULL; - } - taos_free_result(pRes); - return pConn; -} - -static void msg_process(TAOS_RES* msg) { - /*memset(buf, 0, 1024);*/ - printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); - printf("db: %s\n", tmq_get_db_name(msg)); - printf("vg: %d\n", tmq_get_vgroup_id(msg)); - TAOS *pConn = use_db(); - if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { - char* result = tmq_get_json_meta(msg); - if (result) { - printf("meta result: %s\n", result); - } - tmq_free_json_meta(result); - } - - tmq_raw_data raw = {0}; - tmq_get_raw(msg, &raw); - int32_t ret = tmq_write_raw(pConn, raw); - printf("write raw data: %s\n", tmq_err2str(ret)); - -// else{ -// while(1){ -// int numOfRows = 0; -// void *pData = NULL; -// taos_fetch_raw_block(msg, &numOfRows, &pData); -// if(numOfRows == 0) break; -// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows); -// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg)); -// printf("write raw data: %s\n", tmq_err2str(ret)); -// } -// } - - taos_close(pConn); -} - -int32_t init_env() { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return -1; - } - - TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx"); - if (taos_errno(pRes) != 0) { - printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 4"); - if (taos_errno(pRes) != 0) { - printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop database if exists abc1"); - if (taos_errno(pRes) != 0) { - printf("error in drop db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create database if not exists abc1 vgroups 3"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, - "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " - "nchar(8), t4 bool)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct4 using st1(t3) tags('ct4')"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct4, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct3 select * from ct1"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); - if (taos_errno(pRes) != 0) { - printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table ct3 ct1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); - if (taos_errno(pRes) != 0) { - printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 comment 'hello'"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 drop column c1"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table n1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt2 using jt tags('')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, - "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " - "nchar(8), t4 bool)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop table st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - taos_close(pConn); - return 0; -} - -int32_t create_topic() { - printf("create topic\n"); - TAOS_RES* pRes; - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return -1; - } - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - taos_close(pConn); - return 0; -} - -void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - printf("commit %d tmq %p param %p\n", code, tmq, param); -} - -tmq_t* build_consumer() { -#if 0 - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); -#endif - - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg2"); - tmq_conf_set(conf, "client.id", "my app 1"); - tmq_conf_set(conf, "td.connect.user", "root"); - tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "msg.with.table.name", "true"); - tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "experimental.snapshot.enable", "true"); - - - /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ - - tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); - tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); - assert(tmq); - tmq_conf_destroy(conf); - return tmq; -} - -tmq_list_t* build_topic_list() { - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "topic_ctb_column"); - /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ - return topic_list; -} - -void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - int32_t code; - - if ((code = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); - printf("subscribe err\n"); - return; - } - int32_t cnt = 0; - while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); - if (tmqmessage) { - cnt++; - msg_process(tmqmessage); - /*if (cnt >= 2) break;*/ - /*printf("get data\n");*/ - taos_free_result(tmqmessage); - /*} else {*/ - /*break;*/ - /*tmq_commit_sync(tmq, NULL);*/ - } - } - - code = tmq_consumer_close(tmq); - if (code) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - -void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - static const int MIN_COMMIT_COUNT = 1; - - int msg_count = 0; - int32_t code; - - if ((code = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); - return; - } - - tmq_list_t* subList = NULL; - tmq_subscription(tmq, &subList); - char** subTopics = tmq_list_to_c_array(subList); - int32_t sz = tmq_list_get_size(subList); - printf("subscribed topics: "); - for (int32_t i = 0; i < sz; i++) { - printf("%s, ", subTopics[i]); - } - printf("\n"); - tmq_list_destroy(subList); - - while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); - if (tmqmessage) { - msg_process(tmqmessage); - taos_free_result(tmqmessage); - - /*tmq_commit_sync(tmq, NULL);*/ - /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ - } - } - - code = tmq_consumer_close(tmq); - if (code) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - -int main(int argc, char* argv[]) { - printf("env init\n"); - if (init_env() < 0) { - return -1; - } - create_topic(); - - tmq_t* tmq = build_consumer(); - tmq_list_t* topic_list = build_topic_list(); - basic_consume_loop(tmq, topic_list); - /*sync_consume_loop(tmq, topic_list);*/ -} diff --git a/include/libs/function/function.h b/include/libs/function/function.h index c8db01625e4697117d0343c3acddbf9d6daac374..3f26eee86ad3f1b4666c55283ad346f60a7b4f31 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -92,6 +92,8 @@ struct SResultRowEntryInfo; //for selectivity query, the corresponding tag value is assigned if the data is qualified typedef struct SSubsidiaryResInfo { int16_t num; + int32_t rowLen; + char* buf; // serialize data buffer struct SqlFunctionCtx **pCtx; } SSubsidiaryResInfo; @@ -118,6 +120,11 @@ typedef struct SInputColumnInfoData { uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions. } SInputColumnInfoData; +typedef struct SSerializeDataHandle { + struct SDiskbasedBuf* pBuf; + int32_t currentPage; +} SSerializeDataHandle; + // sql function runtime context typedef struct SqlFunctionCtx { SInputColumnInfoData input; @@ -137,10 +144,9 @@ typedef struct SqlFunctionCtx { SFuncExecFuncs fpSet; SScalarFuncExecFuncs sfp; struct SExprInfo *pExpr; - struct SDiskbasedBuf *pBuf; struct SSDataBlock *pSrcBlock; struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity - int32_t curBufPage; + SSerializeDataHandle saveHandle; bool isStream; char udfName[TSDB_FUNC_NAME_LEN]; diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index 57a489c0dd269e065005fd359cde32b979913b75..9ab89273e6895c2ea322fa116c06332a431028bc 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -58,11 +58,10 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem /** * * @param pBuf - * @param groupId * @param pageId * @return */ -void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); +void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId); /** * diff --git a/packaging/MPtestJenkinsfile b/packaging/MPtestJenkinsfile index a003bf354c4a8f28d9c50769cb1badb772fae9a8..45c8d8abf24877d30af67cb0b34151278612f57c 100644 --- a/packaging/MPtestJenkinsfile +++ b/packaging/MPtestJenkinsfile @@ -182,7 +182,6 @@ pipeline { cd ${TDENGINE_ROOT_DIR}/packaging bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server python3 checkPackageRuning.py - rmtaos ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index 054c24eb5d42a959bd2c9df86b8fff820d3746fd..4b6264db2b186155d151962b72d740c8740ea8fe 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -7,6 +7,7 @@ originPackageName=$3 originversion=$4 testFile=$5 subFile="taos.tar.gz" +password=$6 if [ ${testFile} = "server" ];then tdPath="TDengine-server-${version}" @@ -56,6 +57,7 @@ fi cmdInstall tree cmdInstall wget +cmdInstall sshpass echo "new workroom path" installPath="/usr/local/src/packageTest" @@ -74,24 +76,49 @@ else echo "${oriInstallPath} already exists" fi -echo "decompress installPackage" -cd ${installPath} -wget https://www.taosdata.com/assets-download/3.0/${packgeName} -cd ${oriInstallPath} -wget https://www.taosdata.com/assets-download/3.0/${originPackageName} +echo "download installPackage" +# cd ${installPath} +# wget https://www.taosdata.com/assets-download/3.0/${packgeName} +# cd ${oriInstallPath} +# wget https://www.taosdata.com/assets-download/3.0/${originPackageName} + +cd ${installPath} +if [ ! -f {packgeName} ];then + sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} . +fi +if [ ! -f debAuto.sh ];then + echo '#!/usr/bin/expect ' > debAuto.sh + echo 'set timeout 3 ' >> debAuto.sh + echo 'pset packgeName [lindex $argv 0]' >> debAuto.sh + echo 'spawn dpkg -i ${packgeName}' >> debAuto.sh + echo 'expect "*one:"' >> debAuto.sh + echo 'send "\r"' >> debAuto.sh + echo 'expect "*skip:"' >> debAuto.sh + echo 'send "\r" ' >> debAuto.sh +fi + if [[ ${packgeName} =~ "deb" ]];then cd ${installPath} - echo "dpkg ${packgeName}" && dpkg -i ${packgeName} + dpkg -r taostools + dpkg -r tdengine + if [[ ${packgeName} =~ "TDengine" ]];then + echo "./debAuto.sh ${packgeName}" && chmod 755 debAuto.sh && ./debAuto.sh ${packgeName} + else + echo "dpkg -i ${packgeName}" && dpkg -i ${packgeName} + elif [[ ${packgeName} =~ "rpm" ]];then cd ${installPath} - echo "rpm ${packgeName}" && rpm -ivh ${packgeName} + echo "rpm ${packgeName}" && rpm -ivh ${packgeName} --quiet elif [[ ${packgeName} =~ "tar" ]];then - echo "tar ${packgeName}" && tar -xvf ${packgeName} - cd ${oriInstallPath} + cd ${oriInstallPath} + if [ ! -f {originPackageName} ];then + sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${originversion}/community${originPackageName} . + fi echo "tar -xvf ${originPackageName}" && tar -xvf ${originPackageName} + cd ${installPath} echo "tar -xvf ${packgeName}" && tar -xvf ${packgeName} @@ -105,10 +132,10 @@ elif [[ ${packgeName} =~ "tar" ]];then cd ${installPath} - tree ${oriInstallPath}/${originTdpPath} > ${originPackageName}_checkfile - tree ${installPath}/${tdPath} > ${packgeName}_checkfile + tree ${oriInstallPath}/${originTdpPath} > ${oriInstallPath}/${originPackageName}_checkfile + tree ${installPath}/${tdPath} > ${installPath}/${packgeName}_checkfile - diff ${packgeName}_checkfile ${originPackageName}_checkfile > ${installPath}/diffFile.log + diff ${installPath}/${packgeName}_checkfile ${oriInstallPath}/${originPackageName}_checkfile > ${installPath}/diffFile.log diffNumbers=`cat ${installPath}/diffFile.log |wc -l ` if [ ${diffNumbers} != 0 ];then echo "The number and names of files have changed from the previous installation package" @@ -122,11 +149,20 @@ elif [[ ${packgeName} =~ "tar" ]];then else bash ${installCmd} fi - if [[ ${packgeName} =~ "Lite" ]];then + if [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]] ;then cd ${installPath} - wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz + sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz . + # wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz tar xvf taosTools-2.1.2-Linux-x64.tar.gz cd taosTools-2.1.2 && bash install-taostools.sh + elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "deb" ]] ;then + cd ${installPath} + sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.deb . + dpkg -i taosTools-2.1.2-Linux-x64.deb + elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "rpm" ]] ;then + cd ${installPath} + sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.rpm . + rpm -ivh taosTools-2.1.2-Linux-x64.rpm --quiet fi fi diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 7d259fe06c26bfdd82b595ba49d3fefb7a181598..18d839e1091e3fc5f1be2939a22345efe8ea8579 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -471,6 +471,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE TABLE `%s` (", tbName); appendColumnFields(buf2, &len, pCfg); len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")"); + appendTableOptions(buf2, &len, pDbCfg, pCfg); } varDataLen(buf2) = len; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7eb02308decc09a139ef91e31c7d3dcdccc55532..f0518a72ab38d5ab902d4013c66f560a6b0df4e8 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -303,6 +303,7 @@ typedef struct SAggSupporter { char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t currentPageId; // current write page id } SAggSupporter; typedef struct { @@ -327,7 +328,6 @@ typedef struct STableScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; -// SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; @@ -431,6 +431,7 @@ typedef struct SStreamAggSupporter { char* pKeyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t currentPageId; // buffer page that is active SSDataBlock* pScanBlock; } SStreamAggSupporter; @@ -1009,7 +1010,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size); -SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize); SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 3b3ef9e3debe8386740cdcbe26e189ef3397ee05..80c1494f8dc53c38bd8029eb69effd2ee270bf3a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -46,8 +46,8 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { rowSize += pCtx[i].resDataInfo.interBufSize; } - rowSize += - (numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(doSaveTupleData) + rowSize += (numOfOutput * sizeof(bool)); + // expand rowSize to mark if col is null for top/bottom result(saveTupleData) return rowSize; } @@ -1178,7 +1178,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, SqlFunctionCtx* pCtx = &pFuncCtx[i]; pCtx->functionId = -1; - pCtx->curBufPage = -1; pCtx->pExpr = pExpr; if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) { @@ -1222,6 +1221,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->isStream = false; pCtx->param = pFunct->pParam; + pCtx->saveHandle.currentPage = -1; } for (int32_t i = 1; i < numOfOutput; ++i) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e79a9fa16e6db7c9743fd98ece11cf27e8d80dd1..4ffa80d468d8fca53715aa71c20b2fe13a1defeb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -179,26 +179,23 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR } #endif -SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; // in the first scan, new space needed for results int32_t pageId = -1; - SIDList list = getDataBufPagesIdList(pResultBuf); - - if (taosArrayGetSize(list) == 0) { - pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); + if (*currentPageId == -1) { + pData = getNewBufPage(pResultBuf, &pageId); pData->num = sizeof(SFilePage); } else { - SPageInfo* pi = getLastPageInfo(list); - pData = getBufPage(pResultBuf, getPageId(pi)); - pageId = getPageId(pi); + pData = getBufPage(pResultBuf, *currentPageId); + pageId = *currentPageId; if (pData->num + interBufSize > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one - releaseBufPageInfo(pResultBuf, pi); + releaseBufPage(pResultBuf, pData); - pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); + pData = getNewBufPage(pResultBuf, &pageId); if (pData != NULL) { pData->num = sizeof(SFilePage); } @@ -215,9 +212,9 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num); pResultRow->pageId = pageId; pResultRow->offset = (int32_t)pData->num; + *currentPageId = pageId; pData->num += interBufSize; - return pResultRow; } @@ -263,11 +260,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // allocate a new buffer page if (pResult == NULL) { -#ifdef BUF_PAGE_DEBUG - qDebug("page_2"); -#endif ASSERT(pSup->resultRowSize > 0); - pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize); + pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); initResultRow(pResult); @@ -302,7 +296,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes SIDList list = getDataBufPagesIdList(pResultBuf); if (taosArrayGetSize(list) == 0) { - pData = getNewBufPage(pResultBuf, tid, &pageId); + pData = getNewBufPage(pResultBuf, &pageId); pData->num = sizeof(SFilePage); } else { SPageInfo* pi = getLastPageInfo(list); @@ -313,7 +307,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes // release current page first, and prepare the next one releaseBufPageInfo(pResultBuf, pi); - pData = getNewBufPage(pResultBuf, tid, &pageId); + pData = getNewBufPage(pResultBuf, &pageId); if (pData != NULL) { pData->num = sizeof(SFilePage); } @@ -3092,7 +3086,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { offset += sizeof(int32_t); uint64_t tableGroupId = *(uint64_t*)(result + offset); - SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); + SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); if (!resultRow) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -3440,8 +3434,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { + int32_t code = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pAggSup->currentPageId = -1; pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn); @@ -3455,18 +3451,18 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_AVAIL_DISK; - qError("Init stream agg supporter failed since %s", terrstr(terrno)); - return terrno; + code = TSDB_CODE_NO_AVAIL_DISK; + qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey); + return code; } - int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); + code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); if (code != TSDB_CODE_SUCCESS) { - qError("Create agg result buf failed since %s", tstrerror(code)); + qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey); return code; } - return TSDB_CODE_SUCCESS; + return code; } void cleanupAggSup(SAggSupporter* pAggSup) { @@ -3488,7 +3484,7 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf } for (int32_t i = 0; i < numOfCols; ++i) { - pSup->pCtx[i].pBuf = pAggSup->pResultBuf; + pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; } return TSDB_CODE_SUCCESS; @@ -3520,6 +3516,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); + taosMemoryFreeClear(pCtx[i].subsidiaries.buf); taosMemoryFree(pCtx[i].input.pData); taosMemoryFree(pCtx[i].input.pColumnDataAgg); } @@ -4678,6 +4675,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size) { + pSup->currentPageId = -1; pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); @@ -4705,7 +4703,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF } int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { - pCtx[i].pBuf = pSup->pResultBuf; + pCtx[i].saveHandle.pBuf = pSup->pResultBuf; } + return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9d7e833b19c05ebdf3bb72d9a28cdb28a28104cd..5d123f723e01d98faa791e612f95235ffef5a04d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -547,7 +547,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); int32_t pageId = 0; - pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + pPage = getNewBufPage(pInfo->pBuf, &pageId); taosArrayPush(p->pPageList, &pageId); *(int32_t *) pPage = 0; @@ -562,7 +562,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf // add a new page for current group int32_t pageId = 0; - pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); + pPage = getNewBufPage(pInfo->pBuf, &pageId); taosArrayPush(p->pPageList, &pageId); memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 0661ccd3902bc0ba653e988cf6a03f91d2c6c68f..2f12a0d19bdf74e7b0b2ab94c373a31cbe7d8316 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -195,16 +195,6 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS return PROJECT_RETRIEVE_DONE; } -void printDataBlock1(SSDataBlock* pBlock, const char* flag) { - if (!pBlock || pBlock->info.rows == 0) { - qDebug("===stream===printDataBlock: Block is Null or Empty"); - return; - } - char* pBuf = NULL; - qDebug("%s", dumpBlockData(pBlock, flag, &pBuf)); - taosMemoryFreeClear(pBuf); -} - SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b97970aeefe6a6db97ba024817cb7268dea9b429..152bd5939dbe8b5bed819f19f26b9c14d9cc4475 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1828,12 +1828,6 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt return needed; } -void increaseTs(SqlFunctionCtx* pCtx) { - if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) { -// pCtx[0].increase = true; - } -} - void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { // Todo(liuyao) support partition by column @@ -1895,7 +1889,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* if (isStream) { ASSERT(numOfCols > 0); - increaseTs(pSup->pCtx); initStreamFunciton(pSup->pCtx, pSup->numOfExprs); } @@ -3050,6 +3043,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo) tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); clearDiskbasedBuf(pInfo->aggSup.pResultBuf); initResultRowInfo(&pInfo->binfo.resultRowInfo); + pInfo->aggSup.currentPageId = -1; } static void clearSpecialDataBlock(SSDataBlock* pBlock) { @@ -3420,7 +3414,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initBasicInfo(&pInfo->binfo, pResBlock); ASSERT(numOfCols > 0); - increaseTs(pOperator->exprSupp.pCtx); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -3451,6 +3444,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, // semi interval operator does not catch result pInfo->isFinal = false; pOperator->name = "StreamSemiIntervalOperator"; + ASSERT(pInfo->aggSup.currentPageId == -1); } if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { @@ -3559,11 +3553,10 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* initBasicInfo(pBasicInfo, pResultBlock); for (int32_t i = 0; i < numOfCols; ++i) { - pSup->pCtx[i].pBuf = NULL; + pSup->pCtx[i].saveHandle.pBuf = NULL; } ASSERT(numOfCols > 0); - increaseTs(pSup->pCtx); return TSDB_CODE_SUCCESS; } @@ -3820,7 +3813,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes } if (pWinInfo->pos.pageId == -1) { - *pResult = getNewResultRow(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize); + *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize); if (*pResult == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -4337,6 +4330,7 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) { } } clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf); + pInfo->streamAggSup.currentPageId = -1; } static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index e0752840db07052e056063b5789003cf9b6507e0..cffabcb6aca1f1f5ba457fb765828889bc3c03e6 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -97,7 +97,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t // allocate the overflow buffer page to hold this k/v. int32_t newPageId = -1; - SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId); + SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, &newPageId); if (pNewPage == NULL) { return terrno; } @@ -227,7 +227,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) { } int32_t pageId = -1; - SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId); + SFilePage* p = getNewBufPage(pHashObj->pBuf, &pageId); if (p == NULL) { return terrno; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index fc411e850a814fdf1ace8d1e27c3245871278a92..168cd21c4478d9c1b50053fadf0e9dcdf518d4f4 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -180,7 +180,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { } int32_t pageId = -1; - void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); + void* pPage = getNewBufPage(pHandle->pBuf, &pageId); if (pPage == NULL) { blockDataDestroy(p); return terrno; @@ -512,7 +512,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } int32_t pageId = -1; - void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); + void* pPage = getNewBufPage(pHandle->pBuf, &pageId); if (pPage == NULL) { return terrno; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 32d0472a50c596ed4a125a961555124aa408be3f..b71d06231e78c4edd904dd72d546284b51dd89ac 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1146,8 +1146,9 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -static void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); -static void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); +static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock); +static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); +static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { // the data is loaded, not only the block SMA value @@ -1199,7 +1200,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { pBuf->v = *(int64_t*)tval; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); } } else { if (IS_SIGNED_NUMERIC_TYPE(type)) { @@ -1211,7 +1212,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { *(int64_t*)&pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); } } @@ -1224,7 +1225,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { *(uint64_t*)&pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); } } } else if (type == TSDB_DATA_TYPE_DOUBLE) { @@ -1236,7 +1237,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { *(double*)&pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); } } } else if (type == TSDB_DATA_TYPE_FLOAT) { @@ -1250,7 +1251,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); - doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); } } } @@ -1275,7 +1276,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1287,7 +1288,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1306,7 +1307,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1318,7 +1319,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1337,7 +1338,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1349,7 +1350,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1368,7 +1369,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1380,7 +1381,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1401,7 +1402,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1413,7 +1414,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1432,7 +1433,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1444,7 +1445,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1463,7 +1464,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1475,7 +1476,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1494,7 +1495,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1506,7 +1507,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1526,7 +1527,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1538,7 +1539,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1557,7 +1558,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if (!pBuf->assign) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); } pBuf->assign = true; } else { @@ -1569,7 +1570,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) { *val = pData[i]; if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); + updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); } } } @@ -1580,7 +1581,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { _min_max_over: if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) { - doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos); + pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pBuf->nullTupleSaved = true; } return numOfElems; @@ -1599,8 +1600,7 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { } static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); - -static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex); +static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex); int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); @@ -1648,34 +1648,29 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple return; } - int32_t pageId = pTuplePos->pageId; - int32_t offset = pTuplePos->offset; - - if (pTuplePos->pageId != -1) { - int32_t numOfCols = pCtx->subsidiaries.num; - SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); + if (pCtx->saveHandle.pBuf != NULL) { + if (pTuplePos->pageId != -1) { + int32_t numOfCols = pCtx->subsidiaries.num; + const char* p = loadTupleData(pCtx, pTuplePos); - bool* nullList = (bool*)((char*)pPage + offset); - char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); - - // todo set the offset value to optimize the performance. - for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { - SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + bool* nullList = (bool*)p; + char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); - SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; - int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + // todo set the offset value to optimize the performance. + for (int32_t j = 0; j < numOfCols; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; - SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); - ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); - if (nullList[j]) { - colDataAppendNULL(pDstCol, rowIndex); - } else { - colDataAppend(pDstCol, rowIndex, pStart, false); + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); + ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); + if (nullList[j]) { + colDataAppendNULL(pDstCol, rowIndex); + } else { + colDataAppend(pDstCol, rowIndex, pStart, false); + } + pStart += pDstCol->info.bytes; } - pStart += pDstCol->info.bytes; } - - releaseBufPage(pCtx->pBuf, pPage); } } @@ -2756,15 +2751,15 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex); } -static void saveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) { +static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) { if (pCtx->subsidiaries.num <= 0) { return; } if (!pInfo->hasResult) { - doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); + pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock); } else { - doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); + updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); } } @@ -2778,7 +2773,7 @@ static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t cur memcpy(pInfo->buf, pData, pInfo->bytes); pInfo->ts = currentTs; - saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); pInfo->hasResult = true; } @@ -2982,7 +2977,7 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S pOutput->bytes = pInput->bytes; memcpy(pOutput->buf, pInput->buf, pOutput->bytes); - saveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput); + firstlastSaveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput); pOutput->hasResult = true; } @@ -3087,7 +3082,7 @@ static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, i } pInfo->ts = cts; - saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); + firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); pInfo->hasResult = true; } @@ -3420,7 +3415,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { - doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); + pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pRes->nullTupleSaved = true; } return TSDB_CODE_SUCCESS; @@ -3448,7 +3443,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { - doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); + pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pRes->nullTupleSaved = true; } @@ -3500,7 +3495,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock); } #ifdef BUF_PAGE_DEBUG qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, @@ -3524,7 +3519,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple by over writing the old data if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); + updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); } #ifdef BUF_PAGE_DEBUG qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset); @@ -3541,38 +3536,13 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData * |(n columns, one bit for each column)| src column #1| src column #2| * +------------------------------------+--------------+--------------+ */ -void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { - SFilePage* pPage = NULL; - - // todo refactor: move away - int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool); - for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { - SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - completeRowSize += pc->pExpr->base.resSchema.bytes; - } - - if (pCtx->curBufPage == -1) { - pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage); - pPage->num = sizeof(SFilePage); - } else { - pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage); - if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) { - // current page is all used, let's prepare a new buffer page - releaseBufPage(pCtx->pBuf, pPage); - pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage); - pPage->num = sizeof(SFilePage); - } - } +void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies, char* buf) { + char* nullList = buf; + char* pStart = (char*)(nullList + sizeof(bool) * pSubsidiaryies->num); - pPos->pageId = pCtx->curBufPage; - pPos->offset = pPage->num; - - // keep the current row data, extract method int32_t offset = 0; - bool* nullList = (bool*)((char*)pPage + pPage->num); - char* pStart = (char*)(nullList + sizeof(bool) * pCtx->subsidiaries.num); - for (int32_t i = 0; i < pCtx->subsidiaries.num; ++i) { - SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i]; + for (int32_t i = 0; i < pSubsidiaryies->num; ++i) { + SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i]; SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; int32_t srcSlotId = pFuncParam->pCol->slotId; @@ -3593,50 +3563,88 @@ void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* offset += pCol->info.bytes; } - pPage->num += completeRowSize; - - setBufPageDirty(pPage, true); - releaseBufPage(pCtx->pBuf, pPage); -#ifdef BUF_PAGE_DEBUG - qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId, pPos->offset); -#endif + return buf; } -void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { - SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId); +static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length) { + STuplePos p = {0}; + if (pHandle->pBuf != NULL) { + SFilePage* pPage = NULL; - int32_t numOfCols = pCtx->subsidiaries.num; + if (pHandle->currentPage == -1) { + pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); + pPage->num = sizeof(SFilePage); + } else { + pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); + if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { + // current page is all used, let's prepare a new buffer page + releaseBufPage(pHandle->pBuf, pPage); + pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); + pPage->num = sizeof(SFilePage); + } + } - bool* nullList = (bool*)((char*)pPage + pPos->offset); - char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); + p = (STuplePos) {.pageId = pHandle->currentPage, .offset = pPage->num}; + memcpy(pPage->data + pPage->num, pBuf, length); - int32_t offset = 0; - for (int32_t i = 0; i < numOfCols; ++i) { - SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i]; - SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; - int32_t srcSlotId = pFuncParam->pCol->slotId; + pPage->num += length; + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + } else { + // other tuple save policy + } - SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId); - if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) { - offset += pCol->info.bytes; - continue; - } + return p; +} - char* p = colDataGetData(pCol, rowIndex); - if (IS_VAR_DATA_TYPE(pCol->info.type)) { - memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p) : varDataTLen(p)); - } else { - memcpy(pStart + offset, p, pCol->info.bytes); +STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) { + if (pCtx->subsidiaries.rowLen == 0) { + int32_t rowLen = 0; + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + rowLen += pc->pExpr->base.resSchema.bytes; } - offset += pCol->info.bytes; + pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool); + pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen); } - setBufPageDirty(pPage, true); - releaseBufPage(pCtx->pBuf, pPage); -#ifdef BUF_PAGE_DEBUG - qDebug("page_copyTuple pos:%p, pageId:%d, offset:%d", pPos, pPos->pageId, pPos->offset); -#endif + char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); + return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen); +} + +static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { + if (pHandle->pBuf != NULL) { + SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); + memcpy(pPage->data + pPos->offset, pBuf, length); + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + } else { + + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { + char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); + doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos); + return TSDB_CODE_SUCCESS; +} + +static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) { + if (pHandle->pBuf != NULL) { + SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); + char* p = pPage->data + pPos->offset; + releaseBufPage(pHandle->pBuf, pPage); + return p; + } else { + return NULL; + } +} + +static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) { + return doLoadTupleData(&pCtx->saveHandle, pPos); } int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { @@ -3788,8 +3796,6 @@ int32_t spreadFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pCol = pInput->pData[0]; int32_t start = pInput->startRowIndex; - int32_t numOfRows = pInput->numOfRows; - // check the valid data one by one for (int32_t i = start; i < pInput->numOfRows + start; ++i) { if (colDataIsNull_f(pCol->nullbitmap, i)) { @@ -4964,7 +4970,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da if (pInfo->numSampled < pInfo->samples) { sampleAssignResult(pInfo, data, pInfo->numSampled); if (pCtx->subsidiaries.num > 0) { - doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]); + pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock); } pInfo->numSampled++; } else { @@ -4972,7 +4978,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da if (j < pInfo->samples) { sampleAssignResult(pInfo, data, j); if (pCtx->subsidiaries.num > 0) { - doCopyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); + updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); } } } @@ -4995,7 +5001,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { } if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { - doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); + pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pInfo->nullTupleSaved = true; } diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index dbe0b6bb3ae887a2f9ac78fc98274ca5bbd1f867..4c58c0abe50e5784314445934618265231d4805a 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -372,7 +372,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pPageIdList = pList; } - pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId); + pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); pSlot->info.pageId = pageId; taosArrayPush(pPageIdList, &pageId); } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 56fbafe76d768144709666cda457aa232eeab662..049d1ef54526ba73cde02d82d3b1d8a6779286e6 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1669,6 +1669,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache pDb = taosHashIterate(context.pDbFNameHashObj, pDb); } } + if (pContext->pStmtCb) { + context.pVgroupsHashObj = NULL; + context.pTableBlockHashObj = NULL; + } destroyInsertParseContext(&context); return code; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 0667c5f5b9e2984d119dab5297abb16e374977fb..bf72f5210577d6f43f8ae97d098091b3020aeb16 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -197,28 +197,21 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols return SCAN_TYPE_TABLE; } -static SNode* createPrimaryKeyCol(uint64_t tableId) { +static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { return NULL; } - pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; - pCol->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; + pCol->node.resType.type = pSchema->type; + pCol->node.resType.bytes = pSchema->bytes; pCol->tableId = tableId; - pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + pCol->colId = pSchema->colId; pCol->colType = COLUMN_TYPE_COLUMN; - strcpy(pCol->colName, "#primarykey"); + strcpy(pCol->colName, pSchema->name); return (SNode*)pCol; } -static int32_t addPrimaryKeyCol(uint64_t tableId, SNodeList** pCols) { - if (NULL == *pCols) { - *pCols = nodesMakeList(); - if (NULL == *pCols) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - +static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) { bool found = false; SNode* pCol = NULL; FOREACH(pCol, *pCols) { @@ -229,13 +222,25 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, SNodeList** pCols) { } if (!found) { - if (TSDB_CODE_SUCCESS != nodesListStrictAppend(*pCols, createPrimaryKeyCol(tableId))) { - return TSDB_CODE_OUT_OF_MEMORY; - } + return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema)); } return TSDB_CODE_SUCCESS; } +static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) { + if (LIST_LENGTH(*pCols) > 0) { + return TSDB_CODE_SUCCESS; + } + return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema)); +} + +static int32_t addDefaultScanCol(const STableMeta* pMeta, SNodeList** pCols) { + if (TSDB_SYSTEM_TABLE == pMeta->tableType) { + return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols); + } + return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols); +} + static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealTable, bool hasRepeatScanFuncs, SLogicNode** pLogicNode) { SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); @@ -299,8 +304,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pScan->hasNormalCols = true; } - if (TSDB_CODE_SUCCESS == code && SCAN_TYPE_SYSTEM_TABLE != pScan->scanType) { - code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols); + if (TSDB_CODE_SUCCESS == code) { + code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols); } // set output @@ -787,10 +792,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele static EDealRes needFillValueImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { SColumnNode* pCol = (SColumnNode*)pNode; - if (COLUMN_TYPE_WINDOW_START != pCol->colType && - COLUMN_TYPE_WINDOW_END != pCol->colType && - COLUMN_TYPE_WINDOW_DURATION != pCol->colType && - COLUMN_TYPE_GROUP_KEY != pCol->colType) { + if (COLUMN_TYPE_WINDOW_START != pCol->colType && COLUMN_TYPE_WINDOW_END != pCol->colType && + COLUMN_TYPE_WINDOW_DURATION != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) { *(bool*)pContext = true; return DEAL_RES_END; } @@ -1008,7 +1011,8 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pPartition->node.pTargets) { - code = nodesListMakeStrictAppend(&pPartition->node.pTargets, nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0))); + code = nodesListMakeStrictAppend(&pPartition->node.pTargets, + nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0))); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/test/planSysTbTest.cpp b/source/libs/planner/test/planSysTbTest.cpp index 921f86f09a41d36448ab0d435ab6a439645b9bfc..6b40e381cc18cb75cc9271352cd654d31a74242b 100644 --- a/source/libs/planner/test/planSysTbTest.cpp +++ b/source/libs/planner/test/planSysTbTest.cpp @@ -32,3 +32,9 @@ TEST_F(PlanSysTableTest, informationSchema) { run("SELECT * FROM information_schema.ins_databases WHERE name = 'information_schema'"); } + +TEST_F(PlanSysTableTest, withAgg) { + useDb("root", "information_schema"); + + run("SELECT COUNT(1) FROM ins_users"); +} diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 4d5532b9a639474507948f147fb93e43c2bb8230..2767fed9373aa47ebdbea39b07f28c238db14c7d 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -371,7 +371,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem return TSDB_CODE_SUCCESS; } -void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { +void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { pBuf->statis.getPages += 1; char* availablePage = NULL; diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp index eaf198a483aa5e3e90595d2417516aa53f754331..1a057c5875ee95de2fc3c457ca09314366fff48c 100644 --- a/source/util/test/pageBufferTest.cpp +++ b/source/util/test/pageBufferTest.cpp @@ -18,7 +18,7 @@ void simpleTest() { int32_t pageId = 0; int32_t groupId = 0; - SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, &pageId)); ASSERT_TRUE(pBufPage != NULL); ASSERT_EQ(getTotalBufSize(pBuf), 1024); @@ -29,26 +29,26 @@ void simpleTest() { releaseBufPage(pBuf, pBufPage); - SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t == pBufPage1); - SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t1 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t1 == pBufPage2); - SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t2 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t2 == pBufPage3); - SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t3 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t3 == pBufPage4); releaseBufPage(pBuf, pBufPage2); - SFilePage* pBufPage5 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage5 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t4 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t4 == pBufPage5); @@ -64,7 +64,7 @@ void writeDownTest() { int32_t groupId = 0; int32_t nx = 12345; - SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, &pageId)); ASSERT_TRUE(pBufPage != NULL); *(int32_t*)(pBufPage->data) = nx; @@ -73,22 +73,22 @@ void writeDownTest() { setBufPageDirty(pBufPage, true); releaseBufPage(pBuf, pBufPage); - SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t1 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(pageId == 1); - SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t2 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(pageId == 2); - SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t3 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(pageId == 3); - SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t4 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(pageId == 4); @@ -113,32 +113,32 @@ void recyclePageTest() { int32_t groupId = 0; int32_t nx = 12345; - SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage = static_cast(getNewBufPage(pBuf, &pageId)); ASSERT_TRUE(pBufPage != NULL); releaseBufPage(pBuf, pBufPage); - SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage1 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t1 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(pageId == 1); - SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage2 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t2 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(pageId == 2); - SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage3 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t3 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(pageId == 3); - SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage4 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t4 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(pageId == 4); releaseBufPage(pBuf, t4); - SFilePage* pBufPage5 = static_cast(getNewBufPage(pBuf, groupId, &pageId)); + SFilePage* pBufPage5 = static_cast(getNewBufPage(pBuf, &pageId)); SFilePage* t5 = static_cast(getBufPage(pBuf, pageId)); ASSERT_TRUE(t5 == pBufPage5); ASSERT_TRUE(pageId == 5); diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index d38e509d269beb265f9a7701f88954e71b064de2..07602ec29f69f9fbd0dab90935e0922996c80f80 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -20,15 +20,9 @@ class TDTestCase: tdSql.init(conn.cursor()) #tdSql.init(conn.cursor(), logSql) # output sql.txt file - def checkFileContent(self): - buildPath = tdCom.getBuildPath() - cfgPath = tdCom.getClientCfgPath() - cmdStr = '%s/build/bin/tmq_taosx_ci -c %s'%(buildPath, cfgPath) - tdLog.info(cmdStr) - os.system(cmdStr) - - srcFile = '%s/../log/tmq_taosx_tmp.source'%(cfgPath) - dstFile = '%s/../log/tmq_taosx_tmp.result'%(cfgPath) + def checkJson(self, cfgPath, name): + srcFile = '%s/../log/%s.source'%(cfgPath, name) + dstFile = '%s/../log/%s.result'%(cfgPath, name) tdLog.info("compare file: %s, %s"%(srcFile, dstFile)) consumeFile = open(srcFile, mode='r') @@ -43,7 +37,31 @@ class TDTestCase: tdLog.exit("compare error: %s != %s"%src, dst) else: break + return + + def checkDropData(self): + tdSql.execute('use db_taosx') + tdSql.query("show tables") + tdSql.checkRows(2) + tdSql.query("select * from jt order by i") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 11) + tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') + tdSql.checkData(1, 2, None) + + tdSql.execute('use abc1') + tdSql.query("show tables") + tdSql.checkRows(2) + tdSql.query("select * from jt order by i") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 11) + tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}') + tdSql.checkData(1, 2, None) + return + def checkData(self): tdSql.execute('use db_taosx') tdSql.query("select * from ct3 order by c1 desc") tdSql.checkRows(2) @@ -116,113 +134,82 @@ class TDTestCase: tdSql.checkData(0, 2, None) tdSql.checkData(1, 1, 1) tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}') - - tdSql.execute('drop topic if exists topic_ctb_column') return - def checkFileContentSnapshot(self): + def checkWal1Vgroup(self): buildPath = tdCom.getBuildPath() cfgPath = tdCom.getClientCfgPath() - cmdStr = '%s/build/bin/tmq_taosx_snapshot_ci -c %s'%(buildPath, cfgPath) + cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1'%(buildPath, cfgPath) tdLog.info(cmdStr) os.system(cmdStr) - srcFile = '%s/../log/tmq_taosx_tmp_snapshot.source'%(cfgPath) - dstFile = '%s/../log/tmq_taosx_tmp_snapshot.result'%(cfgPath) - tdLog.info("compare file: %s, %s"%(srcFile, dstFile)) - - consumeFile = open(srcFile, mode='r') - queryFile = open(dstFile, mode='r') - - while True: - dst = queryFile.readline() - src = consumeFile.readline() + self.checkJson(cfgPath, "tmq_taosx_tmp") + self.checkData() - if dst: - if dst != src: - tdLog.exit("compare error: %s != %s"%src, dst) - else: - break + return - tdSql.execute('use db_taosx') - tdSql.query("select * from ct3 order by c1 desc") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 51) - tdSql.checkData(0, 4, 940) - tdSql.checkData(1, 1, 23) - tdSql.checkData(1, 4, None) + def checkWalMultiVgroups(self): + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) - tdSql.query("select * from st1 order by ts") - tdSql.checkRows(8) - tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 3) - tdSql.checkData(4, 1, 4) - tdSql.checkData(6, 1, 23) + self.checkData() - tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 4) - tdSql.checkData(4, 2, 3) - tdSql.checkData(6, 2, 32) + return - tdSql.checkData(0, 3, 'a') - tdSql.checkData(1, 3, 'b') - tdSql.checkData(4, 3, 'hwj') - tdSql.checkData(6, 3, 's21ds') + def checkWalMultiVgroupsWithDropTable(self): + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5 -d'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) - tdSql.checkData(0, 4, None) - tdSql.checkData(1, 4, None) - tdSql.checkData(5, 4, 940) - tdSql.checkData(6, 4, None) + self.checkDropData() - tdSql.checkData(0, 5, 1000) - tdSql.checkData(1, 5, 2000) - tdSql.checkData(4, 5, 1000) - tdSql.checkData(6, 5, 5000) + return - tdSql.checkData(0, 6, 'ttt') - tdSql.checkData(1, 6, None) - tdSql.checkData(4, 6, 'ttt') - tdSql.checkData(6, 6, None) + def checkSnapshot1Vgroup(self): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s'%(buildPath, cfgPath) + tdLog.info(cmdStr) + os.system(cmdStr) - tdSql.checkData(0, 7, True) - tdSql.checkData(1, 7, None) - tdSql.checkData(4, 7, True) - tdSql.checkData(6, 7, None) + self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot") + self.checkData() - tdSql.checkData(0, 8, None) - tdSql.checkData(1, 8, None) - tdSql.checkData(4, 8, None) - tdSql.checkData(6, 8, None) + return - tdSql.query("select * from ct1") - tdSql.checkRows(4) + def checkSnapshotMultiVgroups(self): + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) - tdSql.query("select * from ct2") - tdSql.checkRows(0) + self.checkData() - tdSql.query("select * from ct0 order by c1") - tdSql.checkRows(2) - tdSql.checkData(0, 3, "a") - tdSql.checkData(1, 4, None) + return - tdSql.query("select * from n1 order by cc3 desc") - tdSql.checkRows(2) - tdSql.checkData(0, 1, "eeee") - tdSql.checkData(1, 2, 940) + def checkSnapshotMultiVgroupsWithDropTable(self): + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) - tdSql.query("select * from jt order by i desc") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 11) - tdSql.checkData(0, 2, None) - tdSql.checkData(1, 1, 1) - tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}') + self.checkDropData() return def run(self): tdSql.prepare() - self.checkFileContent() - self.checkFileContentSnapshot() + self.checkWal1Vgroup() + self.checkSnapshot1Vgroup() + + self.checkWalMultiVgroups() + self.checkSnapshotMultiVgroups() + + self.checkWalMultiVgroupsWithDropTable() + self.checkSnapshotMultiVgroupsWithDropTable() def stop(self): tdSql.close() diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 0fb80e69c264852a1fa7abc8b666d036c36a50d7..31331b52651fee79c837a077869bc45ec7acfe6c 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -2,7 +2,6 @@ add_executable(tmq_demo tmqDemo.c) add_executable(tmq_sim tmqSim.c) add_executable(create_table createTable.c) add_executable(tmq_taosx_ci tmq_taosx_ci.c) -add_executable(tmq_taosx_snapshot_ci tmq_taosx_snapshot_ci.c) add_executable(sml_test sml_test.c) target_link_libraries( create_table @@ -32,13 +31,6 @@ target_link_libraries( PUBLIC common PUBLIC os ) -target_link_libraries( - tmq_taosx_snapshot_ci - PUBLIC taos_static - PUBLIC util - PUBLIC common - PUBLIC os -) target_link_libraries( sml_test diff --git a/tests/test/c/tmq_taosx_ci.c b/tests/test/c/tmq_taosx_ci.c index ee5af03f053d0e606ec2e06596e48c0e2f4aba69..2afa05b0120b91ba5b40e151e10fc28b204bc1c0 100644 --- a/tests/test/c/tmq_taosx_ci.c +++ b/tests/test/c/tmq_taosx_ci.c @@ -22,8 +22,16 @@ #include "types.h" static int running = 1; -TdFilePtr g_fp = NULL; -char dir[64]={0}; +TdFilePtr g_fp = NULL; +typedef struct{ + bool snapShot; + bool dropTable; + int srcVgroups; + int dstVgroups; + char dir[64]; +}Config; + +Config g_conf = {0}; static TAOS* use_db(){ TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -41,7 +49,6 @@ static TAOS* use_db(){ } static void msg_process(TAOS_RES* msg) { - /*memset(buf, 0, 1024);*/ printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); @@ -51,8 +58,11 @@ static void msg_process(TAOS_RES* msg) { if (result) { printf("meta result: %s\n", result); } - taosFprintfFile(g_fp, result); - taosFprintfFile(g_fp, "\n"); + if(g_fp){ + taosFprintfFile(g_fp, result); + taosFprintfFile(g_fp, "\n"); + } + tmq_free_json_meta(result); } @@ -61,22 +71,10 @@ static void msg_process(TAOS_RES* msg) { int32_t ret = tmq_write_raw(pConn, raw); printf("write raw data: %s\n", tmq_err2str(ret)); -// else{ -// while(1){ -// int numOfRows = 0; -// void *pData = NULL; -// taos_fetch_raw_block(msg, &numOfRows, &pData); -// if(numOfRows == 0) break; -// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows); -// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg)); -// printf("write raw data: %s\n", tmq_err2str(ret)); -// } -// } - taos_close(pConn); } -int32_t init_env() { +int32_t init_env(Config *conf) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { return -1; @@ -89,13 +87,22 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1"); + char sql[128] = {0}; + snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", conf->dstVgroups); + pRes = taos_query(pConn, sql); if (taos_errno(pRes) != 0) { printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); + pRes = taos_query(pConn, "drop topic if exists topic_db"); + if (taos_errno(pRes) != 0) { + printf("error in drop topic, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + pRes = taos_query(pConn, "drop database if exists abc1"); if (taos_errno(pRes) != 0) { printf("error in drop db, reason:%s\n", taos_errstr(pRes)); @@ -103,7 +110,8 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + snprintf(sql, 128, "create database if not exists abc1 vgroups %d", conf->srcVgroups); + pRes = taos_query(pConn, sql); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; @@ -133,7 +141,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')"); + pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); return -1; @@ -168,7 +176,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); + pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -224,6 +232,22 @@ int32_t init_env() { } taos_free_result(pRes); + if(conf->dropTable){ + pRes = taos_query(pConn, "drop table ct3 ct1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + } + pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); if (taos_errno(pRes) != 0) { printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); @@ -273,6 +297,15 @@ int32_t init_env() { } taos_free_result(pRes); + if(conf->dropTable){ + pRes = taos_query(pConn, "drop table n1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + } + pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); if (taos_errno(pRes) != 0) { printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); @@ -308,6 +341,23 @@ int32_t init_env() { } taos_free_result(pRes); + if(conf->dropTable){ + pRes = taos_query(pConn, + "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop table st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + } taos_close(pConn); return 0; } @@ -327,9 +377,9 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + pRes = taos_query(pConn, "create topic topic_db with meta as database abc1"); if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); + printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); @@ -342,18 +392,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { printf("commit %d tmq %p param %p\n", code, tmq, param); } -tmq_t* build_consumer() { -#if 0 - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); -#endif - +tmq_t* build_consumer(Config *config) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "group.id", "tg2"); tmq_conf_set(conf, "client.id", "my app 1"); @@ -363,7 +402,9 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.heartbeat.background", "true"); - /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ + if(config->snapShot){ + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + } tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); @@ -374,8 +415,7 @@ tmq_t* build_consumer() { tmq_list_t* build_topic_list() { tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "topic_ctb_column"); - /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ + tmq_list_append(topic_list, "topic_db"); return topic_list; } @@ -393,12 +433,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { if (tmqmessage) { cnt++; msg_process(tmqmessage); - /*if (cnt >= 2) break;*/ - /*printf("get data\n");*/ taos_free_result(tmqmessage); - /*} else {*/ - /*break;*/ - /*tmq_commit_sync(tmq, NULL);*/ }else{ break; } @@ -411,52 +446,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { fprintf(stderr, "%% Consumer closed\n"); } -void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - static const int MIN_COMMIT_COUNT = 1; - - int msg_count = 0; - int32_t code; - - if ((code = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); - return; - } - - tmq_list_t* subList = NULL; - tmq_subscription(tmq, &subList); - char** subTopics = tmq_list_to_c_array(subList); - int32_t sz = tmq_list_get_size(subList); - printf("subscribed topics: "); - for (int32_t i = 0; i < sz; i++) { - printf("%s, ", subTopics[i]); - } - printf("\n"); - tmq_list_destroy(subList); - - while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); - if (tmqmessage) { - msg_process(tmqmessage); - taos_free_result(tmqmessage); - - /*tmq_commit_sync(tmq, NULL);*/ - /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ - } - } - - code = tmq_consumer_close(tmq); - if (code) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - -void initLogFile() { +void initLogFile(Config *conf) { char f1[256] = {0}; char f2[256] = {0}; - sprintf(f1, "%s/../log/tmq_taosx_tmp.source", dir); - sprintf(f2, "%s/../log/tmq_taosx_tmp.result", dir); + if(conf->snapShot){ + sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", conf->dir); + sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", conf->dir); + }else{ + sprintf(f1, "%s/../log/tmq_taosx_tmp.source", conf->dir); + sprintf(f2, "%s/../log/tmq_taosx_tmp.result", conf->dir); + } + TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", f1); @@ -469,53 +470,82 @@ void initLogFile() { fprintf(stderr, "Failed to open %s for save result\n", f2); exit(-1); } - char *result[] = { - "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", - "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", - "{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}", - "{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[]}", - "{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}]}", - "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}", - "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}", - "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}", - "{\"type\":\"alter\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}", - "{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":9}", - "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":6,\"colName\":\"c1\"}", - "{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", - "{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}", - "{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}" - }; - - for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ - taosFprintfFile(pFile2, result[i]); - taosFprintfFile(pFile2, "\n"); + + if(conf->snapShot){ + char *result[] = { + "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}", + "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", + "{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}", + "{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[]}", + "{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}]}", + "{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}", + "{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", + "{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}", + "{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}", + }; + + for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ + taosFprintfFile(pFile2, result[i]); + taosFprintfFile(pFile2, "\n"); + } + }else{ + char *result[] = { + "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}", + "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", + "{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}", + "{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[]}", + "{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}]}", + "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}", + "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}", + "{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}", + "{\"type\":\"alter\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}", + "{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":9}", + "{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":6,\"colName\":\"c1\"}", + "{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", + "{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}", + "{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}" + }; + + for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ + taosFprintfFile(pFile2, result[i]); + taosFprintfFile(pFile2, "\n"); + } } + taosCloseFile(&pFile2); } int main(int argc, char* argv[]) { - if(argc == 3 && strcmp(argv[1], "-c") == 0) { - strcpy(dir, argv[2]); - }else{ -// strcpy(dir, "../../../sim/psim/cfg"); - strcpy(dir, "/var/log"); + for (int32_t i = 1; i < argc; i++) { + if(strcmp(argv[i], "-c") == 0){ + strcpy(g_conf.dir, argv[++i]); + }else if(strcmp(argv[i], "-s") == 0){ + g_conf.snapShot = true; + }else if(strcmp(argv[i], "-d") == 0){ + g_conf.dropTable = true; + }else if(strcmp(argv[i], "-sv") == 0){ + g_conf.srcVgroups = atol(argv[++i]); + }else if(strcmp(argv[i], "-dv") == 0){ + g_conf.dstVgroups = atol(argv[++i]); + } } printf("env init\n"); - initLogFile(); + if(strlen(g_conf.dir) != 0){ + initLogFile(&g_conf); + } - if (init_env() < 0) { + if (init_env(&g_conf) < 0) { return -1; } create_topic(); - tmq_t* tmq = build_consumer(); + tmq_t* tmq = build_consumer(&g_conf); tmq_list_t* topic_list = build_topic_list(); basic_consume_loop(tmq, topic_list); - /*sync_consume_loop(tmq, topic_list);*/ taosCloseFile(&g_fp); } diff --git a/tests/test/c/tmq_taosx_snapshot_ci.c b/tests/test/c/tmq_taosx_snapshot_ci.c deleted file mode 100644 index e3a52f7cad7bf4c3125fc0c023795a4d86fcdd5e..0000000000000000000000000000000000000000 --- a/tests/test/c/tmq_taosx_snapshot_ci.c +++ /dev/null @@ -1,512 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include -#include -#include "taos.h" -#include "types.h" - -static int running = 1; -TdFilePtr g_fp = NULL; -char dir[64]={0}; - -static TAOS* use_db(){ - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return NULL; - } - - TAOS_RES* pRes = taos_query(pConn, "use db_taosx"); - if (taos_errno(pRes) != 0) { - printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes)); - return NULL; - } - taos_free_result(pRes); - return pConn; -} - -static void msg_process(TAOS_RES* msg) { - /*memset(buf, 0, 1024);*/ - printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); - printf("db: %s\n", tmq_get_db_name(msg)); - printf("vg: %d\n", tmq_get_vgroup_id(msg)); - TAOS *pConn = use_db(); - if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { - char* result = tmq_get_json_meta(msg); - if (result) { - printf("meta result: %s\n", result); - } - taosFprintfFile(g_fp, result); - taosFprintfFile(g_fp, "\n"); - tmq_free_json_meta(result); - } - - tmq_raw_data raw = {0}; - tmq_get_raw(msg, &raw); - int32_t ret = tmq_write_raw(pConn, raw); - printf("write raw data: %s\n", tmq_err2str(ret)); - -// else{ -// while(1){ -// int numOfRows = 0; -// void *pData = NULL; -// taos_fetch_raw_block(msg, &numOfRows, &pData); -// if(numOfRows == 0) break; -// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows); -// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg)); -// printf("write raw data: %s\n", tmq_err2str(ret)); -// } -// } - - taos_close(pConn); -} - -int32_t init_env() { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return -1; - } - - TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx"); - if (taos_errno(pRes) != 0) { - printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1"); - if (taos_errno(pRes) != 0) { - printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop database if exists abc1"); - if (taos_errno(pRes) != 0) { - printf("error in drop db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, - "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " - "nchar(8), t4 bool)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into ct3 select * from ct1"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); - if (taos_errno(pRes) != 0) { - printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); - if (taos_errno(pRes) != 0) { - printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 comment 'hello'"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "alter table n1 drop column c1"); - if (taos_errno(pRes) != 0) { - printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table jt2 using jt tags('')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into jt1 values(now, 1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into jt2 values(now, 11)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - taos_close(pConn); - return 0; -} - -int32_t create_topic() { - printf("create topic\n"); - TAOS_RES* pRes; - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return -1; - } - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - taos_close(pConn); - return 0; -} - -void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - printf("commit %d tmq %p param %p\n", code, tmq, param); -} - -tmq_t* build_consumer() { -#if 0 - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); -#endif - - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg2"); - tmq_conf_set(conf, "client.id", "my app 1"); - tmq_conf_set(conf, "td.connect.user", "root"); - tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "msg.with.table.name", "true"); - tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "enable.heartbeat.background", "true"); - tmq_conf_set(conf, "experimental.snapshot.enable", "true"); - /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ - - tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); - tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); - assert(tmq); - tmq_conf_destroy(conf); - return tmq; -} - -tmq_list_t* build_topic_list() { - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "topic_ctb_column"); - /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ - return topic_list; -} - -void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - int32_t code; - - if ((code = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); - printf("subscribe err\n"); - return; - } - int32_t cnt = 0; - while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); - if (tmqmessage) { - cnt++; - msg_process(tmqmessage); - /*if (cnt >= 2) break;*/ - /*printf("get data\n");*/ - taos_free_result(tmqmessage); - /*} else {*/ - /*break;*/ - /*tmq_commit_sync(tmq, NULL);*/ - }else{ - break; - } - } - - code = tmq_consumer_close(tmq); - if (code) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - -void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - static const int MIN_COMMIT_COUNT = 1; - - int msg_count = 0; - int32_t code; - - if ((code = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); - return; - } - - tmq_list_t* subList = NULL; - tmq_subscription(tmq, &subList); - char** subTopics = tmq_list_to_c_array(subList); - int32_t sz = tmq_list_get_size(subList); - printf("subscribed topics: "); - for (int32_t i = 0; i < sz; i++) { - printf("%s, ", subTopics[i]); - } - printf("\n"); - tmq_list_destroy(subList); - - while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); - if (tmqmessage) { - msg_process(tmqmessage); - taos_free_result(tmqmessage); - - /*tmq_commit_sync(tmq, NULL);*/ - /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ - } - } - - code = tmq_consumer_close(tmq); - if (code) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - -void initLogFile() { - char f1[256] = {0}; - char f2[256] = {0}; - - sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", dir); - sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", dir); - TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM); - if (NULL == pFile) { - fprintf(stderr, "Failed to open %s for save result\n", f1); - exit(-1); - } - g_fp = pFile; - - TdFilePtr pFile2 = taosOpenFile(f2, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM); - if (NULL == pFile2) { - fprintf(stderr, "Failed to open %s for save result\n", f2); - exit(-1); - } - char *result[] = { - "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}", - "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", - "{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}", - "{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[]}", - "{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}]}", - "{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}", - "{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}", - "{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}", - "{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}", - }; - - for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){ - taosFprintfFile(pFile2, result[i]); - taosFprintfFile(pFile2, "\n"); - } - taosCloseFile(&pFile2); -} - -int main(int argc, char* argv[]) { - if(argc == 3 && strcmp(argv[1], "-c") == 0) { - strcpy(dir, argv[2]); - }else{ -// strcpy(dir, "../../../sim/psim/cfg"); - strcpy(dir, "/var/log"); - } - - printf("env init\n"); - initLogFile(); - - if (init_env() < 0) { - return -1; - } - create_topic(); - - tmq_t* tmq = build_consumer(); - tmq_list_t* topic_list = build_topic_list(); - basic_consume_loop(tmq, topic_list); - /*sync_consume_loop(tmq, topic_list);*/ - taosCloseFile(&g_fp); -}