提交 2b527c33 编写于 作者: P Ping Xiao

Merge branch 'master' into xiaoping/add_test_case

...@@ -937,6 +937,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -937,6 +937,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return ret; return ret;
} }
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true); code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) { if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
return code; return code;
...@@ -945,6 +949,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -945,6 +949,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} else { } else {
sql = sToken.z; sql = sToken.z;
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false); code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
if (pCmd->curSql == NULL) { if (pCmd->curSql == NULL) {
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS); assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
...@@ -953,10 +961,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -953,10 +961,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
*sqlstr = sql; *sqlstr = sql;
if (*sqlstr == NULL) {
code = TSDB_CODE_TSC_INVALID_SQL;
}
return code; return code;
} }
......
...@@ -67,6 +67,12 @@ enum TEST_MODE { ...@@ -67,6 +67,12 @@ enum TEST_MODE {
INVAID_TEST INVAID_TEST
}; };
enum QUERY_MODE {
SYNC_QUERY_MODE, // 0
ASYNC_QUERY_MODE, // 1
INVALID_MODE
};
#define MAX_SQL_SIZE 65536 #define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2) #define BUFFER_SIZE (65536*2)
#define MAX_USERNAME_SIZE 64 #define MAX_USERNAME_SIZE 64
...@@ -198,7 +204,7 @@ typedef struct SArguments_S { ...@@ -198,7 +204,7 @@ typedef struct SArguments_S {
bool verbose_print; bool verbose_print;
bool performance_print; bool performance_print;
char * output_file; char * output_file;
int mode; int query_mode;
char * datatype[MAX_NUM_DATATYPE + 1]; char * datatype[MAX_NUM_DATATYPE + 1];
int len_of_binary; int len_of_binary;
int num_of_CPR; int num_of_CPR;
...@@ -351,7 +357,7 @@ typedef struct SpecifiedQueryInfo_S { ...@@ -351,7 +357,7 @@ typedef struct SpecifiedQueryInfo_S {
int rate; // 0: unlimit > 0 loop/s int rate; // 0: unlimit > 0 loop/s
int concurrent; int concurrent;
int sqlCount; int sqlCount;
int subscribeMode; // 0: sync, 1: async int mode; // 0: sync, 1: async
int subscribeInterval; // ms int subscribeInterval; // ms
int queryTimes; int queryTimes;
int subscribeRestart; int subscribeRestart;
...@@ -365,7 +371,7 @@ typedef struct SuperQueryInfo_S { ...@@ -365,7 +371,7 @@ typedef struct SuperQueryInfo_S {
char sTblName[MAX_TB_NAME_SIZE+1]; char sTblName[MAX_TB_NAME_SIZE+1];
int rate; // 0: unlimit > 0 loop/s int rate; // 0: unlimit > 0 loop/s
int threadCnt; int threadCnt;
int subscribeMode; // 0: sync, 1: async int mode; // 0: sync, 1: async
int subscribeInterval; // ms int subscribeInterval; // ms
int subscribeRestart; int subscribeRestart;
int subscribeKeepProgress; int subscribeKeepProgress;
...@@ -429,6 +435,8 @@ typedef struct SThreadInfo_S { ...@@ -429,6 +435,8 @@ typedef struct SThreadInfo_S {
int64_t maxDelay; int64_t maxDelay;
int64_t minDelay; int64_t minDelay;
// query
int querySeq; // sequence number of sql command
} threadInfo; } threadInfo;
#ifdef WINDOWS #ifdef WINDOWS
...@@ -714,7 +722,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -714,7 +722,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (strcmp(argv[i], "-s") == 0) { } else if (strcmp(argv[i], "-s") == 0) {
arguments->sqlFile = argv[++i]; arguments->sqlFile = argv[++i];
} else if (strcmp(argv[i], "-q") == 0) { } else if (strcmp(argv[i], "-q") == 0) {
arguments->mode = atoi(argv[++i]); arguments->query_mode = atoi(argv[++i]);
} else if (strcmp(argv[i], "-T") == 0) { } else if (strcmp(argv[i], "-T") == 0) {
arguments->num_of_threads = atoi(argv[++i]); arguments->num_of_threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0) { } else if (strcmp(argv[i], "-i") == 0) {
...@@ -758,7 +766,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -758,7 +766,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
char *dupstr = strdup(argv[i]); char *dupstr = strdup(argv[i]);
char *running = dupstr; char *running = dupstr;
char *token = strsep(&running, ","); char *token = strsep(&running, ",");
while (token != NULL) { while(token != NULL) {
if (strcasecmp(token, "INT") if (strcasecmp(token, "INT")
&& strcasecmp(token, "FLOAT") && strcasecmp(token, "FLOAT")
&& strcasecmp(token, "TINYINT") && strcasecmp(token, "TINYINT")
...@@ -964,7 +972,7 @@ static void getResult(TAOS_RES *res, char* resultFileName) { ...@@ -964,7 +972,7 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
char temp[16000]; char temp[16000];
// fetch the records row by row // fetch the records row by row
while ((row = taos_fetch_row(res))) { while((row = taos_fetch_row(res))) {
if (totalLen >= 100*1024*1024 - 32000) { if (totalLen >= 100*1024*1024 - 32000) {
if (fp) fprintf(fp, "%s", databuf); if (fp) fprintf(fp, "%s", databuf);
totalLen = 0; totalLen = 0;
...@@ -986,7 +994,8 @@ static void getResult(TAOS_RES *res, char* resultFileName) { ...@@ -986,7 +994,8 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) { static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) {
TAOS_RES *res = taos_query(taos, command); TAOS_RES *res = taos_query(taos, command);
if (res == NULL || taos_errno(res) != 0) { if (res == NULL || taos_errno(res) != 0) {
printf("failed to sql:%s, reason:%s\n", command, taos_errstr(res)); errorPrint("%s() LN%d, failed to execute sql:%s, reason:%s\n",
__func__, __LINE__, command, taos_errstr(res));
taos_free_result(res); taos_free_result(res);
return; return;
} }
...@@ -1163,7 +1172,8 @@ static int printfInsertMeta() { ...@@ -1163,7 +1172,8 @@ static int printfInsertMeta() {
if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
printf(" precision: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision); printf(" precision: \033[33m%s\033[0m\n",
g_Dbs.db[i].dbCfg.precision);
} else { } else {
printf("\033[1m\033[40;31m precision error: %s\033[0m\n", printf("\033[1m\033[40;31m precision error: %s\033[0m\n",
g_Dbs.db[i].dbCfg.precision); g_Dbs.db[i].dbCfg.precision);
...@@ -1171,11 +1181,13 @@ static int printfInsertMeta() { ...@@ -1171,11 +1181,13 @@ static int printfInsertMeta() {
} }
} }
printf(" super table count: \033[33m%d\033[0m\n", g_Dbs.db[i].superTblCount); printf(" super table count: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
printf(" super table[\033[33m%d\033[0m]:\n", j); printf(" super table[\033[33m%d\033[0m]:\n", j);
printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sTblName); printf(" stbName: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sTblName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
...@@ -1241,7 +1253,7 @@ static int printfInsertMeta() { ...@@ -1241,7 +1253,7 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].sampleFile); g_Dbs.db[i].superTbls[j].sampleFile);
printf(" tagsFile: \033[33m%s\033[0m\n", printf(" tagsFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].tagsFile); g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n ", printf(" columnCount: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].columnCount); g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
...@@ -1459,41 +1471,61 @@ static void printfQueryMeta() { ...@@ -1459,41 +1471,61 @@ static void printfQueryMeta() {
printf("\n"); printf("\n");
printf("specified table query info: \n"); printf("specified table query info: \n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.rate); printf("query interval: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.rate);
printf("top query times:\033[33m%d\033[0m\n", g_args.query_times); printf("top query times:\033[33m%d\033[0m\n", g_args.query_times);
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent); printf("concurrent: \033[33m%d\033[0m\n",
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.concurrent);
printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.sqlCount);
printf("specified tbl query times:\n"); printf("specified tbl query times:\n");
printf(" \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.queryTimes); printf(" \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.queryTimes);
if (SUBSCRIBE_TEST == g_args.test_mode) { if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeMode); printf("mod: \033[33m%d\033[0m\n",
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.mode);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeRestart); printf("interval: \033[33m%d\033[0m\n",
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]); printf(" sql[%d]: \033[33m%s\033[0m\n",
i, g_queryInfo.specifiedQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
printf("super table query info: \n"); printf("super table query info:\n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate); printf("query interval: \033[33m%d\033[0m\n",
printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.threadCnt); g_queryInfo.superQueryInfo.rate);
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.childTblCount); printf("threadCnt: \033[33m%d\033[0m\n",
printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.superQueryInfo.sTblName); g_queryInfo.superQueryInfo.threadCnt);
printf("stb query times:\033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.queryTimes); printf("childTblCount: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.childTblCount);
printf("stable name: \033[33m%s\033[0m\n",
g_queryInfo.superQueryInfo.sTblName);
printf("stb query times:\033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.queryTimes);
if (SUBSCRIBE_TEST == g_args.test_mode) { if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode); printf("mod: \033[33m%d\033[0m\n",
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.mode);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); printf("interval: \033[33m%d\033[0m\n",
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeInterval);
} printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeRestart);
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress);
}
printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.sqlCount);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]); printf(" sql[%d]: \033[33m%s\033[0m\n",
i, g_queryInfo.superQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
...@@ -1637,7 +1669,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { ...@@ -1637,7 +1669,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
TAOS_FIELD *fields = taos_fetch_fields(res); TAOS_FIELD *fields = taos_fetch_fields(res);
while ((row = taos_fetch_row(res)) != NULL) { while((row = taos_fetch_row(res)) != NULL) {
// sys database name : 'log' // sys database name : 'log'
if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log", if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log",
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) { fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) {
...@@ -1670,7 +1702,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { ...@@ -1670,7 +1702,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]); dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]); dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX])); dbInfos[count]->cachelast =
(int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
tstrncpy(dbInfos[count]->precision, tstrncpy(dbInfos[count]->precision,
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX], (char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
...@@ -1681,7 +1714,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { ...@@ -1681,7 +1714,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
count++; count++;
if (count > MAX_DATABASE_COUNT) { if (count > MAX_DATABASE_COUNT) {
errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT); errorPrint("%s() LN%d, The database count overflow than %d\n",
__func__, __LINE__, MAX_DATABASE_COUNT);
break; break;
} }
} }
...@@ -1691,6 +1725,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { ...@@ -1691,6 +1725,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
static void printfDbInfoForQueryToFile( static void printfDbInfoForQueryToFile(
char* filename, SDbInfo* dbInfos, int index) { char* filename, SDbInfo* dbInfos, int index) {
if (filename[0] == 0) if (filename[0] == 0)
return; return;
...@@ -1909,7 +1944,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr) ...@@ -1909,7 +1944,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
if (bytes == 0) if (bytes == 0)
break; break;
sent+=bytes; sent+=bytes;
} while (sent < req_str_len); } while(sent < req_str_len);
memset(response_buf, 0, RESP_BUF_LEN); memset(response_buf, 0, RESP_BUF_LEN);
resp_len = sizeof(response_buf) - 1; resp_len = sizeof(response_buf) - 1;
...@@ -1927,7 +1962,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr) ...@@ -1927,7 +1962,7 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
if (bytes == 0) if (bytes == 0)
break; break;
received += bytes; received += bytes;
} while (received < resp_len); } while(received < resp_len);
if (received == resp_len) { if (received == resp_len) {
free(request_buf); free(request_buf);
...@@ -1951,7 +1986,8 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr) ...@@ -1951,7 +1986,8 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) { if (NULL == dataBuf) {
errorPrint("%s() LN%d, calloc failed! size:%d\n", __func__, __LINE__, TSDB_MAX_SQL_LEN+1); errorPrint("%s() LN%d, calloc failed! size:%d\n",
__func__, __LINE__, TSDB_MAX_SQL_LEN+1);
return NULL; return NULL;
} }
...@@ -2155,7 +2191,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -2155,7 +2191,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
} }
char* pTblName = childTblName; char* pTblName = childTblName;
while ((row = taos_fetch_row(res)) != NULL) { while((row = taos_fetch_row(res)) != NULL) {
int32_t* len = taos_fetch_lengths(res); int32_t* len = taos_fetch_lengths(res);
tstrncpy(pTblName, (char *)row[0], len[0]+1); tstrncpy(pTblName, (char *)row[0], len[0]+1);
//printf("==== sub table name: %s\n", pTblName); //printf("==== sub table name: %s\n", pTblName);
...@@ -2218,7 +2254,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, ...@@ -2218,7 +2254,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
int tagIndex = 0; int tagIndex = 0;
int columnIndex = 0; int columnIndex = 0;
TAOS_FIELD *fields = taos_fetch_fields(res); TAOS_FIELD *fields = taos_fetch_fields(res);
while ((row = taos_fetch_row(res)) != NULL) { while((row = taos_fetch_row(res)) != NULL) {
if (0 == count) { if (0 == count) {
count++; count++;
continue; continue;
...@@ -2765,7 +2801,7 @@ static void createChildTables() { ...@@ -2765,7 +2801,7 @@ static void createChildTables() {
// normal table // normal table
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
int j = 0; int j = 0;
while (g_args.datatype[j]) { while(g_args.datatype[j]) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j], || (strncasecmp(g_args.datatype[j],
"NCHAR", strlen("NCHAR")) == 0)) { "NCHAR", strlen("NCHAR")) == 0)) {
...@@ -2824,7 +2860,7 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { ...@@ -2824,7 +2860,7 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return -1; return -1;
} }
while ((readLen = tgetline(&line, &n, fp)) != -1) { while((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0; line[--readLen] = 0;
} }
...@@ -2888,7 +2924,7 @@ static int readSampleFromCsvFileToMem( ...@@ -2888,7 +2924,7 @@ static int readSampleFromCsvFileToMem(
assert(superTblInfo->sampleDataBuf); assert(superTblInfo->sampleDataBuf);
memset(superTblInfo->sampleDataBuf, 0, memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow); MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
while (1) { while(1) {
readLen = tgetline(&line, &n, fp); readLen = tgetline(&line, &n, fp);
if (-1 == readLen) { if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) { if(0 != fseek(fp, 0, SEEK_SET)) {
...@@ -2967,7 +3003,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -2967,7 +3003,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (countObj && countObj->type == cJSON_Number) { if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint; count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) { } else if (countObj && countObj->type != cJSON_Number) {
errorPrint("%s() LN%d, failed to read json, column count not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, column count not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
count = 1; count = 1;
...@@ -2976,8 +3013,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -2976,8 +3013,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// column info // column info
memset(&columnCase, 0, sizeof(StrColumn)); memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type"); cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { if (!dataType || dataType->type != cJSON_String
errorPrint("%s() LN%d: failed to read json, column type not found\n", __func__, __LINE__); || dataType->valuestring == NULL) {
errorPrint("%s() LN%d: failed to read json, column type not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE); //tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
...@@ -2987,7 +3026,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -2987,7 +3026,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (dataLen && dataLen->type == cJSON_Number) { if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint; columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) { } else if (dataLen && dataLen->type != cJSON_Number) {
debugPrint("%s() LN%d: failed to read json, column len not found\n", __func__, __LINE__); debugPrint("%s() LN%d: failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
columnCase.dataLen = 8; columnCase.dataLen = 8;
...@@ -3007,13 +3047,15 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3007,13 +3047,15 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// tags // tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags"); cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) { if (!tags || tags->type != cJSON_Array) {
debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, tags not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
int tagSize = cJSON_GetArraySize(tags); int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) { if (tagSize > MAX_TAG_COUNT) {
debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT); errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n",
__func__, __LINE__, MAX_TAG_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3036,8 +3078,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3036,8 +3078,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// column info // column info
memset(&columnCase, 0, sizeof(StrColumn)); memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(tag, "type"); cJSON *dataType = cJSON_GetObjectItem(tag, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { if (!dataType || dataType->type != cJSON_String
printf("ERROR: failed to read json, tag type not found\n"); || dataType->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, tag type not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE); tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
...@@ -3046,14 +3090,16 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3046,14 +3090,16 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (dataLen && dataLen->type == cJSON_Number) { if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint; columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) { } else if (dataLen && dataLen->type != cJSON_Number) {
printf("ERROR: failed to read json, column len not found\n"); errorPrint("%s() LN%d, failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
columnCase.dataLen = 0; columnCase.dataLen = 0;
} }
for (int n = 0; n < count; ++n) { for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType, MAX_TB_NAME_SIZE); tstrncpy(superTbls->tags[index].dataType, columnCase.dataType,
MAX_TB_NAME_SIZE);
superTbls->tags[index].dataLen = columnCase.dataLen; superTbls->tags[index].dataLen = columnCase.dataLen;
index++; index++;
} }
...@@ -3063,9 +3109,6 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3063,9 +3109,6 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
ret = true; ret = true;
PARSE_OVER: PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret; return ret;
} }
...@@ -3142,7 +3185,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3142,7 +3185,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!gInsertInterval) { } else if (!gInsertInterval) {
g_args.insert_interval = 0; g_args.insert_interval = 0;
} else { } else {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3163,7 +3207,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3163,7 +3207,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!interlaceRows) { } else if (!interlaceRows) {
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3173,7 +3218,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3173,7 +3218,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) { } else if (!maxSqlLen) {
g_args.max_sql_len = TSDB_PAYLOAD_SIZE; g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
} else { } else {
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3183,7 +3229,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3183,7 +3229,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!numRecPerReq) { } else if (!numRecPerReq) {
g_args.num_of_RPR = 0xffff; g_args.num_of_RPR = 0xffff;
} else { } else {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3509,7 +3556,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3509,7 +3556,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!dataSource) { } else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE); tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
} else { } else {
errorPrint("%s() LN%d, failed to read json, data_source not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, data_source not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3584,7 +3632,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3584,7 +3632,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} }
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file"); cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) { if (sampleFile && sampleFile->type == cJSON_String
&& sampleFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile, tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
sampleFile->valuestring, MAX_FILE_NAME_LEN); sampleFile->valuestring, MAX_FILE_NAME_LEN);
} else if (!sampleFile) { } else if (!sampleFile) {
...@@ -3727,9 +3776,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3727,9 +3776,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
ret = true; ret = true;
PARSE_OVER: PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret; return ret;
} }
...@@ -3795,7 +3841,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3795,7 +3841,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!gQueryTimes) { } else if (!gQueryTimes) {
g_args.query_times = 1; g_args.query_times = 1;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3833,35 +3880,45 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3833,35 +3880,45 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.specifiedQueryInfo.rate = 0; g_queryInfo.specifiedQueryInfo.rate = 0;
} }
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, "query_times"); cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
"query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint; g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
} else if (!specifiedQueryTimes) { } else if (!specifiedQueryTimes) {
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times; g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) { if (concurrent && concurrent->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint; g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
goto PARSE_OVER;
}
} else if (!concurrent) { } else if (!concurrent) {
g_queryInfo.specifiedQueryInfo.concurrent = 1; g_queryInfo.specifiedQueryInfo.concurrent = 1;
} }
cJSON* mode = cJSON_GetObjectItem(specifiedQuery, "mode"); cJSON* queryMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (mode && mode->type == cJSON_String && mode->valuestring != NULL) { if (queryMode && queryMode->type == cJSON_String
if (0 == strcmp("sync", mode->valuestring)) { && queryMode->valuestring != NULL) {
g_queryInfo.specifiedQueryInfo.subscribeMode = 0; if (0 == strcmp("sync", queryMode->valuestring)) {
} else if (0 == strcmp("async", mode->valuestring)) { g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
g_queryInfo.specifiedQueryInfo.subscribeMode = 1; } else if (0 == strcmp("async", queryMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.mode = ASYNC_QUERY_MODE;
} else { } else {
printf("ERROR: failed to read json, subscribe mod error\n"); errorPrint("%s() LN%d, failed to read json, query mode input error\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.specifiedQueryInfo.subscribeMode = 0; g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
} }
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval"); cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
...@@ -3908,12 +3965,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3908,12 +3965,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!superSqls) { if (!superSqls) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0; g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) { } else if (superSqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n"); errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
int superSqlSize = cJSON_GetArraySize(superSqls); int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) { if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3965,7 +4024,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3965,7 +4024,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!superQueryTimes) { } else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times; g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3984,25 +4044,30 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -3984,25 +4044,30 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
//} //}
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname"); cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) { if (stblname && stblname->type == cJSON_String
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE); && stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
MAX_TB_NAME_SIZE);
} else { } else {
printf("ERROR: failed to read json, super table name not found\n"); errorPrint("%s() LN%d, failed to read json, super table name input error\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* submode = cJSON_GetObjectItem(superQuery, "mode"); cJSON* submode = cJSON_GetObjectItem(superQuery, "mode");
if (submode && submode->type == cJSON_String && submode->valuestring != NULL) { if (submode && submode->type == cJSON_String
&& submode->valuestring != NULL) {
if (0 == strcmp("sync", submode->valuestring)) { if (0 == strcmp("sync", submode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 0; g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
} else if (0 == strcmp("async", submode->valuestring)) { } else if (0 == strcmp("async", submode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 1; g_queryInfo.superQueryInfo.mode = ASYNC_QUERY_MODE;
} else { } else {
printf("ERROR: failed to read json, subscribe mod error\n"); errorPrint("%s() LN%d, failed to read json, query mode input error\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.superQueryInfo.subscribeMode = 0; g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
} }
cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval"); cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval");
...@@ -4015,7 +4080,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4015,7 +4080,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} }
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart"); cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) { if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) { if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 1; g_queryInfo.superQueryInfo.subscribeRestart = 1;
} else if (0 == strcmp("no", subrestart->valuestring)) { } else if (0 == strcmp("no", subrestart->valuestring)) {
...@@ -4049,12 +4115,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4049,12 +4115,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!subsqls) { if (!subsqls) {
g_queryInfo.superQueryInfo.sqlCount = 0; g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) { } else if (subsqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n"); errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
int superSqlSize = cJSON_GetArraySize(subsqls); int superSqlSize = cJSON_GetArraySize(subsqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) { if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -4064,19 +4132,25 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4064,19 +4132,25 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (sql == NULL) continue; if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { if (!sqlStr || sqlStr->type != cJSON_String
printf("ERROR: failed to read json, sql not found\n"); || sqlStr->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, sql not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){ if (result != NULL && result->type == cJSON_String
tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); && result->valuestring != NULL){
tstrncpy(g_queryInfo.superQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) { } else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else { } else {
printf("ERROR: failed to read json, sub query result file not found\n"); errorPrint("%s() LN%d, failed to read json, sub query result file not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} }
...@@ -4086,9 +4160,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4086,9 +4160,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
ret = true; ret = true;
PARSE_OVER: PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret; return ret;
} }
...@@ -4450,22 +4521,23 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4450,22 +4521,23 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
} else if (0 == strncasecmp(superTblInfo->dataSource, } else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) { "rand", strlen("rand"))) {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
int randTail;
if (0 != superTblInfo->disorderRatio if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) { && rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime randTail = (superTblInfo->timeStampStep * k
+ superTblInfo->timeStampStep * k + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
- taosRandom() % superTblInfo->disorderRange; debugPrint("rand data generated, back %d\n", randTail);
retLen = generateRowData(
data,
d,
superTblInfo);
} else { } else {
randTail = superTblInfo->timeStampStep * k;
}
uint64_t d = startTime
+ randTail;
retLen = generateRowData( retLen = generateRowData(
data, data,
startTime + superTblInfo->timeStampStep * k, d,
superTblInfo); superTblInfo);
} }
}
if (retLen > remainderBufLen) { if (retLen > remainderBufLen) {
break; break;
...@@ -4480,20 +4552,21 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4480,20 +4552,21 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
int lenOfBinary = g_args.len_of_binary; int lenOfBinary = g_args.len_of_binary;
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
int randTail;
if ((g_args.disorderRatio != 0) if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRatio)) { && (rand_num < g_args.disorderRatio)) {
randTail = (DEFAULT_TIMESTAMP_STEP * k
int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k + (taosRandom() % g_args.disorderRange + 1)) * (-1);
- taosRandom() % g_args.disorderRange; debugPrint("rand data generated, back %d\n", randTail);
retLen = generateData(data, data_type,
ncols_per_record, d, lenOfBinary);
} else { } else {
randTail = DEFAULT_TIMESTAMP_STEP * k;
}
retLen = generateData(data, data_type, retLen = generateData(data, data_type,
ncols_per_record, ncols_per_record,
startTime + DEFAULT_TIMESTAMP_STEP * k, startTime + randTail,
lenOfBinary); lenOfBinary);
}
if (len > remainderBufLen) if (len > remainderBufLen)
break; break;
...@@ -5035,7 +5108,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { ...@@ -5035,7 +5108,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
if (0 != winfo->superTblInfo->disorderRatio if (0 != winfo->superTblInfo->disorderRatio
&& rand_num < winfo->superTblInfo->disorderRatio) { && rand_num < winfo->superTblInfo->disorderRatio) {
int64_t d = winfo->lastTs - taosRandom() % winfo->superTblInfo->disorderRange; int64_t d = winfo->lastTs - (taosRandom() % winfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, winfo->superTblInfo); generateRowData(data, d, winfo->superTblInfo);
} else { } else {
generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo); generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo);
...@@ -5415,7 +5488,7 @@ static void *readTable(void *sarg) { ...@@ -5415,7 +5488,7 @@ static void *readTable(void *sarg) {
return NULL; return NULL;
} }
while (taos_fetch_row(pSql) != NULL) { while(taos_fetch_row(pSql) != NULL) {
count++; count++;
} }
...@@ -5491,7 +5564,7 @@ static void *readMetric(void *sarg) { ...@@ -5491,7 +5564,7 @@ static void *readMetric(void *sarg) {
return NULL; return NULL;
} }
int count = 0; int count = 0;
while (taos_fetch_row(pSql) != NULL) { while(taos_fetch_row(pSql) != NULL) {
count++; count++;
} }
t = getCurrentTimeUs() - t; t = getCurrentTimeUs() - t;
...@@ -5602,7 +5675,7 @@ static int insertTestProcess() { ...@@ -5602,7 +5675,7 @@ static int insertTestProcess() {
return 0; return 0;
} }
static void *superQueryProcess(void *sarg) { static void *specifiedQueryProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
if (winfo->taos == NULL) { if (winfo->taos == NULL) {
...@@ -5643,22 +5716,25 @@ static void *superQueryProcess(void *sarg) { ...@@ -5643,22 +5716,25 @@ static void *superQueryProcess(void *sarg) {
} }
st = taosGetTimestampUs(); st = taosGetTimestampUs();
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[winfo->querySeq][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[winfo->querySeq],
winfo->threadID);
} }
selectAndGetResult(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], tmpFile); selectAndGetResult(winfo->taos,
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq], tmpFile);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else { } else {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host, int retCode = postProceSql(g_queryInfo.host,
g_queryInfo.port, g_queryInfo.specifiedQueryInfo.sql[i]); g_queryInfo.port,
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq]);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
...@@ -5668,7 +5744,7 @@ static void *superQueryProcess(void *sarg) { ...@@ -5668,7 +5744,7 @@ static void *superQueryProcess(void *sarg) {
return NULL; return NULL;
} }
} }
}
et = taosGetTimestampUs(); et = taosGetTimestampUs();
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
taosGetSelfPthreadId(), (double)(et - st)/1000.0); taosGetSelfPthreadId(), (double)(et - st)/1000.0);
...@@ -5698,7 +5774,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { ...@@ -5698,7 +5774,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
//printf("3: %s\n", outSql); //printf("3: %s\n", outSql);
} }
static void *subQueryProcess(void *sarg) { static void *superQueryProcess(void *sarg) {
char sqlstr[1024]; char sqlstr[1024];
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
...@@ -5791,24 +5867,24 @@ static int queryTestProcess() { ...@@ -5791,24 +5867,24 @@ static int queryTestProcess() {
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from specify table //==== create sub threads for query from specify table
if (g_queryInfo.specifiedQueryInfo.sqlCount > 0 int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
&& g_queryInfo.specifiedQueryInfo.concurrent > 0) { int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); if ((nSqlCount > 0) && (nConcurrent > 0)) {
if (NULL == pids) {
taos_close(taos); pids = malloc(nConcurrent * nSqlCount * sizeof(pthread_t));
ERROR_EXIT("memory allocation failed\n"); infos = malloc(nConcurrent * nSqlCount * sizeof(threadInfo));
}
infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) {
if (NULL == infos) {
taos_close(taos); taos_close(taos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { for (int i = 0; i < nConcurrent; i++) {
threadInfo *t_info = infos + i; for (int j = 0; j < nSqlCount; j++) {
t_info->threadID = i; threadInfo *t_info = infos + i * nSqlCount + j;
t_info->threadID = i * nSqlCount + j;
t_info->querySeq = j;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
...@@ -5827,7 +5903,9 @@ static int queryTestProcess() { ...@@ -5827,7 +5903,9 @@ static int queryTestProcess() {
t_info->taos = NULL;// TODO: workaround to use separate taos connection; t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, superQueryProcess, t_info); pthread_create(pids + i * nSqlCount + j, NULL, specifiedQueryProcess,
t_info);
}
} }
} else { } else {
g_queryInfo.specifiedQueryInfo.concurrent = 0; g_queryInfo.specifiedQueryInfo.concurrent = 0;
...@@ -5841,18 +5919,12 @@ static int queryTestProcess() { ...@@ -5841,18 +5919,12 @@ static int queryTestProcess() {
if ((g_queryInfo.superQueryInfo.sqlCount > 0) if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) { && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t)); pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
if (NULL == pidsOfSub) {
free(infos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n");
}
infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo)); infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
if (NULL == infosOfSub) {
free(pidsOfSub); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
free(infos); free(infos);
free(pids); free(pids);
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
...@@ -5880,7 +5952,7 @@ static int queryTestProcess() { ...@@ -5880,7 +5952,7 @@ static int queryTestProcess() {
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1; startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info); pthread_create(pidsOfSub + i, NULL, superQueryProcess, t_info);
} }
g_queryInfo.superQueryInfo.threadCnt = threads; g_queryInfo.superQueryInfo.threadCnt = threads;
...@@ -5888,8 +5960,12 @@ static int queryTestProcess() { ...@@ -5888,8 +5960,12 @@ static int queryTestProcess() {
g_queryInfo.superQueryInfo.threadCnt = 0; g_queryInfo.superQueryInfo.threadCnt = 0;
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { if ((nSqlCount > 0) && (nConcurrent > 0)) {
pthread_join(pids[i], NULL); for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
}
}
} }
tmfree((char*)pids); tmfree((char*)pids);
...@@ -5920,7 +5996,7 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c ...@@ -5920,7 +5996,7 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) { static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (g_queryInfo.specifiedQueryInfo.subscribeMode) { if (g_queryInfo.specifiedQueryInfo.mode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, subscribe_callback, (void*)resultFileName,
...@@ -5996,13 +6072,13 @@ static void *subSubscribeProcess(void *sarg) { ...@@ -5996,13 +6072,13 @@ static void *subSubscribeProcess(void *sarg) {
} }
//et = taosGetTimestampMs(); //et = taosGetTimestampMs();
//printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); //printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} while (0); } while(0);
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while (1) { while(1) {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.superQueryInfo.subscribeMode) { if (1 == g_queryInfo.superQueryInfo.mode) {
continue; continue;
} }
...@@ -6073,7 +6149,8 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -6073,7 +6149,8 @@ static void *superSubscribeProcess(void *sarg) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
} }
tsub[i] = subscribeImpl(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); tsub[i] = subscribeImpl(winfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) { if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
taos_close(winfo->taos); taos_close(winfo->taos);
return NULL; return NULL;
...@@ -6081,13 +6158,13 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -6081,13 +6158,13 @@ static void *superSubscribeProcess(void *sarg) {
} }
//et = taosGetTimestampMs(); //et = taosGetTimestampMs();
//printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0); //printf("========thread[%"PRId64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} while (0); } while(0);
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while (1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.specifiedQueryInfo.subscribeMode) { if (SYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) {
continue; continue;
} }
...@@ -6105,7 +6182,8 @@ static void *superSubscribeProcess(void *sarg) { ...@@ -6105,7 +6182,8 @@ static void *superSubscribeProcess(void *sarg) {
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos); taos_close(winfo->taos);
...@@ -6308,7 +6386,7 @@ static void setParaFromArg(){ ...@@ -6308,7 +6386,7 @@ static void setParaFromArg(){
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables; g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
g_Dbs.threadCount = g_args.num_of_threads; g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.queryMode = g_args.mode; g_Dbs.queryMode = g_args.query_mode;
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
...@@ -6410,7 +6488,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile) ...@@ -6410,7 +6488,7 @@ static void querySqlFile(TAOS* taos, char* sqlFile)
double t = getCurrentTimeUs(); double t = getCurrentTimeUs();
while ((read_len = tgetline(&line, &line_len, fp)) != -1) { while((read_len = tgetline(&line, &line_len, fp)) != -1) {
if (read_len >= MAX_SQL_SIZE) continue; if (read_len >= MAX_SQL_SIZE) continue;
line[--read_len] = '\0'; line[--read_len] = '\0';
...@@ -6473,12 +6551,11 @@ static void testMetaFile() { ...@@ -6473,12 +6551,11 @@ static void testMetaFile() {
} }
static void queryResult() { static void queryResult() {
// select
if (false == g_Dbs.insert_only) {
// query data // query data
pthread_t read_id; pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo)); threadInfo *rInfo = malloc(sizeof(threadInfo));
assert(rInfo);
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000 rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_from = 0; rInfo->start_table_from = 0;
...@@ -6518,7 +6595,6 @@ static void queryResult() { ...@@ -6518,7 +6595,6 @@ static void queryResult() {
pthread_join(read_id, NULL); pthread_join(read_id, NULL);
taos_close(rInfo->taos); taos_close(rInfo->taos);
free(rInfo); free(rInfo);
}
} }
static void testCmdLine() { static void testCmdLine() {
...@@ -6536,9 +6612,7 @@ static void testCmdLine() { ...@@ -6536,9 +6612,7 @@ static void testCmdLine() {
g_args.test_mode = INSERT_TEST; g_args.test_mode = INSERT_TEST;
insertTestProcess(); insertTestProcess();
if (g_Dbs.insert_only) if (false == g_Dbs.insert_only)
return;
else
queryResult(); queryResult();
} }
......
...@@ -124,7 +124,7 @@ bool greaterEqualOperator(SColumnFilterElem *pFilter, const char *minval, const ...@@ -124,7 +124,7 @@ bool greaterEqualOperator(SColumnFilterElem *pFilter, const char *minval, const
bool equalOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) { bool equalOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo; SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1; int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval); GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval); GET_TYPED_DATA(maxv, int64_t, type, maxval);
...@@ -202,7 +202,7 @@ bool likeOperator(SColumnFilterElem *pFilter, const char *minval, const char *ma ...@@ -202,7 +202,7 @@ bool likeOperator(SColumnFilterElem *pFilter, const char *minval, const char *ma
bool notEqualOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) { bool notEqualOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo; SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1; int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval); GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval); GET_TYPED_DATA(maxv, int64_t, type, maxval);
......
...@@ -2861,12 +2861,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -2861,12 +2861,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
} }
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
}
} }
int64_t elapsed = taosGetTimestampUs() - stime; int64_t elapsed = taosGetTimestampUs() - stime;
......
...@@ -28,7 +28,8 @@ ...@@ -28,7 +28,8 @@
int points = 5; int points = 5;
int numOfTables = 3; int numOfTables = 3;
int tablesProcessed = 0; int tablesInsertProcessed = 0;
int tablesSelectProcessed = 0;
int64_t st, et; int64_t st, et;
typedef struct { typedef struct {
...@@ -134,6 +135,9 @@ int main(int argc, char *argv[]) ...@@ -134,6 +135,9 @@ int main(int argc, char *argv[])
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesInsertProcessed = 0;
tablesSelectProcessed = 0;
for (i = 0; i<numOfTables; ++i) { for (i = 0; i<numOfTables; ++i) {
// insert records in asynchronous API // insert records in asynchronous API
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i); sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
...@@ -143,10 +147,20 @@ int main(int argc, char *argv[]) ...@@ -143,10 +147,20 @@ int main(int argc, char *argv[])
printf("once insert finished, presse any key to query\n"); printf("once insert finished, presse any key to query\n");
getchar(); getchar();
while(1) {
if (tablesInsertProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
printf("start to query...\n"); printf("start to query...\n");
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesProcessed = 0;
for (i = 0; i < numOfTables; ++i) { for (i = 0; i < numOfTables; ++i) {
// select records in asynchronous API // select records in asynchronous API
...@@ -157,14 +171,8 @@ int main(int argc, char *argv[]) ...@@ -157,14 +171,8 @@ int main(int argc, char *argv[])
printf("\nonce finished, press any key to exit\n"); printf("\nonce finished, press any key to exit\n");
getchar(); getchar();
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
getchar();
while(1) { while(1) {
if (tablesProcessed < numOfTables) { if (tablesSelectProcessed < numOfTables) {
printf("wait for process finished\n"); printf("wait for process finished\n");
sleep(1); sleep(1);
continue; continue;
...@@ -173,6 +181,10 @@ int main(int argc, char *argv[]) ...@@ -173,6 +181,10 @@ int main(int argc, char *argv[])
break; break;
} }
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
taos_close(taos); taos_close(taos);
free(tableList); free(tableList);
...@@ -214,8 +226,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code) ...@@ -214,8 +226,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
} }
else { else {
printf("%d rows data are inserted into %s\n", points, pTable->name); printf("%d rows data are inserted into %s\n", points, pTable->name);
tablesProcessed++; tablesInsertProcessed++;
if (tablesProcessed >= numOfTables) { if (tablesInsertProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables); printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
...@@ -251,15 +263,17 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) ...@@ -251,15 +263,17 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
//taos_free_result(tres); //taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name); printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesProcessed++; tablesSelectProcessed++;
if (tablesProcessed >= numOfTables) { if (tablesSelectProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables); printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
} }
}
taos_free_result(tres); taos_free_result(tres);
}
} }
void taos_select_call_back(void *param, TAOS_RES *tres, int code) void taos_select_call_back(void *param, TAOS_RES *tres, int code)
...@@ -276,6 +290,4 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code) ...@@ -276,6 +290,4 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code)
taos_cleanup(); taos_cleanup();
exit(1); exit(1);
} }
taos_free_result(tres);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册