Unverified commit 2c4bcbe3, authored by sangshuduo, committed by GitHub

Feature/sangshuduo/td 3317 taosdemo interlace (#5596)

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

check offset 0.

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

fix sample file import bug.

* [TD-3316] <fix>: add test case for limit and offset. fix sample data issue.

* [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file.

* [TD-3317] <feature>: make taosdemo support interlace mode.

json parameter rows_per_tbl support.

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode.

refactor

* [TD-3317] <feature>: support interlace mode insertion.

refactor.

* [TD-3317] <feature>: support interlace mode insertion.

change json file.

* [TD-3317] <feature>: support interlace mode insertion.

fix multithread create table regression.

* [TD-3317] <feature>: support interlace mode insertion.

working but not perfect.

* [TD-3317] <feature>: support interlace mode insertion.

rename lowaTest to taosdemoTestWithJson

* [TD-3317] <feature>: support interlace mode insertion.

perfect

* [TD-3317] <feature>: support interlace mode insertion.

cleanup.

* [TD-3317] <feature>: support interlace mode insertion.

adjust algorithm of loop times.

* [TD-3317] <feature>: support interlace mode insertion.

fix delay time bug.

* [TD-3317] <feature>: support interlace mode insertion.

fix progressive timestamp bug.

* [TD-3317] <feature>: support interlace mode insertion.

add an option for performance print.

* [TD-3317] <feature>: support interlace mode insertion.

change json test case with less table for acceleration.

* [TD-3317] <feature>: support interlace mode insertion.

change progressive mode timestamp step and testcase.

* [TD-3197] <fix>: fix taosdemo coverity scan issues.

* [TD-3197] <fix>: fix taosdemo coverity scan issue.

fix subscribeTest pids uninitialized.

* [TD-3317] <feature>: support interlace mode insertion.

add time shift for no sleep time.

* [TD-3317] <feature>: support interlace insert.

rework timestamp.

* [TD-3317] <feature>: support interlace mode insertion.

change rows_per_tbl to interlace_rows.

* [TD-3317] <feature>: taosdemo support interlace mode.

remove trailing spaces.

* [TD-3317] <feature>: taosdemo support interlace insertion.

prompt if interlace > num_of_records_per_req
Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
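For illustration only (this sketch is not part of the commit): a minimal insert-meta JSON fragment showing how the renamed interlace_rows option relates to num_of_records_per_req. The key names below are the ones parsed in this diff; all values, the database name, and the super table name are assumed examples. With this commit, taosdemo detects interlace_rows greater than num_of_records_per_req, prompts, and clamps it to that limit.

{
  "confirm_parameter_prompt": "no",
  "num_of_records_per_req": 100,
  "interlace_rows": 20,
  "databases": [{
    "dbinfo": { "name": "test", "drop": "yes" },
    "super_tables": [{
      "name": "stb0",
      "childtable_prefix": "t",
      "childtable_count": 10,
      "interlace_rows": 20
    }]
  }]
}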
Parent cb4ca3a5
......@@ -37,10 +37,10 @@
#include <unistd.h>
#include <wordexp.h>
#include <regex.h>
#else
#include <regex.h>
#include <stdio.h>
#endif
#include <assert.h>
#include <stdlib.h>
......@@ -100,7 +100,7 @@ typedef enum CREATE_SUB_TALBE_MOD_EN {
AUTO_CREATE_SUBTBL,
NO_CREATE_SUBTBL
} CREATE_SUB_TALBE_MOD_EN;
typedef enum TALBE_EXISTS_EN {
TBL_NO_EXISTS,
TBL_ALREADY_EXISTS,
......@@ -108,7 +108,7 @@ typedef enum TALBE_EXISTS_EN {
} TALBE_EXISTS_EN;
enum MODE {
SYNC,
ASYNC,
MODE_BUT
};
......@@ -131,7 +131,7 @@ enum _show_db_index {
TSDB_SHOW_DB_NTABLES_INDEX,
TSDB_SHOW_DB_VGROUPS_INDEX,
TSDB_SHOW_DB_REPLICA_INDEX,
TSDB_SHOW_DB_QUORUM_INDEX,
TSDB_SHOW_DB_DAYS_INDEX,
TSDB_SHOW_DB_KEEP_INDEX,
TSDB_SHOW_DB_CACHE_INDEX,
......@@ -153,10 +153,10 @@ enum _show_stables_index {
TSDB_SHOW_STABLES_NAME_INDEX,
TSDB_SHOW_STABLES_CREATED_TIME_INDEX,
TSDB_SHOW_STABLES_COLUMNS_INDEX,
TSDB_SHOW_STABLES_METRIC_INDEX,
TSDB_SHOW_STABLES_UID_INDEX,
TSDB_SHOW_STABLES_TID_INDEX,
TSDB_SHOW_STABLES_VGID_INDEX,
TSDB_MAX_SHOW_STABLES
};
......@@ -220,7 +220,7 @@ typedef struct SColumn_S {
char field[TSDB_COL_NAME_LEN + 1];
char dataType[MAX_TB_NAME_SIZE];
int dataLen;
char note[128];
} StrColumn;
typedef struct SSuperTable_S {
......@@ -269,7 +269,7 @@ typedef struct SSuperTable_S {
int tagSampleCount;
int tagUsePos;
// statistics
int64_t totalInsertRows;
int64_t totalAffectedRows;
} SSuperTable;
......@@ -278,10 +278,10 @@ typedef struct {
char name[TSDB_DB_NAME_LEN + 1];
char create_time[32];
int32_t ntables;
int32_t vgroups;
int16_t replica;
int16_t quorum;
int16_t days;
char keeplist[32];
int32_t cache; //MB
int32_t blocks;
......@@ -296,14 +296,14 @@ typedef struct {
char status[16];
} SDbInfo;
typedef struct SDbCfg_S {
// int maxtablesPerVnode;
int minRows;
int maxRows;
int comp;
int walLevel;
int cacheLast;
int fsync;
int replica;
int update;
int keep;
......@@ -311,7 +311,7 @@ typedef struct SDbCfg_S {
int cache;
int blocks;
int quorum;
char precision[MAX_TB_NAME_SIZE];
} SDbCfg;
typedef struct SDataBase_S {
......@@ -333,7 +333,7 @@ typedef struct SDbs_S {
bool insert_only;
bool do_aggreFunc;
bool queryMode;
int threadCount;
int threadCountByCreateTbl;
int dbCount;
......@@ -372,7 +372,7 @@ typedef struct SubQueryInfo_S {
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName;
} SubQueryInfo;
......@@ -386,7 +386,7 @@ typedef struct SQueryMetaInfo_S {
char queryMode[MAX_TB_NAME_SIZE]; // taosc, restful
SuperQueryInfo superQueryInfo;
SubQueryInfo subQueryInfo;
} SQueryMetaInfo;
typedef struct SThreadInfo_S {
......@@ -424,7 +424,7 @@ typedef struct SThreadInfo_S {
int64_t avgDelay;
int64_t maxDelay;
int64_t minDelay;
} threadInfo;
#ifdef WINDOWS
......@@ -463,12 +463,12 @@ static void setupForAnsiEscape(void) {
if(!SetConsoleMode(g_stdoutHandle, mode)) {
exit(GetLastError());
}
}
static void resetAfterAnsiEscape(void) {
// Reset colors
printf("\x1b[0m");
// Reset console mode
if(!SetConsoleMode(g_stdoutHandle, g_consoleMode)) {
......@@ -508,7 +508,7 @@ int32_t randint[MAX_PREPARED_RAND];
int64_t randbigint[MAX_PREPARED_RAND];
float randfloat[MAX_PREPARED_RAND];
double randdouble[MAX_PREPARED_RAND];
char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)",
"max(col0)", "min(col0)", "first(col0)", "last(col0)"};
SArguments g_args = {
......@@ -517,7 +517,7 @@ SArguments g_args = {
"127.0.0.1", // host
6030, // port
"root", // user
#ifdef _TD_POWER_
"powerdb", // password
#else
"taosdata", // password
......@@ -599,7 +599,7 @@ static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
#define TD_VERNUMBER "unknown"
#endif
#ifndef TAOSDEMO_STATUS
#define TAOSDEMO_STATUS "unknown"
#endif
......@@ -619,62 +619,62 @@ static void printVersion() {
static void printHelp() {
char indent[10] = " ";
printf("%s%s%s%s\n", indent, "-f", indent,
"The meta file to the execution procedure. Default is './meta.json'.");
printf("%s%s%s%s\n", indent, "-u", indent,
"The TDengine user name to use when connecting to the server. Default is 'root'.");
#ifdef _TD_POWER_
printf("%s%s%s%s\n", indent, "-P", indent,
"The password to use when connecting to the server. Default is 'powerdb'.");
printf("%s%s%s%s\n", indent, "-c", indent,
"Configuration directory. Default is '/etc/power/'.");
#else
printf("%s%s%s%s\n", indent, "-P", indent,
"The password to use when connecting to the server. Default is 'taosdata'.");
printf("%s%s%s%s\n", indent, "-c", indent,
"Configuration directory. Default is '/etc/taos/'.");
#endif
printf("%s%s%s%s\n", indent, "-h", indent,
"The host to connect to TDengine. Default is localhost.");
printf("%s%s%s%s\n", indent, "-p", indent,
"The TCP/IP port number to use for the connection. Default is 0.");
printf("%s%s%s%s\n", indent, "-d", indent,
"Destination database. Default is 'test'.");
printf("%s%s%s%s\n", indent, "-a", indent,
"Set the replica parameters of the database, Default 1, min: 1, max: 3.");
printf("%s%s%s%s\n", indent, "-m", indent,
"Table prefix name. Default is 't'.");
printf("%s%s%s%s\n", indent, "-s", indent, "The select sql file.");
printf("%s%s%s%s\n", indent, "-N", indent, "Use normal table flag.");
printf("%s%s%s%s\n", indent, "-o", indent,
"Direct output to the named file. Default is './output.txt'.");
printf("%s%s%s%s\n", indent, "-q", indent,
"Query mode--0: SYNC, 1: ASYNC. Default is SYNC.");
printf("%s%s%s%s\n", indent, "-b", indent,
"The data_type of columns, default: TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,BINARY,NCHAR,BOOL,TIMESTAMP.");
printf("%s%s%s%s\n", indent, "-w", indent,
"The length of data_type 'BINARY' or 'NCHAR'. Default is 16");
printf("%s%s%s%s\n", indent, "-l", indent,
"The number of columns per record. Default is 10.");
printf("%s%s%s%s\n", indent, "-T", indent,
"The number of threads. Default is 10.");
printf("%s%s%s%s\n", indent, "-i", indent,
"The sleep time (ms) between insertion. Default is 0.");
printf("%s%s%s%s\n", indent, "-r", indent,
"The number of records per request. Default is 100.");
printf("%s%s%s%s\n", indent, "-t", indent,
"The number of tables. Default is 10000.");
printf("%s%s%s%s\n", indent, "-n", indent,
"The number of records per table. Default is 10000.");
printf("%s%s%s%s\n", indent, "-x", indent, "Not insert only flag.");
printf("%s%s%s%s\n", indent, "-y", indent, "Default input yes for prompt.");
printf("%s%s%s%s\n", indent, "-O", indent,
"Insert mode--0: In order, > 0: disorder ratio. Default is in order.");
printf("%s%s%s%s\n", indent, "-R", indent,
"Out of order data's range, ms, default is 1000.");
printf("%s%s%s%s\n", indent, "-g", indent,
"Print debug info.");
printf("%s%s%s%s\n", indent, "-V, --version", indent,
"Print version info.");
/* printf("%s%s%s%s\n", indent, "-D", indent,
"if elete database if exists. 0: no, 1: yes, default is 1");
......@@ -696,6 +696,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
taos_options(TSDB_OPTION_CONFIGDIR, full_path.we_wordv[0]);
wordfree(&full_path);
} else if (strcmp(argv[i], "-h") == 0) {
arguments->host = argv[++i];
} else if (strcmp(argv[i], "-p") == 0) {
......@@ -793,7 +794,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) {
arguments->disorderRatio = atoi(argv[++i]);
if (arguments->disorderRatio > 1
|| arguments->disorderRatio < 0) {
arguments->disorderRatio = 0;
} else if (arguments->disorderRatio == 1) {
......@@ -801,8 +802,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
} else if (strcmp(argv[i], "-R") == 0) {
arguments->disorderRange = atoi(argv[++i]);
if (arguments->disorderRange == 1
&& (arguments->disorderRange > 50
|| arguments->disorderRange <= 0)) {
arguments->disorderRange = 10;
}
......@@ -835,7 +836,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|| arguments->verbose_print) {
printf("###################################################################\n");
printf("# meta file: %s\n", arguments->metaFile);
printf("# Server IP: %s:%hu\n",
arguments->host == NULL ? "localhost" : arguments->host,
arguments->port );
printf("# User: %s\n", arguments->user);
......@@ -862,7 +863,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
if (arguments->disorderRatio) {
printf("# Data order: %d\n", arguments->disorderRatio);
printf("# Data out of order rate: %d\n", arguments->disorderRange);
}
printf("# Delete method: %d\n", arguments->method_of_delete);
printf("# Answer yes when prompt: %d\n", arguments->answer_yes);
......@@ -930,7 +931,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
return 0;
}
static void getResult(TAOS_RES *res, char* resultFileName) {
TAOS_ROW row = NULL;
int num_rows = 0;
int num_fields = taos_field_count(res);
......@@ -940,13 +941,15 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
if (resultFileName[0] != 0) {
fp = fopen(resultFileName, "at");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open result file: %s, result will not save to file\n",
__func__, __LINE__, resultFileName);
}
}
char* databuf = (char*) calloc(1, 100*1024*1024);
if (databuf == NULL) {
errorPrint("%s() LN%d, failed to malloc, warning: save result to file slowly!\n",
__func__, __LINE__);
if (fp)
fclose(fp);
return ;
......@@ -982,7 +985,7 @@ static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName)
taos_free_result(res);
return;
}
getResult(res, resultFileName);
taos_free_result(res);
}
......@@ -1030,14 +1033,13 @@ static int64_t rand_bigint(){
cursor++;
cursor = cursor % MAX_PREPARED_RAND;
return randbigint[cursor];
}
static float rand_float(){
static int cursor;
cursor++;
cursor = cursor % MAX_PREPARED_RAND;
return randfloat[cursor];
}
static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
......@@ -1110,9 +1112,9 @@ static int printfInsertMeta() {
printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database[%d] name: \033[33m%s\033[0m\n", i, g_Dbs.db[i].dbName);
if (0 == g_Dbs.db[i].drop) {
printf(" drop: \033[33mno\033[0m\n");
} else {
printf(" drop: \033[33myes\033[0m\n");
}
if (g_Dbs.db[i].dbCfg.blocks > 0) {
......@@ -1165,8 +1167,8 @@ static int printfInsertMeta() {
printf(" super table count: \033[33m%d\033[0m\n", g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
printf(" super table[\033[33m%d\033[0m]:\n", j);
printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sTblName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
......@@ -1175,7 +1177,7 @@ static int printfInsertMeta() {
} else {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "error");
}
if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
printf(" childTblExists: \033[33m%s\033[0m\n", "no");
} else if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
......@@ -1278,9 +1280,9 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "database[%d]:\n", i);
fprintf(fp, " database[%d] name: %s\n", i, g_Dbs.db[i].dbName);
if (0 == g_Dbs.db[i].drop) {
fprintf(fp, " drop: no\n");
}else {
fprintf(fp, " drop: yes\n");
}
if (g_Dbs.db[i].dbCfg.blocks > 0) {
......@@ -1331,8 +1333,8 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " super table count: %d\n", g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
fprintf(fp, " super table[%d]:\n", j);
fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
fprintf(fp, " autoCreateTable: %s\n", "no");
......@@ -1341,7 +1343,7 @@ static void printfInsertMetaToFile(FILE* fp) {
} else {
fprintf(fp, " autoCreateTable: %s\n", "error");
}
if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
fprintf(fp, " childTblExists: %s\n", "no");
} else if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
......@@ -1349,33 +1351,33 @@ static void printfInsertMetaToFile(FILE* fp) {
} else {
fprintf(fp, " childTblExists: %s\n", "error");
}
fprintf(fp, " childTblCount: %d\n", g_Dbs.db[i].superTbls[j].childTblCount);
fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
fprintf(fp, " interlace rows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows);
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
}
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
fprintf(fp, " multiThreadWriteOneTbl: no\n");
}else {
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
}
fprintf(fp, " interlaceRows: %d\n", g_Dbs.db[i].superTbls[j].interlaceRows);
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio);
fprintf(fp, " maxSqlLen: %d\n", g_Dbs.db[i].superTbls[j].maxSqlLen);
fprintf(fp, " timeStampStep: %d\n", g_Dbs.db[i].superTbls[j].timeStampStep);
fprintf(fp, " startTimestamp: %s\n", g_Dbs.db[i].superTbls[j].startTimestamp);
fprintf(fp, " sampleFormat: %s\n", g_Dbs.db[i].superTbls[j].sampleFormat);
fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile);
fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile);
fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
......@@ -1565,7 +1567,7 @@ static int xDumpResultToFile(const char* fname, TAOS_RES* tres) {
fprintf(fp, "%s", fields[col].name);
}
fputc('\n', fp);
int numOfRows = 0;
do {
int32_t* length = taos_fetch_lengths(tres);
......@@ -1586,14 +1588,14 @@ static int xDumpResultToFile(const char* fname, TAOS_RES* tres) {
return numOfRows;
}
static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
TAOS_RES * res;
TAOS_ROW row = NULL;
int count = 0;
res = taos_query(taos, "show databases;");
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint( "failed to run <show databases>, reason: %s\n", taos_errstr(res));
return -1;
......@@ -1617,13 +1619,13 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
*(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX],
TSDB_TIME_PRECISION_MILLI);
dbInfos[count]->ntables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]);
dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]);
dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]);
dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]);
dbInfos[count]->days = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]);
tstrncpy(dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX],
fields[TSDB_SHOW_DB_KEEP_INDEX].bytes);
dbInfos[count]->cache = *((int32_t *)row[TSDB_SHOW_DB_CACHE_INDEX]);
dbInfos[count]->blocks = *((int32_t *)row[TSDB_SHOW_DB_BLOCKS_INDEX]);
dbInfos[count]->minrows = *((int32_t *)row[TSDB_SHOW_DB_MINROWS_INDEX]);
......@@ -1633,16 +1635,16 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
tstrncpy(dbInfos[count]->precision,
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes);
dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]);
tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX],
fields[TSDB_SHOW_DB_STATUS_INDEX].bytes);
count++;
if (count > MAX_DATABASE_COUNT) {
errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT);
break;
}
}
......@@ -1664,11 +1666,11 @@ static void printfDbInfoForQueryToFile(char* filename, SDbInfo* dbInfos, int ind
fprintf(fp, "name: %s\n", dbInfos->name);
fprintf(fp, "created_time: %s\n", dbInfos->create_time);
fprintf(fp, "ntables: %d\n", dbInfos->ntables);
fprintf(fp, "vgroups: %d\n", dbInfos->vgroups);
fprintf(fp, "replica: %d\n", dbInfos->replica);
fprintf(fp, "quorum: %d\n", dbInfos->quorum);
fprintf(fp, "days: %d\n", dbInfos->days);
fprintf(fp, "keep0,keep1,keep(D): %s\n", dbInfos->keeplist);
fprintf(fp, "cache(MB): %d\n", dbInfos->cache);
fprintf(fp, "blocks: %d\n", dbInfos->blocks);
fprintf(fp, "minrows: %d\n", dbInfos->minrows);
......@@ -1676,10 +1678,10 @@ static void printfDbInfoForQueryToFile(char* filename, SDbInfo* dbInfos, int ind
fprintf(fp, "wallevel: %d\n", dbInfos->wallevel);
fprintf(fp, "fsync: %d\n", dbInfos->fsync);
fprintf(fp, "comp: %d\n", dbInfos->comp);
fprintf(fp, "cachelast: %d\n", dbInfos->cachelast);
fprintf(fp, "precision: %s\n", dbInfos->precision);
fprintf(fp, "update: %d\n", dbInfos->update);
fprintf(fp, "status: %s\n", dbInfos->status);
fprintf(fp, "\n");
fclose(fp);
......@@ -1729,7 +1731,7 @@ static void printfQuerySystemInfo(TAOS * taos) {
snprintf(buffer, MAX_QUERY_SQL_LENGTH, "show %s.vgroups;", dbInfos[i]->name);
res = taos_query(taos, buffer);
xDumpResultToFile(filename, res);
// show db.stables
snprintf(buffer, MAX_QUERY_SQL_LENGTH, "show %s.stables;", dbInfos[i]->name);
res = taos_query(taos, buffer);
......@@ -1739,7 +1741,6 @@ static void printfQuerySystemInfo(TAOS * taos) {
}
free(dbInfos);
}
static int postProceSql(char* host, uint16_t port, char* sqlstr)
......@@ -1991,17 +1992,17 @@ static char* generateTagVaulesForStb(SSuperTable* stbInfo) {
}
dataLen -= 2;
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")");
return dataBuf;
}
static int calcRowLen(SSuperTable* superTbls) {
int colIndex;
int lenOfOneRow = 0;
for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) {
char* dataType = superTbls->columns[colIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
......@@ -2018,9 +2019,9 @@ static int calcRowLen(SSuperTable* superTbls) {
lenOfOneRow += 6;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
lenOfOneRow += 22;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
lenOfOneRow += 42;
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
lenOfOneRow += 21;
} else {
printf("get error data type : %s\n", dataType);
......@@ -2060,7 +2061,7 @@ static int calcRowLen(SSuperTable* superTbls) {
}
superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow;
return 0;
}
......@@ -2072,7 +2073,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
char command[BUFFER_SIZE] = "\0";
char limitBuf[100] = "\0";
TAOS_RES * res;
TAOS_ROW row = NULL;
char* childTblName = *childTblNameOfSuperTbl;
......@@ -2081,10 +2082,10 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
snprintf(limitBuf, 100, " limit %d offset %d", limit, offset);
}
//get all child table name use cmd: select tbname from superTblName;
snprintf(command, BUFFER_SIZE, "select tbname from %s.%s %s", dbName, sTblName, limitBuf);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
taos_free_result(res);
......@@ -2153,7 +2154,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
SSuperTable* superTbls) {
char command[BUFFER_SIZE] = "\0";
TAOS_RES * res;
TAOS_ROW row = NULL;
int count = 0;
......@@ -2474,7 +2475,7 @@ static int createDatabases() {
if ((ret != 0) || (g_Dbs.db[i].drop)) {
ret = createSuperTable(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
if (0 != ret) {
errorPrint("\ncreate super table %d failed!\n\n", j);
taos_close(taos);
......@@ -2618,7 +2619,7 @@ static int startMultiThreadCreateChildTable(
int b = 0;
b = ntables % threads;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
......@@ -2806,7 +2807,7 @@ static int readSampleFromCsvFileToMem(
ssize_t readLen = 0;
char * line = NULL;
int getRows = 0;
FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
errorPrint( "Failed to open sample file: %s, reason:%s\n",
......@@ -2828,7 +2829,7 @@ static int readSampleFromCsvFileToMem(
}
continue;
}
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0;
}
......@@ -2939,7 +2940,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__);
goto PARSE_OVER;
}
int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) {
debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT);
......@@ -3025,7 +3026,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON* user = cJSON_GetObjectItem(root, "user");
if (user && user->type == cJSON_String && user->valuestring != NULL) {
tstrncpy(g_Dbs.user, user->valuestring, MAX_DB_NAME_SIZE);
} else if (!user) {
tstrncpy(g_Dbs.user, "root", MAX_DB_NAME_SIZE);
}
......@@ -3052,8 +3053,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, threads not found\n");
goto PARSE_OVER;
}
cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl");
if (threads2 && threads2->type == cJSON_Number) {
g_Dbs.threadCountByCreateTbl = threads2->valueint;
......@@ -3063,7 +3064,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
errorPrint("%s() LN%d, failed to read json, threads2 not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval");
if (gInsertInterval && gInsertInterval->type == cJSON_Number) {
......@@ -3088,12 +3089,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON* interlaceRows = cJSON_GetObjectItem(root, "interlace_rows");
if (interlaceRows && interlaceRows->type == cJSON_Number) {
g_args.interlace_rows = interlaceRows->valueint;
// rows per table need be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) {
printf("NOTICE: interlace rows value %d > num_of_records_per_request %d\n\n",
g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_request %d\n\n",
g_args.num_of_RPR);
printf(" press Enter key to continue or Ctrl+C to stop.");
(void)getchar();
g_args.interlace_rows = g_args.num_of_RPR;
}
} else if (!interlaceRows) {
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
......@@ -3104,7 +3116,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__);
goto PARSE_OVER;
}
cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req");
if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
......@@ -3117,7 +3128,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
}
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
if (answerPrompt
&& answerPrompt->type == cJSON_String
&& answerPrompt->valuestring != NULL) {
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
......@@ -3132,7 +3143,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
goto PARSE_OVER;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (!dbs || dbs->type != cJSON_Array) {
......@@ -3159,7 +3170,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, dbinfo not found\n");
goto PARSE_OVER;
}
cJSON *dbName = cJSON_GetObjectItem(dbinfo, "name");
if (!dbName || dbName->type != cJSON_String || dbName->valuestring == NULL) {
printf("ERROR: failed to read json, db name not found\n");
......@@ -3173,7 +3184,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_Dbs.db[i].drop = true;
} else {
g_Dbs.db[i].drop = false;
}
} else if (!drop) {
g_Dbs.db[i].drop = g_args.drop_database;
} else {
......@@ -3224,7 +3235,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, keep not found\n");
goto PARSE_OVER;
}
cJSON* days = cJSON_GetObjectItem(dbinfo, "days");
if (days && days->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.days = days->valueint;
......@@ -3234,7 +3245,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, days not found\n");
goto PARSE_OVER;
}
cJSON* cache = cJSON_GetObjectItem(dbinfo, "cache");
if (cache && cache->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.cache = cache->valueint;
......@@ -3244,7 +3255,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, cache not found\n");
goto PARSE_OVER;
}
cJSON* blocks= cJSON_GetObjectItem(dbinfo, "blocks");
if (blocks && blocks->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.blocks = blocks->valueint;
......@@ -3333,15 +3344,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, fsync not found\n");
goto PARSE_OVER;
}
// super_talbes
cJSON *stables = cJSON_GetObjectItem(dbinfos, "super_tables");
if (!stables || stables->type != cJSON_Array) {
printf("ERROR: failed to read json, super_tables not found\n");
goto PARSE_OVER;
}
int stbSize = cJSON_GetArraySize(stables);
if (stbSize > MAX_SUPER_TABLE_COUNT) {
errorPrint(
......@@ -3354,15 +3365,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
for (int j = 0; j < stbSize; ++j) {
cJSON* stbInfo = cJSON_GetArrayItem(stables, j);
if (stbInfo == NULL) continue;
// dbinfo
cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name");
if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) {
printf("ERROR: failed to read json, stb name not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE);
cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
printf("ERROR: failed to read json, childtable_prefix not found\n");
......@@ -3387,7 +3398,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, auto_create_table not found\n");
goto PARSE_OVER;
}
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
......@@ -3396,7 +3407,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
goto PARSE_OVER;
}
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists
......@@ -3412,13 +3423,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!childTblExists) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else {
errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
errorPrint("%s() LN%d, failed to read json, childtable_count not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
......@@ -3502,10 +3515,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, sample_buf_size not found\n");
goto PARSE_OVER;
}
cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format");
if (sampleFormat && sampleFormat->type
== cJSON_String && sampleFormat->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
sampleFormat->valuestring, MAX_DB_NAME_SIZE);
} else if (!sampleFormat) {
......@@ -3513,7 +3527,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, sample_format not found\n");
goto PARSE_OVER;
}
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) {
......@@ -3524,7 +3538,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, sample_file not found\n");
goto PARSE_OVER;
}
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) {
......@@ -3550,14 +3564,14 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
len = TSDB_MAX_ALLOWED_SQL_LEN;
} else if (len < TSDB_MAX_SQL_LEN) {
len = TSDB_MAX_SQL_LEN;
}
g_Dbs.db[i].superTbls[j].maxSqlLen = len;
} else if (!maxSqlLen) {
g_Dbs.db[i].superTbls[j].maxSqlLen = TSDB_MAX_SQL_LEN;
} else {
printf("ERROR: failed to read json, maxSqlLen not found\n");
goto PARSE_OVER;
}
cJSON *multiThreadWriteOneTbl =
cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
......@@ -3568,7 +3582,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1;
} else {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
}
} else if (!multiThreadWriteOneTbl) {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
} else {
......@@ -3579,6 +3593,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (interlaceRows && interlaceRows->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
// rows per table need be less than insert batch
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %d > num_of_records_per_request %d\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_request %d\n\n",
g_args.num_of_RPR);
printf(" press Enter key to continue or Ctrl+C to stop.");
(void)getchar();
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
}
} else if (!interlaceRows) {
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
......@@ -3586,7 +3610,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
"%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio");
if (disorderRatio && disorderRatio->type == cJSON_Number) {
......@@ -3596,7 +3620,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, disorderRatio not found\n");
goto PARSE_OVER;
}
cJSON* disorderRange = cJSON_GetObjectItem(stbInfo, "disorder_range");
if (disorderRange && disorderRange->type == cJSON_Number) {
......@@ -3636,8 +3660,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
stbInfo, &g_Dbs.db[i].superTbls[j]);
if (false == retVal) {
goto PARSE_OVER;
}
}
}
ret = true;
......@@ -3703,7 +3727,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
goto PARSE_OVER;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) {
......@@ -3722,7 +3746,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
printf("ERROR: failed to read json, query_mode not found\n");
goto PARSE_OVER;
}
// super_table_query
cJSON *superQuery = cJSON_GetObjectItem(root, "specified_table_query");
if (!superQuery) {
......@@ -3731,26 +3755,26 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (superQuery->type != cJSON_Object) {
printf("ERROR: failed to read json, super_table_query not found\n");
goto PARSE_OVER;
} else {
cJSON* rate = cJSON_GetObjectItem(superQuery, "query_interval");
if (rate && rate->type == cJSON_Number) {
g_queryInfo.superQueryInfo.rate = rate->valueint;
} else if (!rate) {
g_queryInfo.superQueryInfo.rate = 0;
}
cJSON* concurrent = cJSON_GetObjectItem(superQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) {
g_queryInfo.superQueryInfo.concurrent = concurrent->valueint;
} else if (!concurrent) {
g_queryInfo.superQueryInfo.concurrent = 1;
}
cJSON* mode = cJSON_GetObjectItem(superQuery, "mode");
if (mode && mode->type == cJSON_String && mode->valuestring != NULL) {
if (0 == strcmp("sync", mode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 0;
} else if (0 == strcmp("async", mode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 1;
} else {
printf("ERROR: failed to read json, subscribe mod error\n");
......@@ -3759,21 +3783,21 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else {
g_queryInfo.superQueryInfo.subscribeMode = 0;
}
cJSON* interval = cJSON_GetObjectItem(superQuery, "interval");
if (interval && interval->type == cJSON_Number) {
g_queryInfo.superQueryInfo.subscribeInterval = interval->valueint;
} else if (!interval) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.subscribeInterval = 10000;
}
cJSON* restart = cJSON_GetObjectItem(superQuery, "restart");
if (restart && restart->type == cJSON_String && restart->valuestring != NULL) {
if (0 == strcmp("yes", restart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 1;
} else if (0 == strcmp("no", restart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 0;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
......@@ -3782,14 +3806,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else {
g_queryInfo.superQueryInfo.subscribeRestart = 1;
}
cJSON* keepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (keepProgress
&& keepProgress->type == cJSON_String
&& keepProgress->valuestring != NULL) {
if (0 == strcmp("yes", keepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", keepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe keepProgress error\n");
......@@ -3799,14 +3823,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
}
// sqls
cJSON* superSqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n");
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
......@@ -3817,7 +3841,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("ERROR: failed to read json, sql not found\n");
......@@ -3833,7 +3857,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER;
}
}
}
}
......@@ -3879,9 +3903,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* submode = cJSON_GetObjectItem(subQuery, "mode");
if (submode && submode->type == cJSON_String && submode->valuestring != NULL) {
if (0 == strcmp("sync", submode->valuestring)) {
g_queryInfo.subQueryInfo.subscribeMode = 0;
} else if (0 == strcmp("async", submode->valuestring)) {
g_queryInfo.subQueryInfo.subscribeMode = 1;
} else {
printf("ERROR: failed to read json, subscribe mod error\n");
......@@ -3899,12 +3923,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
//goto PARSE_OVER;
g_queryInfo.subQueryInfo.subscribeInterval = 10000;
}
cJSON* subrestart = cJSON_GetObjectItem(subQuery, "restart");
if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.subQueryInfo.subscribeRestart = 1;
} else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.subQueryInfo.subscribeRestart = 0;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
......@@ -3913,14 +3937,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else {
g_queryInfo.subQueryInfo.subscribeRestart = 1;
}
cJSON* subkeepProgress = cJSON_GetObjectItem(subQuery, "keepProgress");
if (subkeepProgress &&
subkeepProgress->type == cJSON_String
&& subkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", subkeepProgress->valuestring)) {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", subkeepProgress->valuestring)) {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe keepProgress error\n");
......@@ -3928,27 +3952,27 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
}
} else {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 0;
}
// sqls
cJSON* subsqls = cJSON_GetObjectItem(subQuery, "sqls");
if (!subsqls) {
g_queryInfo.subQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n");
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(subsqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.subQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(subsqls, j);
if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("ERROR: failed to read json, sql not found\n");
......@@ -3964,7 +3988,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else {
printf("ERROR: failed to read json, sub query result file not found\n");
goto PARSE_OVER;
}
}
}
}
......@@ -4033,7 +4057,7 @@ static bool getInfoFromJsonFile(char* file) {
} else {
errorPrint("%s() LN%d, input json file type error! please input correct file type: insert or query or subscribe\n", __func__, __LINE__);
goto PARSE_OVER;
}
PARSE_OVER:
free(content);
......@@ -4043,7 +4067,7 @@ PARSE_OVER:
}
static void prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) {
(void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]);
......@@ -4054,7 +4078,7 @@ static void prepareSampleData() {
static void postFreeResource() {
tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
......@@ -4105,7 +4129,7 @@ static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* stbInfo) {
int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
......@@ -4316,28 +4340,28 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(
buffer + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * k,
superTblInfo,
pSamplePos);
} else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime
+ superTblInfo->timeStampStep * k
- taosRandom() % superTblInfo->disorderRange;
retLen = generateRowData(
buffer + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
} else {
retLen = generateRowData(
buffer + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * k,
superTblInfo);
}
......@@ -4361,7 +4385,7 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRange)) {
int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k
- taosRandom() % 1000000 + rand_num;
len = generateData(data, data_type,
......@@ -4489,6 +4513,18 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
int insertMode;
if (interlaceRows > 0) {
insertMode = INTERLACE_INSERT_MODE;
} else {
insertMode = PROGRESSIVE_INSERT_MODE;
}
// TODO: prompt tbl count multple interlace rows and batch
//
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
......@@ -4497,21 +4533,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
strerror(errno));
return NULL;
}
int insertMode;
char tableName[TSDB_TABLE_NAME_LEN];
int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows;
if (interlaceRows > 0) {
insertMode = INTERLACE_INSERT_MODE;
} else {
insertMode = PROGRESSIVE_INSERT_MODE;
}
// rows per table need be less than insert batch
if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR;
char tableName[TSDB_TABLE_NAME_LEN];
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
......@@ -4715,7 +4739,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
strerror(errno));
return NULL;
}
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
......@@ -4856,7 +4880,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms
}
}
char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen);
char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer;
......@@ -4874,7 +4898,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
taos_free_result(res);
return;
}
for (int i = 0; i < g_args.num_of_RPR; i++) {
int rand_num = taosRandom() % 100;
if (0 != winfo->superTblInfo->disorderRatio
......@@ -4911,7 +4935,7 @@ static void *asyncWrite(void *sarg) {
winfo->st = 0;
winfo->et = 0;
winfo->lastTs = winfo->start_time;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
......@@ -5000,7 +5024,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
double start = getCurrentTime();
int startFrom;
if ((superTblInfo) && (superTblInfo->childTblOffset >= 0))
......@@ -5192,7 +5216,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
static void *readTable(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
TAOS *taos = rinfo->taos;
char command[BUFFER_SIZE] = "\0";
......@@ -5359,7 +5383,7 @@ static int insertTestProcess() {
printf("Press enter key to continue\n\n");
(void)getchar();
}
init_rand_data();
// create database and super tables
......@@ -5448,7 +5472,7 @@ static void *superQueryProcess(void *sarg) {
//char sqlStr[MAX_TB_NAME_SIZE*2];
//sprintf(sqlStr, "use %s", g_queryInfo.dbName);
//queryDB(winfo->taos, sqlStr);
int64_t st = 0;
int64_t et = 0;
......@@ -5603,7 +5627,7 @@ static int queryTestProcess() {
printf("Press enter key to continue\n\n");
(void)getchar();
}
printfQuerySystemInfo(taos);
pthread_t *pids = NULL;
......@@ -5676,20 +5700,20 @@ static int queryTestProcess() {
int ntables = g_queryInfo.subQueryInfo.childTblCount;
int threads = g_queryInfo.subQueryInfo.threadCnt;
int a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int b = 0;
if (threads != 0) {
b = ntables % threads;
}
int startFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
t_info->threadID = i;
......@@ -5705,21 +5729,21 @@ static int queryTestProcess() {
} else {
g_queryInfo.subQueryInfo.threadCnt = 0;
}
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL);
}
tmfree((char*)pids);
tmfree((char*)infos);
for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
}
tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub);
// taos_close(taos);// TODO: workaround to use separate taos connection;
return 0;
}
......@@ -5784,7 +5808,7 @@ static void *subSubscribeProcess(void *sarg) {
taos_close(winfo->taos);
return NULL;
}
//int64_t st = 0;
//int64_t et = 0;
do {
......@@ -5871,7 +5895,7 @@ static void *superSubscribeProcess(void *sarg) {
taos_close(winfo->taos);
return NULL;
}
//int64_t st = 0;
//int64_t et = 0;
do {
......@@ -5988,7 +6012,7 @@ static int subscribeTestProcess() {
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, superSubscribeProcess, t_info);
}
//==== create sub threads for query from sub table
pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
......@@ -6007,18 +6031,18 @@ static int subscribeTestProcess() {
int ntables = g_queryInfo.subQueryInfo.childTblCount;
int threads = g_queryInfo.subQueryInfo.threadCnt;
int a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int b = 0;
if (threads != 0) {
b = ntables % threads;
}
int startFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
......@@ -6033,7 +6057,7 @@ static int subscribeTestProcess() {
}
g_queryInfo.subQueryInfo.threadCnt = threads;
}
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL);
}
......@@ -6128,7 +6152,7 @@ static void setParaFromArg(){
g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.queryMode = g_args.mode;
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
......@@ -6149,7 +6173,7 @@ static void setParaFromArg(){
if (data_type[i] == NULL) {
break;
}
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
data_type[i], MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary;
......@@ -6168,7 +6192,7 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, "BINARY", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
g_Dbs.db[0].superTbls[0].tagCount = 2;
......