未验证 提交 590b9015 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 3408 taosdemo async query (#5731)

* test

* [TD-3677]<test>: test pr message 1

* [TD-3671]<test>change target branch

* [TD-3677]<test>: test pr message 2

* [TD-3677]<test>: test pr message 3

* Hotfix/sangshuduo/td 3197 fix taosdemo coverity scan (#5688)

* [TD-3197] <fix>: fix taosdemo coverity scan issues.

* [TD-3197] <fix>: fix taosdemo coverity scan issue.

fix subscribeTest pids uninitialized.

* [TD-3197] <fix>: fix taosdemo coverity scan issues.

* [TD-3197] <fix>: fix coverity scan issues.

check super tbl info pointer.

* [TD-3197] <fix>: fix coverity scan issues.

move sub tbl query thread join into loop

* [TD-3197] <fix>: fix coverity scan issues.

remove unused variable

* [TD-3197] <fix>: fix coverity scan issues.

use more secure random library

* [TD-3197] <fix>: fix coverity scan issues.

use strncpy for more safety

* [TD-3197] <fix>: fix taosdemo coverity scan issue.

replace arc4random with rand().

* [TD-3197] <fix>: fix coverity scan issues.

check stb info pointer for start time

* [TD-3197] <fix>: fix coverity scan issues.

fix strcpy vulnerability

* [TD-3197] <fix>: fix taosdemo coverity scan issue.

modify taosdemoTest2. try to check database continuously.

* [TD-3197] <fix>: taosdemo coverity scan issues.

* [TD-3197] <fix>: fix memory leak when parsing arguments.

* [TD-3197] <fix>: fix cmake strip arguments.

* [TD-3197] <fix>: taosdemo coverity scan.

fix cmake string manipulation.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>

* remove useless file

* fix changing target branch

* fix

* fix

* Hotfix/sangshuduo/td 3607 taosdemo buffer overflow (#5706)

* [TD-3607] <fix>: fix taosdemo buffer overflow.

* [TD-3607] <fix>: taosdemo buffer overflow.

add tmp buffer.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix data generation.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix normal table writing.

* [TD-3607] <fix>: taosdemo buffer overflow.

remove tail spaces.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix taosdemo alter table test case.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix taosdemo alter table case.

* [TD-3607] <fix>: taosdemo buffer overflow.

adjust limit offset count warning.

* [TD-3607] <fix>: taosdemo buffer overflow.

add more logic for child tables exist.

* [TD-3607] <fix>: taosdemo buffer overflow.

create database if database be dropped only.

* [TD-3607] <fix>: fix taosdemo buffer overflow.

adjust limit and offset test cases.

* [TD-3607] <fix>: taosdemo buffer overflow.

adjust sample data test case.

* [TD-3607]<fix>: taosdemo limit and offset.

if limit+offset > count
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>

* Hotfix/sangshuduo/td 3607 taosdemo buffer overflow (#5713)

* [TD-3607] <fix>: fix taosdemo buffer overflow.

* [TD-3607] <fix>: taosdemo buffer overflow.

add tmp buffer.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix data generation.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix normal table writing.

* [TD-3607] <fix>: taosdemo buffer overflow.

remove tail spaces.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix taosdemo alter table test case.

* [TD-3607] <fix>: taosdemo buffer overflow.

fix taosdemo alter table case.

* [TD-3607] <fix>: taosdemo buffer overflow.

adjust limit offset count warning.

* [TD-3607] <fix>: taosdemo buffer overflow.

add more logic for child tables exist.

* [TD-3607] <fix>: taosdemo buffer overflow.

create database if database be dropped only.

* [TD-3607] <fix>: fix taosdemo buffer overflow.

adjust limit and offset test cases.

* [TD-3607] <fix>: taosdemo buffer overflow.

adjust sample data test case.

* [TD-3607]<fix>: taosdemo limit and offset.

if limit+offset > count

* [TD-3607]<fix>: taosdemo limit and offset.

if child tbl does not exist, don't take limit and offset value.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>

* [TD-3683]<fix>: reduce buffer size for more stable table creation. (#5719)
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>

* [TD-3408]<feature>: taosdemo support async query.

* [TD-3408]<feature>: taosdemo support async query.

refactor

* [TD-3408]<feature>: taosdemo support async query.

refactor 2

* [TD-3408]<feature>: taosdemo support async query.

refactor 3

* [TD-3408]<feature>: taosdemo support async query.

refactor 4

* [TD-3408]<feature>: taosdemo support specified sql more than one line.
Co-authored-by: Nhuili <52318143+plum-lihui@users.noreply.github.com>
Co-authored-by: Nliuyq-617 <yqliu@taosdata.com>
Co-authored-by: Nplum-lihui <huili@taosdata.com>
Co-authored-by: NElias Soong <elias.soong@gmail.com>
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
Co-authored-by: NShengliang Guan <slguan@taosdata.com>
上级 766f1634
......@@ -67,6 +67,12 @@ enum TEST_MODE {
INVAID_TEST
};
enum QUERY_MODE {
SYNC_QUERY_MODE, // 0
ASYNC_QUERY_MODE, // 1
INVALID_MODE
};
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
#define MAX_USERNAME_SIZE 64
......@@ -198,7 +204,7 @@ typedef struct SArguments_S {
bool verbose_print;
bool performance_print;
char * output_file;
int mode;
int query_mode;
char * datatype[MAX_NUM_DATATYPE + 1];
int len_of_binary;
int num_of_CPR;
......@@ -351,7 +357,7 @@ typedef struct SpecifiedQueryInfo_S {
int rate; // 0: unlimit > 0 loop/s
int concurrent;
int sqlCount;
int subscribeMode; // 0: sync, 1: async
int mode; // 0: sync, 1: async
int subscribeInterval; // ms
int queryTimes;
int subscribeRestart;
......@@ -365,7 +371,7 @@ typedef struct SuperQueryInfo_S {
char sTblName[MAX_TB_NAME_SIZE+1];
int rate; // 0: unlimit > 0 loop/s
int threadCnt;
int subscribeMode; // 0: sync, 1: async
int mode; // 0: sync, 1: async
int subscribeInterval; // ms
int subscribeRestart;
int subscribeKeepProgress;
......@@ -429,6 +435,8 @@ typedef struct SThreadInfo_S {
int64_t maxDelay;
int64_t minDelay;
// query
int querySeq; // sequence number of sql command
} threadInfo;
#ifdef WINDOWS
......@@ -714,7 +722,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (strcmp(argv[i], "-s") == 0) {
arguments->sqlFile = argv[++i];
} else if (strcmp(argv[i], "-q") == 0) {
arguments->mode = atoi(argv[++i]);
arguments->query_mode = atoi(argv[++i]);
} else if (strcmp(argv[i], "-T") == 0) {
arguments->num_of_threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0) {
......@@ -758,7 +766,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
char *dupstr = strdup(argv[i]);
char *running = dupstr;
char *token = strsep(&running, ",");
while (token != NULL) {
while(token != NULL) {
if (strcasecmp(token, "INT")
&& strcasecmp(token, "FLOAT")
&& strcasecmp(token, "TINYINT")
......@@ -964,7 +972,7 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
char temp[16000];
// fetch the records row by row
while ((row = taos_fetch_row(res))) {
while((row = taos_fetch_row(res))) {
if (totalLen >= 100*1024*1024 - 32000) {
if (fp) fprintf(fp, "%s", databuf);
totalLen = 0;
......@@ -986,7 +994,8 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) {
TAOS_RES *res = taos_query(taos, command);
if (res == NULL || taos_errno(res) != 0) {
printf("failed to sql:%s, reason:%s\n", command, taos_errstr(res));
errorPrint("%s() LN%d, failed to execute sql:%s, reason:%s\n",
__func__, __LINE__, command, taos_errstr(res));
taos_free_result(res);
return;
}
......@@ -1163,7 +1172,8 @@ static int printfInsertMeta() {
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
printf(" precision: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision);
printf(" precision: \033[33m%s\033[0m\n",
g_Dbs.db[i].dbCfg.precision);
} else {
printf("\033[1m\033[40;31m precision error: %s\033[0m\n",
g_Dbs.db[i].dbCfg.precision);
......@@ -1171,11 +1181,13 @@ static int printfInsertMeta() {
}
}
printf(" super table count: \033[33m%d\033[0m\n", g_Dbs.db[i].superTblCount);
printf(" super table count: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
printf(" super table[\033[33m%d\033[0m]:\n", j);
printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sTblName);
printf(" stbName: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sTblName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
......@@ -1241,7 +1253,7 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].sampleFile);
printf(" tagsFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n ",
printf(" columnCount: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
......@@ -1459,41 +1471,61 @@ static void printfQueryMeta() {
printf("\n");
printf("specified table query info: \n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.rate);
printf("query interval: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.rate);
printf("top query times:\033[33m%d\033[0m\n", g_args.query_times);
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent);
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount);
printf("concurrent: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.concurrent);
printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.sqlCount);
printf("specified tbl query times:\n");
printf(" \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.queryTimes);
printf(" \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.queryTimes);
if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeMode);
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
printf("mod: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.mode);
printf("interval: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]);
printf(" sql[%d]: \033[33m%s\033[0m\n",
i, g_queryInfo.specifiedQueryInfo.sql[i]);
}
printf("\n");
printf("super table query info: \n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate);
printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.threadCnt);
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.childTblCount);
printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.superQueryInfo.sTblName);
printf("stb query times:\033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.queryTimes);
printf("super table query info:\n");
printf("query interval: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.rate);
printf("threadCnt: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.threadCnt);
printf("childTblCount: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.childTblCount);
printf("stable name: \033[33m%s\033[0m\n",
g_queryInfo.superQueryInfo.sTblName);
printf("stb query times:\033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.queryTimes);
if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode);
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress);
}
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount);
printf("mod: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.mode);
printf("interval: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress);
}
printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.sqlCount);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]);
printf(" sql[%d]: \033[33m%s\033[0m\n",
i, g_queryInfo.superQueryInfo.sql[i]);
}
printf("\n");
......@@ -1637,7 +1669,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
TAOS_FIELD *fields = taos_fetch_fields(res);
while ((row = taos_fetch_row(res)) != NULL) {
while((row = taos_fetch_row(res)) != NULL) {
// sys database name : 'log'
if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log",
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) {
......@@ -1670,7 +1702,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
dbInfos[count]->cachelast =
(int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
tstrncpy(dbInfos[count]->precision,
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
......@@ -1681,7 +1714,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
count++;
if (count > MAX_DATABASE_COUNT) {
errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT);
errorPrint("%s() LN%d, The database count overflow than %d\n",
__func__, __LINE__, MAX_DATABASE_COUNT);
break;
}
}
......@@ -1691,6 +1725,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
static void printfDbInfoForQueryToFile(
char* filename, SDbInfo* dbInfos, int index) {
if (filename[0] == 0)
return;
......@@ -1909,7 +1944,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
if (bytes == 0)
break;
sent+=bytes;
} while (sent < req_str_len);
} while(sent < req_str_len);
memset(response_buf, 0, RESP_BUF_LEN);
resp_len = sizeof(response_buf) - 1;
......@@ -1927,7 +1962,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
if (bytes == 0)
break;
received += bytes;
} while (received < resp_len);
} while(received < resp_len);
if (received == resp_len) {
free(request_buf);
......@@ -1951,7 +1986,8 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) {
errorPrint("%s() LN%d, calloc failed! size:%d\n", __func__, __LINE__, TSDB_MAX_SQL_LEN+1);
errorPrint("%s() LN%d, calloc failed! size:%d\n",
__func__, __LINE__, TSDB_MAX_SQL_LEN+1);
return NULL;
}
......@@ -2155,7 +2191,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
}
char* pTblName = childTblName;
while ((row = taos_fetch_row(res)) != NULL) {
while((row = taos_fetch_row(res)) != NULL) {
int32_t* len = taos_fetch_lengths(res);
tstrncpy(pTblName, (char *)row[0], len[0]+1);
//printf("==== sub table name: %s\n", pTblName);
......@@ -2218,7 +2254,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
int tagIndex = 0;
int columnIndex = 0;
TAOS_FIELD *fields = taos_fetch_fields(res);
while ((row = taos_fetch_row(res)) != NULL) {
while((row = taos_fetch_row(res)) != NULL) {
if (0 == count) {
count++;
continue;
......@@ -2765,7 +2801,7 @@ static void createChildTables() {
// normal table
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
int j = 0;
while (g_args.datatype[j]) {
while(g_args.datatype[j]) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j],
"NCHAR", strlen("NCHAR")) == 0)) {
......@@ -2824,7 +2860,7 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return -1;
}
while ((readLen = tgetline(&line, &n, fp)) != -1) {
while((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
}
......@@ -2888,7 +2924,7 @@ static int readSampleFromCsvFileToMem(
assert(superTblInfo->sampleDataBuf);
memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
while (1) {
while(1) {
readLen = tgetline(&line, &n, fp);
if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) {
......@@ -2967,7 +3003,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
errorPrint("%s() LN%d, failed to read json, column count not found\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, column count not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
count = 1;
......@@ -2976,8 +3013,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// column info
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
errorPrint("%s() LN%d: failed to read json, column type not found\n", __func__, __LINE__);
if (!dataType || dataType->type != cJSON_String
|| dataType->valuestring == NULL) {
errorPrint("%s() LN%d: failed to read json, column type not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
......@@ -2987,7 +3026,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
debugPrint("%s() LN%d: failed to read json, column len not found\n", __func__, __LINE__);
debugPrint("%s() LN%d: failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
columnCase.dataLen = 8;
......@@ -3007,13 +3047,15 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) {
debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, tags not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) {
debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT);
errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n",
__func__, __LINE__, MAX_TAG_COUNT);
goto PARSE_OVER;
}
......@@ -3036,8 +3078,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// column info
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(tag, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
printf("ERROR: failed to read json, tag type not found\n");
if (!dataType || dataType->type != cJSON_String
|| dataType->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, tag type not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
......@@ -3046,14 +3090,16 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
printf("ERROR: failed to read json, column len not found\n");
errorPrint("%s() LN%d, failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
columnCase.dataLen = 0;
}
for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType, MAX_TB_NAME_SIZE);
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType,
MAX_TB_NAME_SIZE);
superTbls->tags[index].dataLen = columnCase.dataLen;
index++;
}
......@@ -3063,9 +3109,6 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
ret = true;
PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret;
}
......@@ -3142,7 +3185,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!gInsertInterval) {
g_args.insert_interval = 0;
} else {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3163,7 +3207,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!interlaceRows) {
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3173,7 +3218,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) {
g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
} else {
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3183,7 +3229,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!numRecPerReq) {
g_args.num_of_RPR = 0xffff;
} else {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3509,7 +3556,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
} else {
errorPrint("%s() LN%d, failed to read json, data_source not found\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, data_source not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3584,7 +3632,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
}
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) {
if (sampleFile && sampleFile->type == cJSON_String
&& sampleFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
sampleFile->valuestring, MAX_FILE_NAME_LEN);
} else if (!sampleFile) {
......@@ -3727,9 +3776,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
ret = true;
PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret;
}
......@@ -3795,7 +3841,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!gQueryTimes) {
g_args.query_times = 1;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3833,35 +3880,45 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.specifiedQueryInfo.rate = 0;
}
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, "query_times");
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
"query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
} else if (!specifiedQueryTimes) {
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
goto PARSE_OVER;
}
} else if (!concurrent) {
g_queryInfo.specifiedQueryInfo.concurrent = 1;
}
cJSON* mode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (mode && mode->type == cJSON_String && mode->valuestring != NULL) {
if (0 == strcmp("sync", mode->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeMode = 0;
} else if (0 == strcmp("async", mode->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeMode = 1;
cJSON* queryMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (queryMode && queryMode->type == cJSON_String
&& queryMode->valuestring != NULL) {
if (0 == strcmp("sync", queryMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
} else if (0 == strcmp("async", queryMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.mode = ASYNC_QUERY_MODE;
} else {
printf("ERROR: failed to read json, subscribe mod error\n");
errorPrint("%s() LN%d, failed to read json, query mode input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeMode = 0;
g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
}
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
......@@ -3908,12 +3965,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!superSqls) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n");
errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
......@@ -3965,7 +4024,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__);
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3984,25 +4044,30 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
//}
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE);
if (stblname && stblname->type == cJSON_String
&& stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
MAX_TB_NAME_SIZE);
} else {
printf("ERROR: failed to read json, super table name not found\n");
errorPrint("%s() LN%d, failed to read json, super table name input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* submode = cJSON_GetObjectItem(superQuery, "mode");
if (submode && submode->type == cJSON_String && submode->valuestring != NULL) {
if (submode && submode->type == cJSON_String
&& submode->valuestring != NULL) {
if (0 == strcmp("sync", submode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 0;
g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
} else if (0 == strcmp("async", submode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 1;
g_queryInfo.superQueryInfo.mode = ASYNC_QUERY_MODE;
} else {
printf("ERROR: failed to read json, subscribe mod error\n");
errorPrint("%s() LN%d, failed to read json, query mode input error\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.subscribeMode = 0;
g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
}
cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval");
......@@ -4015,7 +4080,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) {
if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 1;
} else if (0 == strcmp("no", subrestart->valuestring)) {
......@@ -4049,12 +4115,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!subsqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n");
errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(subsqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
......@@ -4064,19 +4132,25 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("ERROR: failed to read json, sql not found\n");
if (!sqlStr || sqlStr->type != cJSON_String
|| sqlStr->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, sql not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){
tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){
tstrncpy(g_queryInfo.superQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else {
printf("ERROR: failed to read json, sub query result file not found\n");
errorPrint("%s() LN%d, failed to read json, sub query result file not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
}
......@@ -4086,9 +4160,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
ret = true;
PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret;
}
......@@ -5415,7 +5486,7 @@ static void *readTable(void *sarg) {
return NULL;
}
while (taos_fetch_row(pSql) != NULL) {
while(taos_fetch_row(pSql) != NULL) {
count++;
}
......@@ -5491,7 +5562,7 @@ static void *readMetric(void *sarg) {
return NULL;
}
int count = 0;
while (taos_fetch_row(pSql) != NULL) {
while(taos_fetch_row(pSql) != NULL) {
count++;
}
t = getCurrentTimeUs() - t;
......@@ -5602,7 +5673,7 @@ static int insertTestProcess() {
return 0;
}
static void *superQueryProcess(void *sarg) {
static void *specifiedQueryProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
if (winfo->taos == NULL) {
......@@ -5643,32 +5714,35 @@ static void *superQueryProcess(void *sarg) {
}
st = taosGetTimestampUs();
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs();
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
}
selectAndGetResult(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], tmpFile);
int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else {
int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host,
g_queryInfo.port, g_queryInfo.specifiedQueryInfo.sql[i]);
int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", winfo->threadID);
return NULL;
}
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs();
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[winfo->querySeq][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[winfo->querySeq],
winfo->threadID);
}
selectAndGetResult(winfo->taos,
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq], tmpFile);
int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else {
int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host,
g_queryInfo.port,
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq]);
int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", winfo->threadID);
return NULL;
}
}
et = taosGetTimestampUs();
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
taosGetSelfPthreadId(), (double)(et - st)/1000.0);
......@@ -5698,7 +5772,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
//printf("3: %s\n", outSql);
}
static void *subQueryProcess(void *sarg) {
static void *superQueryProcess(void *sarg) {
char sqlstr[1024];
threadInfo *winfo = (threadInfo *)sarg;
......@@ -5791,43 +5865,45 @@ static int queryTestProcess() {
pthread_t *pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
if (g_queryInfo.specifiedQueryInfo.sqlCount > 0
&& g_queryInfo.specifiedQueryInfo.concurrent > 0) {
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
if (NULL == pids) {
taos_close(taos);
ERROR_EXIT("memory allocation failed\n");
}
infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
if (NULL == infos) {
if ((nSqlCount > 0) && (nConcurrent > 0)) {
pids = malloc(nConcurrent * nSqlCount * sizeof(pthread_t));
infos = malloc(nConcurrent * nSqlCount * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
taos_close(taos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n");
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
threadInfo *t_info = infos + i * nSqlCount + j;
t_info->threadID = i * nSqlCount + j;
t_info->querySeq = j;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
}
}
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, superQueryProcess, t_info);
pthread_create(pids + i * nSqlCount + j, NULL, specifiedQueryProcess,
t_info);
}
}
} else {
g_queryInfo.specifiedQueryInfo.concurrent = 0;
......@@ -5841,18 +5917,12 @@ static int queryTestProcess() {
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
if (NULL == pidsOfSub) {
free(infos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n");
}
infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
if (NULL == infosOfSub) {
free(pidsOfSub);
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
free(infos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n");
}
......@@ -5880,7 +5950,7 @@ static int queryTestProcess() {
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info);
pthread_create(pidsOfSub + i, NULL, superQueryProcess, t_info);
}
g_queryInfo.superQueryInfo.threadCnt = threads;
......@@ -5888,8 +5958,12 @@ static int queryTestProcess() {
g_queryInfo.superQueryInfo.threadCnt = 0;
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL);
if ((nSqlCount > 0) && (nConcurrent > 0)) {
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
}
}
}
tmfree((char*)pids);
......@@ -5920,7 +5994,7 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) {
TAOS_SUB* tsub = NULL;
if (g_queryInfo.specifiedQueryInfo.subscribeMode) {
if (g_queryInfo.specifiedQueryInfo.mode) {
tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
topic, sql, subscribe_callback, (void*)resultFileName,
......@@ -5996,13 +6070,13 @@ static void *subSubscribeProcess(void *sarg) {
}
//et = taosGetTimestampMs();
//printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} while (0);
} while(0);
// start loop to consume result
TAOS_RES* res = NULL;
while (1) {
while(1) {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.superQueryInfo.subscribeMode) {
if (1 == g_queryInfo.superQueryInfo.mode) {
continue;
}
......@@ -6073,7 +6147,8 @@ static void *superSubscribeProcess(void *sarg) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
}
tsub[i] = subscribeImpl(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
tsub[i] = subscribeImpl(winfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
taos_close(winfo->taos);
return NULL;
......@@ -6081,13 +6156,13 @@ static void *superSubscribeProcess(void *sarg) {
}
//et = taosGetTimestampMs();
//printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} while (0);
} while(0);
// start loop to consume result
TAOS_RES* res = NULL;
while (1) {
while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.specifiedQueryInfo.subscribeMode) {
if (SYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) {
continue;
}
......@@ -6105,7 +6180,8 @@ static void *superSubscribeProcess(void *sarg) {
taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
}
taos_close(winfo->taos);
......@@ -6308,7 +6384,7 @@ static void setParaFromArg(){
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.queryMode = g_args.mode;
g_Dbs.queryMode = g_args.query_mode;
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
......@@ -6410,7 +6486,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
double t = getCurrentTimeUs();
while ((read_len = tgetline(&line, &line_len, fp)) != -1) {
while((read_len = tgetline(&line, &line_len, fp)) != -1) {
if (read_len >= MAX_SQL_SIZE) continue;
line[--read_len] = '\0';
......@@ -6473,52 +6549,50 @@ static void testMetaFile() {
}
static void queryResult() {
// select
if (false == g_Dbs.insert_only) {
// query data
pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo));
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_from = 0;
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(rInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, MAX_TB_NAME_SIZE);
} else {
rInfo->ntables = g_args.num_of_tables;
rInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(rInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
}
rInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
if (rInfo->taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
free(rInfo);
exit(-1);
}
// query data
tstrncpy(rInfo->fp, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo));
assert(rInfo);
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_from = 0;
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, rInfo);
} else {
pthread_create(&read_id, NULL, readMetric, rInfo);
}
pthread_join(read_id, NULL);
taos_close(rInfo->taos);
free(rInfo);
}
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
if (g_args.use_metric) {
rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(rInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, MAX_TB_NAME_SIZE);
} else {
rInfo->ntables = g_args.num_of_tables;
rInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(rInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
}
rInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
if (rInfo->taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
free(rInfo);
exit(-1);
}
tstrncpy(rInfo->fp, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, rInfo);
} else {
pthread_create(&read_id, NULL, readMetric, rInfo);
}
pthread_join(read_id, NULL);
taos_close(rInfo->taos);
free(rInfo);
}
static void testCmdLine() {
......@@ -6536,9 +6610,7 @@ static void testCmdLine() {
g_args.test_mode = INSERT_TEST;
insertTestProcess();
if (g_Dbs.insert_only)
return;
else
if (false == g_Dbs.insert_only)
queryResult();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册