提交 3d193021 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/3.0_interval_hash_optimize

......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG aa45ad4
GIT_TAG 9cb965f
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -13,15 +13,9 @@ IF (TD_LINUX)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
add_executable(tmq "")
add_executable(tmq_taosx "")
add_executable(stream_demo "")
add_executable(demoapi "")
target_sources(tmq_taosx
PRIVATE
"tmq_taosx.c"
)
target_sources(tmq
PRIVATE
"tmq.c"
......@@ -41,10 +35,6 @@ IF (TD_LINUX)
taos_static
)
target_link_libraries(tmq_taosx
taos_static
)
target_link_libraries(stream_demo
taos_static
)
......@@ -57,10 +47,6 @@ IF (TD_LINUX)
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(tmq_taosx
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(stream_demo
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
......@@ -73,7 +59,6 @@ IF (TD_LINUX)
)
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
SET_TARGET_PROPERTIES(tmq_taosx PROPERTIES OUTPUT_NAME tmq_taosx)
SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo)
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
ENDIF ()
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static TAOS* use_db(){
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return NULL;
}
TAOS_RES* pRes = taos_query(pConn, "use db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes));
return NULL;
}
taos_free_result(pRes);
return pConn;
}
static void msg_process(TAOS_RES* msg) {
/*memset(buf, 0, 1024);*/
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
TAOS *pConn = use_db();
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
char* result = tmq_get_json_meta(msg);
if (result) {
printf("meta result: %s\n", result);
}
tmq_free_json_meta(result);
}
tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw);
int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close(pConn);
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 4");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists abc1 vgroups 3");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct4 using st1(t3) tags('ct4')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct4, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 select * from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 comment 'hello'");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 drop column c1");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt2 using jt tags('')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_ctb_column");
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return topic_list;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0;
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
return;
}
tmq_list_t* subList = NULL;
tmq_subscription(tmq, &subList);
char** subTopics = tmq_list_to_c_array(subList);
int32_t sz = tmq_list_get_size(subList);
printf("subscribed topics: ");
for (int32_t i = 0; i < sz; i++) {
printf("%s, ", subTopics[i]);
}
printf("\n");
tmq_list_destroy(subList);
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
taos_free_result(tmqmessage);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
printf("env init\n");
if (init_env() < 0) {
return -1;
}
create_topic();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -92,6 +92,8 @@ struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SSubsidiaryResInfo {
int16_t num;
int32_t rowLen;
char* buf; // serialize data buffer
struct SqlFunctionCtx **pCtx;
} SSubsidiaryResInfo;
......@@ -118,6 +120,11 @@ typedef struct SInputColumnInfoData {
uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions.
} SInputColumnInfoData;
typedef struct SSerializeDataHandle {
struct SDiskbasedBuf* pBuf;
int32_t currentPage;
} SSerializeDataHandle;
// sql function runtime context
typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
......@@ -137,10 +144,9 @@ typedef struct SqlFunctionCtx {
SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp;
struct SExprInfo *pExpr;
struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
int32_t curBufPage;
SSerializeDataHandle saveHandle;
bool isStream;
char udfName[TSDB_FUNC_NAME_LEN];
......
......@@ -58,11 +58,10 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
/**
*
* @param pBuf
* @param groupId
* @param pageId
* @return
*/
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId);
/**
*
......
......@@ -182,7 +182,6 @@ pipeline {
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
python3 checkPackageRuning.py
rmtaos
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
......
......@@ -7,6 +7,7 @@ originPackageName=$3
originversion=$4
testFile=$5
subFile="taos.tar.gz"
password=$6
if [ ${testFile} = "server" ];then
tdPath="TDengine-server-${version}"
......@@ -56,6 +57,7 @@ fi
cmdInstall tree
cmdInstall wget
cmdInstall sshpass
echo "new workroom path"
installPath="/usr/local/src/packageTest"
......@@ -74,24 +76,49 @@ else
echo "${oriInstallPath} already exists"
fi
echo "decompress installPackage"
cd ${installPath}
wget https://www.taosdata.com/assets-download/3.0/${packgeName}
cd ${oriInstallPath}
wget https://www.taosdata.com/assets-download/3.0/${originPackageName}
echo "download installPackage"
# cd ${installPath}
# wget https://www.taosdata.com/assets-download/3.0/${packgeName}
# cd ${oriInstallPath}
# wget https://www.taosdata.com/assets-download/3.0/${originPackageName}
cd ${installPath}
if [ ! -f {packgeName} ];then
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} .
fi
if [ ! -f debAuto.sh ];then
echo '#!/usr/bin/expect ' > debAuto.sh
echo 'set timeout 3 ' >> debAuto.sh
echo 'pset packgeName [lindex $argv 0]' >> debAuto.sh
echo 'spawn dpkg -i ${packgeName}' >> debAuto.sh
echo 'expect "*one:"' >> debAuto.sh
echo 'send "\r"' >> debAuto.sh
echo 'expect "*skip:"' >> debAuto.sh
echo 'send "\r" ' >> debAuto.sh
fi
if [[ ${packgeName} =~ "deb" ]];then
cd ${installPath}
echo "dpkg ${packgeName}" && dpkg -i ${packgeName}
dpkg -r taostools
dpkg -r tdengine
if [[ ${packgeName} =~ "TDengine" ]];then
echo "./debAuto.sh ${packgeName}" && chmod 755 debAuto.sh && ./debAuto.sh ${packgeName}
else
echo "dpkg -i ${packgeName}" && dpkg -i ${packgeName}
elif [[ ${packgeName} =~ "rpm" ]];then
cd ${installPath}
echo "rpm ${packgeName}" && rpm -ivh ${packgeName}
echo "rpm ${packgeName}" && rpm -ivh ${packgeName} --quiet
elif [[ ${packgeName} =~ "tar" ]];then
echo "tar ${packgeName}" && tar -xvf ${packgeName}
cd ${oriInstallPath}
cd ${oriInstallPath}
if [ ! -f {originPackageName} ];then
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${originversion}/community${originPackageName} .
fi
echo "tar -xvf ${originPackageName}" && tar -xvf ${originPackageName}
cd ${installPath}
echo "tar -xvf ${packgeName}" && tar -xvf ${packgeName}
......@@ -105,10 +132,10 @@ elif [[ ${packgeName} =~ "tar" ]];then
cd ${installPath}
tree ${oriInstallPath}/${originTdpPath} > ${originPackageName}_checkfile
tree ${installPath}/${tdPath} > ${packgeName}_checkfile
tree ${oriInstallPath}/${originTdpPath} > ${oriInstallPath}/${originPackageName}_checkfile
tree ${installPath}/${tdPath} > ${installPath}/${packgeName}_checkfile
diff ${packgeName}_checkfile ${originPackageName}_checkfile > ${installPath}/diffFile.log
diff ${installPath}/${packgeName}_checkfile ${oriInstallPath}/${originPackageName}_checkfile > ${installPath}/diffFile.log
diffNumbers=`cat ${installPath}/diffFile.log |wc -l `
if [ ${diffNumbers} != 0 ];then
echo "The number and names of files have changed from the previous installation package"
......@@ -122,11 +149,20 @@ elif [[ ${packgeName} =~ "tar" ]];then
else
bash ${installCmd}
fi
if [[ ${packgeName} =~ "Lite" ]];then
if [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]] ;then
cd ${installPath}
wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz .
# wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz
tar xvf taosTools-2.1.2-Linux-x64.tar.gz
cd taosTools-2.1.2 && bash install-taostools.sh
elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "deb" ]] ;then
cd ${installPath}
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.deb .
dpkg -i taosTools-2.1.2-Linux-x64.deb
elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "rpm" ]] ;then
cd ${installPath}
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.rpm .
rpm -ivh taosTools-2.1.2-Linux-x64.rpm --quiet
fi
fi
......
......@@ -471,6 +471,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE TABLE `%s` (", tbName);
appendColumnFields(buf2, &len, pCfg);
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
appendTableOptions(buf2, &len, pDbCfg, pCfg);
}
varDataLen(buf2) = len;
......
......@@ -303,6 +303,7 @@ typedef struct SAggSupporter {
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
} SAggSupporter;
typedef struct {
......@@ -327,7 +328,6 @@ typedef struct STableScanInfo {
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
// SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
......@@ -431,6 +431,7 @@ typedef struct SStreamAggSupporter {
char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // buffer page that is active
SSDataBlock* pScanBlock;
} SStreamAggSupporter;
......@@ -1009,7 +1010,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
......
......@@ -46,8 +46,8 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
rowSize += pCtx[i].resDataInfo.interBufSize;
}
rowSize +=
(numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(doSaveTupleData)
rowSize += (numOfOutput * sizeof(bool));
// expand rowSize to mark if col is null for top/bottom result(saveTupleData)
return rowSize;
}
......@@ -1178,7 +1178,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
SqlFunctionCtx* pCtx = &pFuncCtx[i];
pCtx->functionId = -1;
pCtx->curBufPage = -1;
pCtx->pExpr = pExpr;
if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
......@@ -1222,6 +1221,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->isStream = false;
pCtx->param = pFunct->pParam;
pCtx->saveHandle.currentPage = -1;
}
for (int32_t i = 1; i < numOfOutput; ++i) {
......
......@@ -179,26 +179,23 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
}
#endif
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL;
// in the first scan, new space needed for results
int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
if (*currentPageId == -1) {
pData = getNewBufPage(pResultBuf, &pageId);
pData->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi));
pageId = getPageId(pi);
pData = getBufPage(pResultBuf, *currentPageId);
pageId = *currentPageId;
if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
releaseBufPage(pResultBuf, pData);
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) {
pData->num = sizeof(SFilePage);
}
......@@ -215,9 +212,9 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
pResultRow->pageId = pageId;
pResultRow->offset = (int32_t)pData->num;
*currentPageId = pageId;
pData->num += interBufSize;
return pResultRow;
}
......@@ -263,11 +260,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// allocate a new buffer page
if (pResult == NULL) {
#ifdef BUF_PAGE_DEBUG
qDebug("page_2");
#endif
ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
initResultRow(pResult);
......@@ -302,7 +296,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
SIDList list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tid, &pageId);
pData = getNewBufPage(pResultBuf, &pageId);
pData->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
......@@ -313,7 +307,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
// release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi);
pData = getNewBufPage(pResultBuf, tid, &pageId);
pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) {
pData->num = sizeof(SFilePage);
}
......@@ -3092,7 +3086,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
offset += sizeof(int32_t);
uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (!resultRow) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -3440,8 +3434,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) {
int32_t code = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1;
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);
......@@ -3455,18 +3451,18 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s", terrstr(terrno));
return terrno;
code = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey);
return code;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
qError("Create agg result buf failed since %s", tstrerror(code));
qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
return code;
}
return TSDB_CODE_SUCCESS;
return code;
}
void cleanupAggSup(SAggSupporter* pAggSup) {
......@@ -3488,7 +3484,7 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
}
for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
}
return TSDB_CODE_SUCCESS;
......@@ -3520,6 +3516,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
}
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
taosMemoryFree(pCtx[i].input.pData);
taosMemoryFree(pCtx[i].input.pColumnDataAgg);
}
......@@ -4678,6 +4675,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size) {
pSup->currentPageId = -1;
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
......@@ -4705,7 +4703,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
}
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf;
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
}
return code;
}
......@@ -547,7 +547,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t *) pPage = 0;
......@@ -562,7 +562,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
// add a new page for current group
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId);
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
}
......
......@@ -195,16 +195,6 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
return PROJECT_RETRIEVE_DONE;
}
void printDataBlock1(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===printDataBlock: Block is Null or Empty");
return;
}
char* pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFreeClear(pBuf);
}
SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
......
......@@ -1828,12 +1828,6 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return needed;
}
void increaseTs(SqlFunctionCtx* pCtx) {
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) {
// pCtx[0].increase = true;
}
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
// Todo(liuyao) support partition by column
......@@ -1895,7 +1889,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
if (isStream) {
ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
}
......@@ -3050,6 +3043,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->aggSup.currentPageId = -1;
}
static void clearSpecialDataBlock(SSDataBlock* pBlock) {
......@@ -3420,7 +3414,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initBasicInfo(&pInfo->binfo, pResBlock);
ASSERT(numOfCols > 0);
increaseTs(pOperator->exprSupp.pCtx);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
......@@ -3451,6 +3444,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
// semi interval operator does not catch result
pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator";
ASSERT(pInfo->aggSup.currentPageId == -1);
}
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
......@@ -3559,11 +3553,10 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
initBasicInfo(pBasicInfo, pResultBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = NULL;
pSup->pCtx[i].saveHandle.pBuf = NULL;
}
ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx);
return TSDB_CODE_SUCCESS;
}
......@@ -3820,7 +3813,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
}
if (pWinInfo->pos.pageId == -1) {
*pResult = getNewResultRow(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize);
*pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
if (*pResult == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -4337,6 +4330,7 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
}
}
clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf);
pInfo->streamAggSup.currentPageId = -1;
}
static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {
......
......@@ -97,7 +97,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
// allocate the overflow buffer page to hold this k/v.
int32_t newPageId = -1;
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId);
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, &newPageId);
if (pNewPage == NULL) {
return terrno;
}
......@@ -227,7 +227,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
}
int32_t pageId = -1;
SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId);
SFilePage* p = getNewBufPage(pHashObj->pBuf, &pageId);
if (p == NULL) {
return terrno;
}
......
......@@ -180,7 +180,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
blockDataDestroy(p);
return terrno;
......@@ -512,7 +512,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
}
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) {
return terrno;
}
......
......@@ -372,7 +372,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pPageIdList = pList;
}
pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId);
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId);
}
......
......@@ -1669,6 +1669,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
pDb = taosHashIterate(context.pDbFNameHashObj, pDb);
}
}
if (pContext->pStmtCb) {
context.pVgroupsHashObj = NULL;
context.pTableBlockHashObj = NULL;
}
destroyInsertParseContext(&context);
return code;
}
......
......@@ -197,28 +197,21 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
return SCAN_TYPE_TABLE;
}
static SNode* createPrimaryKeyCol(uint64_t tableId) {
static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
pCol->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
pCol->node.resType.type = pSchema->type;
pCol->node.resType.bytes = pSchema->bytes;
pCol->tableId = tableId;
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pCol->colId = pSchema->colId;
pCol->colType = COLUMN_TYPE_COLUMN;
strcpy(pCol->colName, "#primarykey");
strcpy(pCol->colName, pSchema->name);
return (SNode*)pCol;
}
static int32_t addPrimaryKeyCol(uint64_t tableId, SNodeList** pCols) {
if (NULL == *pCols) {
*pCols = nodesMakeList();
if (NULL == *pCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) {
bool found = false;
SNode* pCol = NULL;
FOREACH(pCol, *pCols) {
......@@ -229,13 +222,25 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, SNodeList** pCols) {
}
if (!found) {
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(*pCols, createPrimaryKeyCol(tableId))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema));
}
return TSDB_CODE_SUCCESS;
}
static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) {
if (LIST_LENGTH(*pCols) > 0) {
return TSDB_CODE_SUCCESS;
}
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema));
}
static int32_t addDefaultScanCol(const STableMeta* pMeta, SNodeList** pCols) {
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols);
}
return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols);
}
static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealTable, bool hasRepeatScanFuncs,
SLogicNode** pLogicNode) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
......@@ -299,8 +304,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->hasNormalCols = true;
}
if (TSDB_CODE_SUCCESS == code && SCAN_TYPE_SYSTEM_TABLE != pScan->scanType) {
code = addPrimaryKeyCol(pScan->tableId, &pScan->pScanCols);
if (TSDB_CODE_SUCCESS == code) {
code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols);
}
// set output
......@@ -787,10 +792,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
static EDealRes needFillValueImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SColumnNode* pCol = (SColumnNode*)pNode;
if (COLUMN_TYPE_WINDOW_START != pCol->colType &&
COLUMN_TYPE_WINDOW_END != pCol->colType &&
COLUMN_TYPE_WINDOW_DURATION != pCol->colType &&
COLUMN_TYPE_GROUP_KEY != pCol->colType) {
if (COLUMN_TYPE_WINDOW_START != pCol->colType && COLUMN_TYPE_WINDOW_END != pCol->colType &&
COLUMN_TYPE_WINDOW_DURATION != pCol->colType && COLUMN_TYPE_GROUP_KEY != pCol->colType) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
......@@ -1008,7 +1011,8 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
int32_t code =
nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pPartition->node.pTargets) {
code = nodesListMakeStrictAppend(&pPartition->node.pTargets, nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0)));
code = nodesListMakeStrictAppend(&pPartition->node.pTargets,
nodesCloneNode(nodesListGetNode(pCxt->pCurrRoot->pTargets, 0)));
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -32,3 +32,9 @@ TEST_F(PlanSysTableTest, informationSchema) {
run("SELECT * FROM information_schema.ins_databases WHERE name = 'information_schema'");
}
TEST_F(PlanSysTableTest, withAgg) {
useDb("root", "information_schema");
run("SELECT COUNT(1) FROM ins_users");
}
......@@ -371,7 +371,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
return TSDB_CODE_SUCCESS;
}
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
pBuf->statis.getPages += 1;
char* availablePage = NULL;
......
......@@ -18,7 +18,7 @@ void simpleTest() {
int32_t pageId = 0;
int32_t groupId = 0;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getTotalBufSize(pBuf), 1024);
......@@ -29,26 +29,26 @@ void simpleTest() {
releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t == pBufPage1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage4);
releaseBufPage(pBuf, pBufPage2);
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage5);
......@@ -64,7 +64,7 @@ void writeDownTest() {
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
ASSERT_TRUE(pBufPage != NULL);
*(int32_t*)(pBufPage->data) = nx;
......@@ -73,22 +73,22 @@ void writeDownTest() {
setBufPageDirty(pBufPage, true);
releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
......@@ -113,32 +113,32 @@ void recyclePageTest() {
int32_t groupId = 0;
int32_t nx = 12345;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
ASSERT_TRUE(pBufPage != NULL);
releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseBufPage(pBuf, t4);
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5);
......
......@@ -20,15 +20,9 @@ class TDTestCase:
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
srcFile = '%s/../log/tmq_taosx_tmp.source'%(cfgPath)
dstFile = '%s/../log/tmq_taosx_tmp.result'%(cfgPath)
def checkJson(self, cfgPath, name):
srcFile = '%s/../log/%s.source'%(cfgPath, name)
dstFile = '%s/../log/%s.result'%(cfgPath, name)
tdLog.info("compare file: %s, %s"%(srcFile, dstFile))
consumeFile = open(srcFile, mode='r')
......@@ -43,7 +37,31 @@ class TDTestCase:
tdLog.exit("compare error: %s != %s"%src, dst)
else:
break
return
def checkDropData(self):
tdSql.execute('use db_taosx')
tdSql.query("show tables")
tdSql.checkRows(2)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 11)
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None)
tdSql.execute('use abc1')
tdSql.query("show tables")
tdSql.checkRows(2)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 11)
tdSql.checkData(0, 2, '{"k1":1,"k2":"hello"}')
tdSql.checkData(1, 2, None)
return
def checkData(self):
tdSql.execute('use db_taosx')
tdSql.query("select * from ct3 order by c1 desc")
tdSql.checkRows(2)
......@@ -116,113 +134,82 @@ class TDTestCase:
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
tdSql.execute('drop topic if exists topic_ctb_column')
return
def checkFileContentSnapshot(self):
def checkWal1Vgroup(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_snapshot_ci -c %s'%(buildPath, cfgPath)
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
srcFile = '%s/../log/tmq_taosx_tmp_snapshot.source'%(cfgPath)
dstFile = '%s/../log/tmq_taosx_tmp_snapshot.result'%(cfgPath)
tdLog.info("compare file: %s, %s"%(srcFile, dstFile))
consumeFile = open(srcFile, mode='r')
queryFile = open(dstFile, mode='r')
while True:
dst = queryFile.readline()
src = consumeFile.readline()
self.checkJson(cfgPath, "tmq_taosx_tmp")
self.checkData()
if dst:
if dst != src:
tdLog.exit("compare error: %s != %s"%src, dst)
else:
break
return
tdSql.execute('use db_taosx')
tdSql.query("select * from ct3 order by c1 desc")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 51)
tdSql.checkData(0, 4, 940)
tdSql.checkData(1, 1, 23)
tdSql.checkData(1, 4, None)
def checkWalMultiVgroups(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
tdSql.query("select * from st1 order by ts")
tdSql.checkRows(8)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 3)
tdSql.checkData(4, 1, 4)
tdSql.checkData(6, 1, 23)
self.checkData()
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 4)
tdSql.checkData(4, 2, 3)
tdSql.checkData(6, 2, 32)
return
tdSql.checkData(0, 3, 'a')
tdSql.checkData(1, 3, 'b')
tdSql.checkData(4, 3, 'hwj')
tdSql.checkData(6, 3, 's21ds')
def checkWalMultiVgroupsWithDropTable(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 3 -dv 5 -d'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
tdSql.checkData(0, 4, None)
tdSql.checkData(1, 4, None)
tdSql.checkData(5, 4, 940)
tdSql.checkData(6, 4, None)
self.checkDropData()
tdSql.checkData(0, 5, 1000)
tdSql.checkData(1, 5, 2000)
tdSql.checkData(4, 5, 1000)
tdSql.checkData(6, 5, 5000)
return
tdSql.checkData(0, 6, 'ttt')
tdSql.checkData(1, 6, None)
tdSql.checkData(4, 6, 'ttt')
tdSql.checkData(6, 6, None)
def checkSnapshot1Vgroup(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -s'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
tdSql.checkData(0, 7, True)
tdSql.checkData(1, 7, None)
tdSql.checkData(4, 7, True)
tdSql.checkData(6, 7, None)
self.checkJson(cfgPath, "tmq_taosx_tmp_snapshot")
self.checkData()
tdSql.checkData(0, 8, None)
tdSql.checkData(1, 8, None)
tdSql.checkData(4, 8, None)
tdSql.checkData(6, 8, None)
return
tdSql.query("select * from ct1")
tdSql.checkRows(4)
def checkSnapshotMultiVgroups(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
tdSql.query("select * from ct2")
tdSql.checkRows(0)
self.checkData()
tdSql.query("select * from ct0 order by c1")
tdSql.checkRows(2)
tdSql.checkData(0, 3, "a")
tdSql.checkData(1, 4, None)
return
tdSql.query("select * from n1 order by cc3 desc")
tdSql.checkRows(2)
tdSql.checkData(0, 1, "eeee")
tdSql.checkData(1, 2, 940)
def checkSnapshotMultiVgroupsWithDropTable(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -sv 2 -dv 4 -s -d'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
tdSql.query("select * from jt order by i desc")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 11)
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 1, 1)
tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}')
self.checkDropData()
return
def run(self):
tdSql.prepare()
self.checkFileContent()
self.checkFileContentSnapshot()
self.checkWal1Vgroup()
self.checkSnapshot1Vgroup()
self.checkWalMultiVgroups()
self.checkSnapshotMultiVgroups()
self.checkWalMultiVgroupsWithDropTable()
self.checkSnapshotMultiVgroupsWithDropTable()
def stop(self):
tdSql.close()
......
......@@ -2,7 +2,6 @@ add_executable(tmq_demo tmqDemo.c)
add_executable(tmq_sim tmqSim.c)
add_executable(create_table createTable.c)
add_executable(tmq_taosx_ci tmq_taosx_ci.c)
add_executable(tmq_taosx_snapshot_ci tmq_taosx_snapshot_ci.c)
add_executable(sml_test sml_test.c)
target_link_libraries(
create_table
......@@ -32,13 +31,6 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_taosx_snapshot_ci
PUBLIC taos_static
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
sml_test
......
......@@ -22,8 +22,16 @@
#include "types.h"
static int running = 1;
TdFilePtr g_fp = NULL;
char dir[64]={0};
TdFilePtr g_fp = NULL;
typedef struct{
bool snapShot;
bool dropTable;
int srcVgroups;
int dstVgroups;
char dir[64];
}Config;
Config g_conf = {0};
static TAOS* use_db(){
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -41,7 +49,6 @@ static TAOS* use_db(){
}
static void msg_process(TAOS_RES* msg) {
/*memset(buf, 0, 1024);*/
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
......@@ -51,8 +58,11 @@ static void msg_process(TAOS_RES* msg) {
if (result) {
printf("meta result: %s\n", result);
}
taosFprintfFile(g_fp, result);
taosFprintfFile(g_fp, "\n");
if(g_fp){
taosFprintfFile(g_fp, result);
taosFprintfFile(g_fp, "\n");
}
tmq_free_json_meta(result);
}
......@@ -61,22 +71,10 @@ static void msg_process(TAOS_RES* msg) {
int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close(pConn);
}
int32_t init_env() {
int32_t init_env(Config *conf) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
......@@ -89,13 +87,22 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1");
char sql[128] = {0};
snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", conf->dstVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
......@@ -103,7 +110,8 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
snprintf(sql, 128, "create database if not exists abc1 vgroups %d", conf->srcVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -133,7 +141,7 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')");
pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -168,7 +176,7 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -224,6 +232,22 @@ int32_t init_env() {
}
taos_free_result(pRes);
if(conf->dropTable){
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
......@@ -273,6 +297,15 @@ int32_t init_env() {
}
taos_free_result(pRes);
if(conf->dropTable){
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
......@@ -308,6 +341,23 @@ int32_t init_env() {
}
taos_free_result(pRes);
if(conf->dropTable){
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
taos_close(pConn);
return 0;
}
......@@ -327,9 +377,9 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
pRes = taos_query(pConn, "create topic topic_db with meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
......@@ -342,18 +392,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_t* build_consumer(Config *config) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
......@@ -363,7 +402,9 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.heartbeat.background", "true");
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
if(config->snapShot){
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
}
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
......@@ -374,8 +415,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_ctb_column");
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
tmq_list_append(topic_list, "topic_db");
return topic_list;
}
......@@ -393,12 +433,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}else{
break;
}
......@@ -411,52 +446,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0;
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
return;
}
tmq_list_t* subList = NULL;
tmq_subscription(tmq, &subList);
char** subTopics = tmq_list_to_c_array(subList);
int32_t sz = tmq_list_get_size(subList);
printf("subscribed topics: ");
for (int32_t i = 0; i < sz; i++) {
printf("%s, ", subTopics[i]);
}
printf("\n");
tmq_list_destroy(subList);
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
taos_free_result(tmqmessage);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
void initLogFile() {
void initLogFile(Config *conf) {
char f1[256] = {0};
char f2[256] = {0};
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", dir);
if(conf->snapShot){
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", conf->dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", conf->dir);
}else{
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", conf->dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", conf->dir);
}
TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", f1);
......@@ -469,53 +470,82 @@ void initLogFile() {
fprintf(stderr, "Failed to open %s for save result\n", f2);
exit(-1);
}
char *result[] = {
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
"{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}",
"{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[]}",
"{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}]}",
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}",
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}",
"{\"type\":\"alter\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}",
"{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":9}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":6,\"colName\":\"c1\"}",
"{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}",
"{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}"
};
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
taosFprintfFile(pFile2, result[i]);
taosFprintfFile(pFile2, "\n");
if(conf->snapShot){
char *result[] = {
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
"{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}",
"{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[]}",
"{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}]}",
"{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}",
"{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}",
"{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}",
};
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
taosFprintfFile(pFile2, result[i]);
taosFprintfFile(pFile2, "\n");
}
}else{
char *result[] = {
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":16}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1}]}",
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
"{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}",
"{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[]}",
"{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":3,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":3000}]}",
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}",
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":7,\"colName\":\"c3\",\"colType\":8,\"colLength\":64}",
"{\"type\":\"alter\",\"tableName\":\"st1\",\"tableType\":\"super\",\"alterType\":1,\"colName\":\"t2\",\"colType\":8,\"colLength\":64}",
"{\"type\":\"alter\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"alterType\":4,\"colName\":\"t1\",\"colValue\":\"5000\",\"colValueNull\":false}",
"{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":10,\"length\":4}],\"tags\":[]}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":5,\"colName\":\"c3\",\"colType\":5}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":7,\"colName\":\"c2\",\"colType\":10,\"colLength\":8}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":10,\"colName\":\"c3\",\"colNewName\":\"cc3\"}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":9}",
"{\"type\":\"alter\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"alterType\":6,\"colName\":\"c1\"}",
"{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}",
"{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}"
};
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
taosFprintfFile(pFile2, result[i]);
taosFprintfFile(pFile2, "\n");
}
}
taosCloseFile(&pFile2);
}
int main(int argc, char* argv[]) {
if(argc == 3 && strcmp(argv[1], "-c") == 0) {
strcpy(dir, argv[2]);
}else{
// strcpy(dir, "../../../sim/psim/cfg");
strcpy(dir, "/var/log");
for (int32_t i = 1; i < argc; i++) {
if(strcmp(argv[i], "-c") == 0){
strcpy(g_conf.dir, argv[++i]);
}else if(strcmp(argv[i], "-s") == 0){
g_conf.snapShot = true;
}else if(strcmp(argv[i], "-d") == 0){
g_conf.dropTable = true;
}else if(strcmp(argv[i], "-sv") == 0){
g_conf.srcVgroups = atol(argv[++i]);
}else if(strcmp(argv[i], "-dv") == 0){
g_conf.dstVgroups = atol(argv[++i]);
}
}
printf("env init\n");
initLogFile();
if(strlen(g_conf.dir) != 0){
initLogFile(&g_conf);
}
if (init_env() < 0) {
if (init_env(&g_conf) < 0) {
return -1;
}
create_topic();
tmq_t* tmq = build_consumer();
tmq_t* tmq = build_consumer(&g_conf);
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
taosCloseFile(&g_fp);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
static int running = 1;
TdFilePtr g_fp = NULL;
char dir[64]={0};
static TAOS* use_db(){
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return NULL;
}
TAOS_RES* pRes = taos_query(pConn, "use db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes));
return NULL;
}
taos_free_result(pRes);
return pConn;
}
static void msg_process(TAOS_RES* msg) {
/*memset(buf, 0, 1024);*/
printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg));
printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg));
TAOS *pConn = use_db();
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
char* result = tmq_get_json_meta(msg);
if (result) {
printf("meta result: %s\n", result);
}
taosFprintfFile(g_fp, result);
taosFprintfFile(g_fp, "\n");
tmq_free_json_meta(result);
}
tmq_raw_data raw = {0};
tmq_get_raw(msg, &raw);
int32_t ret = tmq_write_raw(pConn, raw);
printf("write raw data: %s\n", tmq_err2str(ret));
// else{
// while(1){
// int numOfRows = 0;
// void *pData = NULL;
// taos_fetch_raw_block(msg, &numOfRows, &pData);
// if(numOfRows == 0) break;
// printf("write data: tbname:%s, numOfRows:%d\n", tmq_get_table_name(msg), numOfRows);
// int ret = taos_write_raw_block(pConn, numOfRows, pData, tmq_get_table_name(msg));
// printf("write raw data: %s\n", tmq_err2str(ret));
// }
// }
taos_close(pConn);
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(1626006833600, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, 'ddd') ct0 values(1626006833602, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (1626006833609, 51, 62, 'c333', 940)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 select * from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 comment 'hello'");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 drop column c1");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt2 using jt tags('')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into jt1 values(now, 1)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into jt2 values(now, 11)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
#if 0
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
#endif
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.heartbeat.background", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_ctb_column");
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/
return topic_list;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
/*if (cnt >= 2) break;*/
/*printf("get data\n");*/
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}else{
break;
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0;
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
return;
}
tmq_list_t* subList = NULL;
tmq_subscription(tmq, &subList);
char** subTopics = tmq_list_to_c_array(subList);
int32_t sz = tmq_list_get_size(subList);
printf("subscribed topics: ");
for (int32_t i = 0; i < sz; i++) {
printf("%s, ", subTopics[i]);
}
printf("\n");
tmq_list_destroy(subList);
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
msg_process(tmqmessage);
taos_free_result(tmqmessage);
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
void initLogFile() {
char f1[256] = {0};
char f2[256] = {0};
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", dir);
TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", f1);
exit(-1);
}
g_fp = pFile;
TdFilePtr pFile2 = taosOpenFile(f2, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile2) {
fprintf(stderr, "Failed to open %s for save result\n", f2);
exit(-1);
}
char *result[] = {
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
"{\"type\":\"create\",\"tableName\":\"ct1\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":2000}]}",
"{\"type\":\"create\",\"tableName\":\"ct2\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[]}",
"{\"type\":\"create\",\"tableName\":\"ct3\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":5000}]}",
"{\"type\":\"create\",\"tableName\":\"n1\",\"tableType\":\"normal\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c2\",\"type\":10,\"length\":8},{\"name\":\"cc3\",\"type\":5}],\"tags\":[]}",
"{\"type\":\"create\",\"tableName\":\"jt\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"i\",\"type\":4}],\"tags\":[{\"name\":\"t\",\"type\":15}]}",
"{\"type\":\"create\",\"tableName\":\"jt1\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[{\"name\":\"t\",\"type\":15,\"value\":\"{\\\"k1\\\":1,\\\"k2\\\":\\\"hello\\\"}\"}]}",
"{\"type\":\"create\",\"tableName\":\"jt2\",\"tableType\":\"child\",\"using\":\"jt\",\"tagNum\":1,\"tags\":[]}",
};
for(int i = 0; i < sizeof(result)/sizeof(result[0]); i++){
taosFprintfFile(pFile2, result[i]);
taosFprintfFile(pFile2, "\n");
}
taosCloseFile(&pFile2);
}
int main(int argc, char* argv[]) {
if(argc == 3 && strcmp(argv[1], "-c") == 0) {
strcpy(dir, argv[2]);
}else{
// strcpy(dir, "../../../sim/psim/cfg");
strcpy(dir, "/var/log");
}
printf("env init\n");
initLogFile();
if (init_env() < 0) {
return -1;
}
create_topic();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
taosCloseFile(&g_fp);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册