Unverified commit c06b23c5, authored by sangshuduo, committed by GitHub

cherry pick from develop branch. (#6903)

Parent 98c026fd
@@ -54,6 +54,7 @@
 #include "tutil.h"

 #define STMT_IFACE_ENABLED  0
+#define NANO_SECOND_ENABLED 0

 #define REQ_EXTRA_BUF_LEN   1024
 #define RESP_BUF_LEN        4096
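Like STMT_IFACE_ENABLED directly above it, the new NANO_SECOND_ENABLED macro is a compile-time switch: code guarded by it is removed from the build entirely when the flag is 0. A minimal sketch of the pattern; the function below is illustrative and not part of taosdemo:

    #define NANO_SECOND_ENABLED  0   /* flip to 1 to compile the "ns" paths in */

    /* Illustrative only: the guarded branch vanishes from the object code
     * when the flag is 0, so the disabled precision costs nothing at runtime. */
    static const char *supportedPrecisions(void) {
    #if NANO_SECOND_ENABLED == 1
        return "ms, us, ns";
    #else
        return "ms, us";
    #endif
    }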
@@ -66,13 +67,6 @@ extern char configDir[];
 #define STR_INSERT_INTO     "INSERT INTO "

-enum TEST_MODE {
-    INSERT_TEST,            // 0
-    QUERY_TEST,             // 1
-    SUBSCRIBE_TEST,         // 2
-    INVAID_TEST
-};
-
 #define MAX_RECORDS_PER_REQ 32766

 #define HEAD_BUFF_LEN TSDB_MAX_COLUMNS*24 // 16*MAX_COLUMNS + (192+32)*2 + insert into ..
@@ -105,6 +99,13 @@ enum TEST_MODE {
 #define DEFAULT_TIMESTAMP_STEP  1

+enum TEST_MODE {
+    INSERT_TEST,            // 0
+    QUERY_TEST,             // 1
+    SUBSCRIBE_TEST,         // 2
+    INVAID_TEST
+};
+
 typedef enum CREATE_SUB_TALBE_MOD_EN {
     PRE_CREATE_SUBTBL,
     AUTO_CREATE_SUBTBL,
@@ -112,15 +113,15 @@ typedef enum CREATE_SUB_TALBE_MOD_EN {
} CREATE_SUB_TALBE_MOD_EN;

typedef enum TALBE_EXISTS_EN {
    TBL_NO_EXISTS,
    TBL_ALREADY_EXISTS,
    TBL_EXISTS_BUTT
} TALBE_EXISTS_EN;

enum enumSYNC_MODE {
    SYNC_MODE,
    ASYNC_MODE,
    MODE_BUT
};

enum enum_TAOS_INTERFACE {
@@ -143,52 +144,52 @@ typedef enum enum_PROGRESSIVE_OR_INTERLACE {
} PROG_OR_INTERLACE_MODE;

typedef enum enumQUERY_TYPE {
    NO_INSERT_TYPE,
    INSERT_TYPE,
    QUERY_TYPE_BUT
} QUERY_TYPE;
enum _show_db_index {
    TSDB_SHOW_DB_NAME_INDEX,
    TSDB_SHOW_DB_CREATED_TIME_INDEX,
    TSDB_SHOW_DB_NTABLES_INDEX,
    TSDB_SHOW_DB_VGROUPS_INDEX,
    TSDB_SHOW_DB_REPLICA_INDEX,
    TSDB_SHOW_DB_QUORUM_INDEX,
    TSDB_SHOW_DB_DAYS_INDEX,
    TSDB_SHOW_DB_KEEP_INDEX,
    TSDB_SHOW_DB_CACHE_INDEX,
    TSDB_SHOW_DB_BLOCKS_INDEX,
    TSDB_SHOW_DB_MINROWS_INDEX,
    TSDB_SHOW_DB_MAXROWS_INDEX,
    TSDB_SHOW_DB_WALLEVEL_INDEX,
    TSDB_SHOW_DB_FSYNC_INDEX,
    TSDB_SHOW_DB_COMP_INDEX,
    TSDB_SHOW_DB_CACHELAST_INDEX,
    TSDB_SHOW_DB_PRECISION_INDEX,
    TSDB_SHOW_DB_UPDATE_INDEX,
    TSDB_SHOW_DB_STATUS_INDEX,
    TSDB_MAX_SHOW_DB
};

// -----------------------------------------SHOW TABLES CONFIGURE -------------------------------------
enum _show_stables_index {
    TSDB_SHOW_STABLES_NAME_INDEX,
    TSDB_SHOW_STABLES_CREATED_TIME_INDEX,
    TSDB_SHOW_STABLES_COLUMNS_INDEX,
    TSDB_SHOW_STABLES_METRIC_INDEX,
    TSDB_SHOW_STABLES_UID_INDEX,
    TSDB_SHOW_STABLES_TID_INDEX,
    TSDB_SHOW_STABLES_VGID_INDEX,
    TSDB_MAX_SHOW_STABLES
};

enum _describe_table_index {
    TSDB_DESCRIBE_METRIC_FIELD_INDEX,
    TSDB_DESCRIBE_METRIC_TYPE_INDEX,
    TSDB_DESCRIBE_METRIC_LENGTH_INDEX,
    TSDB_DESCRIBE_METRIC_NOTE_INDEX,
    TSDB_MAX_DESCRIBE_METRIC
};
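These _show_*_index enums name the column positions in the result sets of `show databases`, `show stables` and `describe`. A sketch of how such an index is typically used with the TDengine client API; it assumes an already-connected TAOS handle and omits error handling:

    #include <stdio.h>
    #include <taos.h>

    /* Sketch (assumes a connected TAOS *taos): list database names by
     * indexing each fetched row with the enum above. Column values are
     * not NUL-terminated, so print with the reported byte length. */
    static void listDatabases(TAOS *taos) {
        TAOS_RES *res = taos_query(taos, "show databases;");
        TAOS_ROW  row;
        while ((row = taos_fetch_row(res)) != NULL) {
            int *lengths = taos_fetch_lengths(res);  /* per-column byte lengths */
            printf("database: %.*s\n",
                    lengths[TSDB_SHOW_DB_NAME_INDEX],
                    (char *)row[TSDB_SHOW_DB_NAME_INDEX]);
        }
        taos_free_result(res);
    }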
/* Used by main to communicate with parse_opt. */
@@ -228,7 +229,7 @@ typedef struct SArguments_S {
     int64_t  num_of_DPT;
     int      abort;
     uint32_t disorderRatio;               // 0: no disorder, >0: x%
-    int      disorderRange;               // ms or us by database precision
+    int      disorderRange;               // ms, us or ns, according to database precision
     uint32_t method_of_delete;
     char **  arg_list;
     uint64_t totalInsertRows;
@@ -237,191 +238,191 @@ typedef struct SArguments_S {
} SArguments;
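disorderRange is stored as a bare count because its unit follows the database precision (ms, us or ns). The sketch below illustrates how a disorder ratio and range can perturb generated timestamps; it is an illustration of the idea, not the taosdemo implementation:

    #include <stdint.h>
    #include <stdlib.h>

    /* Sketch: with probability disorderRatio percent, push a generated
     * timestamp back by up to disorderRange units, producing out-of-order
     * rows. The unit of disorderRange follows the database precision. */
    static int64_t nextTimestamp(int64_t start, int64_t k, int64_t step,
            uint32_t disorderRatio, int disorderRange) {
        int64_t ts = start + k * step;
        if (disorderRange > 0 && (uint32_t)(rand() % 100) < disorderRatio) {
            ts -= rand() % disorderRange;   /* disordered sample */
        }
        return ts;
    }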
typedef struct SColumn_S {
    char      field[TSDB_COL_NAME_LEN];
    char      dataType[16];
    uint32_t  dataLen;
    char      note[128];
} StrColumn;

typedef struct SSuperTable_S {
    char      sTblName[TSDB_TABLE_NAME_LEN];
    char      dataSource[MAX_TB_NAME_SIZE];  // rand_gen or sample
    char      childTblPrefix[TSDB_TABLE_NAME_LEN - 20]; // 20 characters reserved for seq
    char      insertMode[MAX_TB_NAME_SIZE];  // taosc, rest
    uint16_t  childTblExists;
    int64_t   childTblCount;
    uint64_t  batchCreateTableNum;  // 0: no batch,  > 0: batch table number in one sql
    uint8_t   autoCreateTable;      // 0: create sub table, 1: auto create sub table
    uint16_t  iface;                // 0: taosc, 1: rest, 2: stmt
    int64_t   childTblLimit;
    uint64_t  childTblOffset;

    //  int       multiThreadWriteOneTbl;  // 0: no, 1: yes
    uint32_t  interlaceRows;        //
    int       disorderRatio;        // 0: no disorder, >0: x%
-   int       disorderRange;        // ms or us by database precision
+   int       disorderRange;        // ms, us or ns. according to database precision
    uint64_t  maxSqlLen;            //

    uint64_t  insertInterval;       // insert interval, will override global insert interval
    int64_t   insertRows;
    int64_t   timeStampStep;
    char      startTimestamp[MAX_TB_NAME_SIZE];
    char      sampleFormat[MAX_TB_NAME_SIZE];  // csv, json
    char      sampleFile[MAX_FILE_NAME_LEN];
    char      tagsFile[MAX_FILE_NAME_LEN];

    uint32_t  columnCount;
    StrColumn columns[TSDB_MAX_COLUMNS];
    uint32_t  tagCount;
    StrColumn tags[TSDB_MAX_TAGS];

    char*     childTblName;
    char*     colsOfCreateChildTable;
    uint64_t  lenOfOneRow;
    uint64_t  lenOfTagOfOneRow;

    char*     sampleDataBuf;
    //int       sampleRowCount;
    //int       sampleUsePos;

    uint32_t  tagSource;    // 0: rand, 1: tag sample
    char*     tagDataBuf;
    uint32_t  tagSampleCount;
    uint32_t  tagUsePos;

    // statistics
    uint64_t  totalInsertRows;
    uint64_t  totalAffectedRows;
} SSuperTable;

typedef struct {
    char     name[TSDB_DB_NAME_LEN];
    char     create_time[32];
    int64_t  ntables;
    int32_t  vgroups;
    int16_t  replica;
    int16_t  quorum;
    int16_t  days;
    char     keeplist[32];
    int32_t  cache; //MB
    int32_t  blocks;
    int32_t  minrows;
    int32_t  maxrows;
    int8_t   wallevel;
    int32_t  fsync;
    int8_t   comp;
    int8_t   cachelast;
    char     precision[8];   // time resolution
    int8_t   update;
    char     status[16];
} SDbInfo;

typedef struct SDbCfg_S {
    //  int       maxtablesPerVnode;
    uint32_t  minRows;        // 0 means default
    uint32_t  maxRows;        // 0 means default
    int       comp;
    int       walLevel;
    int       cacheLast;
    int       fsync;
    int       replica;
    int       update;
    int       keep;
    int       days;
    int       cache;
    int       blocks;
    int       quorum;
    char      precision[8];
} SDbCfg;

typedef struct SDataBase_S {
    char         dbName[TSDB_DB_NAME_LEN];
    bool         drop;  // 0: use exists, 1: if exists, drop then new create
    SDbCfg       dbCfg;
    uint64_t     superTblCount;
    SSuperTable  superTbls[MAX_SUPER_TABLE_COUNT];
} SDataBase;

typedef struct SDbs_S {
    char        cfgDir[MAX_FILE_NAME_LEN];
    char        host[MAX_HOSTNAME_SIZE];
    struct sockaddr_in serv_addr;

    uint16_t    port;
    char        user[MAX_USERNAME_SIZE];
    char        password[MAX_PASSWORD_SIZE];
    char        resultFile[MAX_FILE_NAME_LEN];
    bool        use_metric;
    bool        insert_only;
    bool        do_aggreFunc;
    bool        asyncMode;

    uint32_t    threadCount;
    uint32_t    threadCountByCreateTbl;
    uint32_t    dbCount;
    SDataBase   db[MAX_DB_COUNT];

    // statistics
    uint64_t    totalInsertRows;
    uint64_t    totalAffectedRows;
} SDbs;

typedef struct SpecifiedQueryInfo_S {
    uint64_t     queryInterval;      // 0: unlimit  > 0   loop/s
    uint32_t     concurrent;
    int          sqlCount;
    uint32_t     asyncMode;          // 0: sync, 1: async
    uint64_t     subscribeInterval;  // ms
    uint64_t     queryTimes;
    bool         subscribeRestart;
    int          subscribeKeepProgress;
    char         sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
    char         result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
    int          resubAfterConsume[MAX_QUERY_SQL_COUNT];
    int          endAfterConsume[MAX_QUERY_SQL_COUNT];
    TAOS_SUB*    tsub[MAX_QUERY_SQL_COUNT];
    char         topic[MAX_QUERY_SQL_COUNT][32];
    int          consumed[MAX_QUERY_SQL_COUNT];
    TAOS_RES*    res[MAX_QUERY_SQL_COUNT];
    uint64_t     totalQueried;
} SpecifiedQueryInfo;

typedef struct SuperQueryInfo_S {
    char         sTblName[TSDB_TABLE_NAME_LEN];
    uint64_t     queryInterval;      // 0: unlimit  > 0   loop/s
    uint32_t     threadCnt;
    uint32_t     asyncMode;          // 0: sync, 1: async
    uint64_t     subscribeInterval;  // ms
    bool         subscribeRestart;
    int          subscribeKeepProgress;
    uint64_t     queryTimes;
    int64_t      childTblCount;
    char         childTblPrefix[TSDB_TABLE_NAME_LEN - 20];  // 20 characters reserved for seq
    int          sqlCount;
    char         sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
    char         result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
    int          resubAfterConsume;
    int          endAfterConsume;
    TAOS_SUB*    tsub[MAX_QUERY_SQL_COUNT];

    char*        childTblName;
    uint64_t     totalQueried;
} SuperQueryInfo;

typedef struct SQueryMetaInfo_S {
    char         cfgDir[MAX_FILE_NAME_LEN];
    char         host[MAX_HOSTNAME_SIZE];
    uint16_t     port;
    struct sockaddr_in serv_addr;
    char         user[MAX_USERNAME_SIZE];
    char         password[MAX_PASSWORD_SIZE];
    char         dbName[TSDB_DB_NAME_LEN];
    char         queryMode[MAX_TB_NAME_SIZE];  // taosc, rest

    SpecifiedQueryInfo  specifiedQueryInfo;
    SuperQueryInfo      superQueryInfo;
    uint64_t     totalQueried;
} SQueryMetaInfo;

typedef struct SThreadInfo_S {
@@ -1514,7 +1515,10 @@ static int printfInsertMeta() {
        }

        if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
            if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
+#if NANO_SECOND_ENABLED == 1
+                    || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ns", 2))
+#endif
                    || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
                printf("  precision:             \033[33m%s\033[0m\n",
                        g_Dbs.db[i].dbCfg.precision);
            } else {
@@ -1704,6 +1708,9 @@ static void printfInsertMetaToFile(FILE* fp) {
        }

        if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
            if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
+#if NANO_SECOND_ENABLED == 1
+                    || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ns", 2))
+#endif
                    || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
                fprintf(fp, "  precision:             %s\n",
                        g_Dbs.db[i].dbCfg.precision);
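The same three-way precision test now appears in printfInsertMeta, printfInsertMetaToFile and createDatabasesAndStables. A hypothetical helper, not part of this commit, that would centralize it:

    #include <strings.h>            /* strncasecmp */

    #define NANO_SECOND_ENABLED 0   /* same flag as in the diff above */

    /* Hypothetical helper: non-zero if the precision string is accepted
     * by this build ("ns" only counts when NANO_SECOND_ENABLED is 1). */
    static int isSupportedPrecision(const char *prec) {
        return (0 == strncasecmp(prec, "ms", 2))
    #if NANO_SECOND_ENABLED == 1
            || (0 == strncasecmp(prec, "ns", 2))
    #endif
            || (0 == strncasecmp(prec, "us", 2));
    }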
@@ -1904,10 +1911,12 @@ static void printfQueryMeta() {
static char* formatTimestamp(char* buf, int64_t val, int precision) {
    time_t tt;
-    if (precision == TSDB_TIME_PRECISION_NANO) {
-        tt = (time_t)(val / 1000000000);
-    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
+    if (precision == TSDB_TIME_PRECISION_MICRO) {
        tt = (time_t)(val / 1000000);
+#if NANO_SECOND_ENABLED == 1
+    } else if (precision == TSDB_TIME_PRECISION_NANO) {
+        tt = (time_t)(val / 1000000000);
+#endif
    } else {
        tt = (time_t)(val / 1000);
    }
@@ -1927,10 +1936,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
    struct tm* ptm = localtime(&tt);
    size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm);

-    if (precision == TSDB_TIME_PRECISION_NANO) {
-        sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
-    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
+    if (precision == TSDB_TIME_PRECISION_MICRO) {
        sprintf(buf + pos, ".%06d", (int)(val % 1000000));
+#if NANO_SECOND_ENABLED == 1
+    } else if (precision == TSDB_TIME_PRECISION_NANO) {
+        sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
+#endif
    } else {
        sprintf(buf + pos, ".%03d", (int)(val % 1000));
    }
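Together the two hunks make formatTimestamp divide by the precision's ticks per second and print a fractional field of matching width (.%03d, .%06d or .%09d). A standalone sketch of the same arithmetic; the PREC_* constants are illustrative stand-ins for the TSDB_TIME_PRECISION_* values:

    #include <stdio.h>
    #include <time.h>

    enum { PREC_MILLI, PREC_MICRO, PREC_NANO };  /* illustrative codes */

    static void demo(long long val, int precision) {
        static const long long divisor[] = { 1000LL, 1000000LL, 1000000000LL };
        static const int       width[]   = { 3, 6, 9 };
        char buf[64];
        time_t tt = (time_t)(val / divisor[precision]);   /* whole seconds */
        struct tm *ptm = localtime(&tt);
        size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm);
        /* fractional field whose width matches the precision */
        sprintf(buf + pos, ".%0*d", width[precision],
                (int)(val % divisor[precision]));
        printf("%s\n", buf);
    }

    int main(void) {
        demo(1595225000123LL, PREC_MILLI);        /* ...  .123 */
        demo(1595225000123456LL, PREC_MICRO);     /* ...  .123456 */
        demo(1595225000123456789LL, PREC_NANO);   /* ...  .123456789 */
        return 0;
    }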
@@ -2876,134 +2887,138 @@ static int createSuperTable(
}

static int createDatabasesAndStables() {
    TAOS * taos = NULL;
    int    ret = 0;
    taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port);
    if (taos == NULL) {
        errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
        return -1;
    }
    char command[BUFFER_SIZE] = "\0";

    for (int i = 0; i < g_Dbs.dbCount; i++) {
        if (g_Dbs.db[i].drop) {
            sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
            if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
                taos_close(taos);
                return -1;
            }

            int dataLen = 0;
            dataLen += snprintf(command + dataLen,
                    BUFFER_SIZE - dataLen, "create database if not exists %s", g_Dbs.db[i].dbName);

            if (g_Dbs.db[i].dbCfg.blocks > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " blocks %d", g_Dbs.db[i].dbCfg.blocks);
            }
            if (g_Dbs.db[i].dbCfg.cache > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " cache %d", g_Dbs.db[i].dbCfg.cache);
            }
            if (g_Dbs.db[i].dbCfg.days > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " days %d", g_Dbs.db[i].dbCfg.days);
            }
            if (g_Dbs.db[i].dbCfg.keep > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " keep %d", g_Dbs.db[i].dbCfg.keep);
            }
            if (g_Dbs.db[i].dbCfg.quorum > 1) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " quorum %d", g_Dbs.db[i].dbCfg.quorum);
            }
            if (g_Dbs.db[i].dbCfg.replica > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " replica %d", g_Dbs.db[i].dbCfg.replica);
            }
            if (g_Dbs.db[i].dbCfg.update > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " update %d", g_Dbs.db[i].dbCfg.update);
            }
            //if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) {
            //  dataLen += snprintf(command + dataLen,
            //  BUFFER_SIZE - dataLen, "tables %d ", g_Dbs.db[i].dbCfg.maxtablesPerVnode);
            //}
            if (g_Dbs.db[i].dbCfg.minRows > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " minrows %d", g_Dbs.db[i].dbCfg.minRows);
            }
            if (g_Dbs.db[i].dbCfg.maxRows > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " maxrows %d", g_Dbs.db[i].dbCfg.maxRows);
            }
            if (g_Dbs.db[i].dbCfg.comp > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " comp %d", g_Dbs.db[i].dbCfg.comp);
            }
            if (g_Dbs.db[i].dbCfg.walLevel > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " wal %d", g_Dbs.db[i].dbCfg.walLevel);
            }
            if (g_Dbs.db[i].dbCfg.cacheLast > 0) {
                dataLen += snprintf(command + dataLen,
                        BUFFER_SIZE - dataLen, " cachelast %d", g_Dbs.db[i].dbCfg.cacheLast);
            }
            if (g_Dbs.db[i].dbCfg.fsync > 0) {
                dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
                        " fsync %d", g_Dbs.db[i].dbCfg.fsync);
            }
            if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms")))
#if NANO_SECOND_ENABLED == 1
                    || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
                            "ns", strlen("ns")))
#endif
                    || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
                            "us", strlen("us")))) {
                dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
                        " precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
            }

            if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
                taos_close(taos);
                errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
                return -1;
            }
            printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
        }

        debugPrint("%s() LN%d supertbl count:%"PRIu64"\n",
                __func__, __LINE__, g_Dbs.db[i].superTblCount);

        int validStbCount = 0;

        for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
            sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
                    g_Dbs.db[i].superTbls[j].sTblName);
            ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);

            if ((ret != 0) || (g_Dbs.db[i].drop)) {
                ret = createSuperTable(taos, g_Dbs.db[i].dbName,
                        &g_Dbs.db[i].superTbls[j]);

                if (0 != ret) {
                    errorPrint("create super table %"PRIu64" failed!\n\n", j);
                    continue;
                }
            }

            ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName,
                    &g_Dbs.db[i].superTbls[j]);
            if (0 != ret) {
                errorPrint("\nget super table %s.%s info failed!\n\n",
                        g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
                continue;
            }

            validStbCount ++;
        }

        g_Dbs.db[i].superTblCount = validStbCount;
    }

    taos_close(taos);
    return 0;
}
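createDatabasesAndStables builds the CREATE DATABASE statement with a chain of length-tracked snprintf calls, so each dbCfg field contributes a clause only when it is set. A self-contained sketch of the append pattern, with made-up values:

    #include <stdio.h>

    #define BUFFER_SIZE 256

    int main(void) {
        char command[BUFFER_SIZE] = "\0";
        int  dataLen = 0;
        int  blocks = 8, cache = 16;        /* made-up config values */

        dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
                "create database if not exists %s", "testdb");
        if (blocks > 0)                     /* optional clauses appended in place */
            dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
                    " blocks %d", blocks);
        if (cache > 0)
            dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
                    " cache %d", cache);
        dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
                " precision '%s';", "us");

        /* prints: create database if not exists testdb blocks 8 cache 16 precision 'us'; */
        printf("%s\n", command);
        return 0;
    }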
static void* createTable(void *sarg)
@@ -3531,733 +3546,733 @@ PARSE_OVER:
}

static bool getMetaFromInsertJsonFile(cJSON* root) {
    bool ret = false;

    cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir");
    if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) {
        tstrncpy(g_Dbs.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN);
    }

    cJSON* host = cJSON_GetObjectItem(root, "host");
    if (host && host->type == cJSON_String && host->valuestring != NULL) {
        tstrncpy(g_Dbs.host, host->valuestring, MAX_HOSTNAME_SIZE);
    } else if (!host) {
        tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
    } else {
        printf("ERROR: failed to read json, host not found\n");
        goto PARSE_OVER;
    }

    cJSON* port = cJSON_GetObjectItem(root, "port");
    if (port && port->type == cJSON_Number) {
        g_Dbs.port = port->valueint;
    } else if (!port) {
        g_Dbs.port = 6030;
    }

    cJSON* user = cJSON_GetObjectItem(root, "user");
    if (user && user->type == cJSON_String && user->valuestring != NULL) {
        tstrncpy(g_Dbs.user, user->valuestring, MAX_USERNAME_SIZE);
    } else if (!user) {
        tstrncpy(g_Dbs.user, "root", MAX_USERNAME_SIZE);
    }

    cJSON* password = cJSON_GetObjectItem(root, "password");
    if (password && password->type == cJSON_String && password->valuestring != NULL) {
        tstrncpy(g_Dbs.password, password->valuestring, MAX_PASSWORD_SIZE);
    } else if (!password) {
        tstrncpy(g_Dbs.password, "taosdata", MAX_PASSWORD_SIZE);
    }

    cJSON* resultfile = cJSON_GetObjectItem(root, "result_file");
    if (resultfile && resultfile->type == cJSON_String && resultfile->valuestring != NULL) {
        tstrncpy(g_Dbs.resultFile, resultfile->valuestring, MAX_FILE_NAME_LEN);
    } else if (!resultfile) {
        tstrncpy(g_Dbs.resultFile, "./insert_res.txt", MAX_FILE_NAME_LEN);
    }

    cJSON* threads = cJSON_GetObjectItem(root, "thread_count");
    if (threads && threads->type == cJSON_Number) {
        g_Dbs.threadCount = threads->valueint;
    } else if (!threads) {
        g_Dbs.threadCount = 1;
    } else {
        printf("ERROR: failed to read json, threads not found\n");
        goto PARSE_OVER;
    }

    cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl");
    if (threads2 && threads2->type == cJSON_Number) {
        g_Dbs.threadCountByCreateTbl = threads2->valueint;
    } else if (!threads2) {
        g_Dbs.threadCountByCreateTbl = 1;
    } else {
        errorPrint("%s() LN%d, failed to read json, threads2 not found\n",
                __func__, __LINE__);
        goto PARSE_OVER;
    }

    cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval");
    if (gInsertInterval && gInsertInterval->type == cJSON_Number) {
        if (gInsertInterval->valueint < 0) {
            errorPrint("%s() LN%d, failed to read json, insert interval input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }
        g_args.insert_interval = gInsertInterval->valueint;
    } else if (!gInsertInterval) {
        g_args.insert_interval = 0;
    } else {
        errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
                __func__, __LINE__);
        goto PARSE_OVER;
    }

    cJSON* interlaceRows = cJSON_GetObjectItem(root, "interlace_rows");
    if (interlaceRows && interlaceRows->type == cJSON_Number) {
        if (interlaceRows->valueint < 0) {
            errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }
        g_args.interlace_rows = interlaceRows->valueint;
    } else if (!interlaceRows) {
        g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
    } else {
        errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
                __func__, __LINE__);
        goto PARSE_OVER;
    }

    cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len");
    if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
        if (maxSqlLen->valueint < 0) {
            errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }
        g_args.max_sql_len = maxSqlLen->valueint;
    } else if (!maxSqlLen) {
        g_args.max_sql_len = (1024*1024);
    } else {
        errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
                __func__, __LINE__);
        goto PARSE_OVER;
    }

    cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req");
    if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
        if (numRecPerReq->valueint <= 0) {
            errorPrint("%s() LN%d, failed to read json, num_of_records_per_req input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        } else if (numRecPerReq->valueint > MAX_RECORDS_PER_REQ) {
            printf("NOTICE: number of records per request value %"PRIu64" > %d\n\n",
                    numRecPerReq->valueint, MAX_RECORDS_PER_REQ);
            printf("        number of records per request value will be set to %d\n\n",
                    MAX_RECORDS_PER_REQ);
            prompt();
            numRecPerReq->valueint = MAX_RECORDS_PER_REQ;
        }
        g_args.num_of_RPR = numRecPerReq->valueint;
    } else if (!numRecPerReq) {
        g_args.num_of_RPR = MAX_RECORDS_PER_REQ;
    } else {
        errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
                __func__, __LINE__);
        goto PARSE_OVER;
    }

    cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
    if (answerPrompt
            && answerPrompt->type == cJSON_String
            && answerPrompt->valuestring != NULL) {
        if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
            g_args.answer_yes = false;
        } else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
            g_args.answer_yes = true;
        } else {
            g_args.answer_yes = false;
        }
    } else if (!answerPrompt) {
        g_args.answer_yes = true;   // default is no, mean answer_yes.
    } else {
        errorPrint("%s", "failed to read json, confirm_parameter_prompt input mistake\n");
        goto PARSE_OVER;
    }

    // rows per table need be less than insert batch
    if (g_args.interlace_rows > g_args.num_of_RPR) {
        printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n",
                g_args.interlace_rows, g_args.num_of_RPR);
        printf("        interlace rows value will be set to num_of_records_per_req %u\n\n",
                g_args.num_of_RPR);
        prompt();
        g_args.interlace_rows = g_args.num_of_RPR;
    }
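    /*
     * The scalar options above and below all follow the same three-branch
     * pattern: if present and well-typed, take the value; if absent, apply
     * a default; if present but mistyped, report and bail out. Sketched
     * once (name, value and defaultValue are placeholders, not taosdemo
     * symbols):
     *
     *     cJSON *item = cJSON_GetObjectItem(root, name);
     *     if (item && item->type == cJSON_Number) {
     *         value = item->valueint;         // present and well-typed
     *     } else if (!item) {
     *         value = defaultValue;           // absent: use the default
     *     } else {
     *         printf("ERROR: failed to read json, %s input mistake\n", name);
     *         goto PARSE_OVER;                // present but wrong type
     *     }
     */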
    cJSON* dbs = cJSON_GetObjectItem(root, "databases");
    if (!dbs || dbs->type != cJSON_Array) {
        printf("ERROR: failed to read json, databases not found\n");
        goto PARSE_OVER;
    }

    int dbSize = cJSON_GetArraySize(dbs);
    if (dbSize > MAX_DB_COUNT) {
        errorPrint(
                "ERROR: failed to read json, databases size overflow, max database is %d\n",
                MAX_DB_COUNT);
        goto PARSE_OVER;
    }

    g_Dbs.dbCount = dbSize;
    for (int i = 0; i < dbSize; ++i) {
        cJSON* dbinfos = cJSON_GetArrayItem(dbs, i);
        if (dbinfos == NULL) continue;

        // dbinfo
        cJSON *dbinfo = cJSON_GetObjectItem(dbinfos, "dbinfo");
        if (!dbinfo || dbinfo->type != cJSON_Object) {
            printf("ERROR: failed to read json, dbinfo not found\n");
            goto PARSE_OVER;
        }

        cJSON *dbName = cJSON_GetObjectItem(dbinfo, "name");
        if (!dbName || dbName->type != cJSON_String || dbName->valuestring == NULL) {
            printf("ERROR: failed to read json, db name not found\n");
            goto PARSE_OVER;
        }
        tstrncpy(g_Dbs.db[i].dbName, dbName->valuestring, TSDB_DB_NAME_LEN);

        cJSON *drop = cJSON_GetObjectItem(dbinfo, "drop");
        if (drop && drop->type == cJSON_String && drop->valuestring != NULL) {
            if (0 == strncasecmp(drop->valuestring, "yes", strlen("yes"))) {
                g_Dbs.db[i].drop = true;
            } else {
                g_Dbs.db[i].drop = false;
            }
        } else if (!drop) {
            g_Dbs.db[i].drop = g_args.drop_database;
        } else {
            errorPrint("%s() LN%d, failed to read json, drop input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }

        cJSON *precision = cJSON_GetObjectItem(dbinfo, "precision");
        if (precision && precision->type == cJSON_String
                && precision->valuestring != NULL) {
            tstrncpy(g_Dbs.db[i].dbCfg.precision, precision->valuestring,
                    8);
        } else if (!precision) {
            memset(g_Dbs.db[i].dbCfg.precision, 0, 8);
        } else {
            printf("ERROR: failed to read json, precision not found\n");
            goto PARSE_OVER;
        }

        cJSON* update = cJSON_GetObjectItem(dbinfo, "update");
        if (update && update->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.update = update->valueint;
        } else if (!update) {
            g_Dbs.db[i].dbCfg.update = -1;
        } else {
            printf("ERROR: failed to read json, update not found\n");
            goto PARSE_OVER;
        }

        cJSON* replica = cJSON_GetObjectItem(dbinfo, "replica");
        if (replica && replica->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.replica = replica->valueint;
        } else if (!replica) {
            g_Dbs.db[i].dbCfg.replica = -1;
        } else {
            printf("ERROR: failed to read json, replica not found\n");
            goto PARSE_OVER;
        }

        cJSON* keep = cJSON_GetObjectItem(dbinfo, "keep");
        if (keep && keep->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.keep = keep->valueint;
        } else if (!keep) {
            g_Dbs.db[i].dbCfg.keep = -1;
        } else {
            printf("ERROR: failed to read json, keep not found\n");
            goto PARSE_OVER;
        }

        cJSON* days = cJSON_GetObjectItem(dbinfo, "days");
        if (days && days->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.days = days->valueint;
        } else if (!days) {
            g_Dbs.db[i].dbCfg.days = -1;
        } else {
            printf("ERROR: failed to read json, days not found\n");
            goto PARSE_OVER;
        }

        cJSON* cache = cJSON_GetObjectItem(dbinfo, "cache");
        if (cache && cache->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.cache = cache->valueint;
        } else if (!cache) {
            g_Dbs.db[i].dbCfg.cache = -1;
        } else {
            printf("ERROR: failed to read json, cache not found\n");
            goto PARSE_OVER;
        }

        cJSON* blocks= cJSON_GetObjectItem(dbinfo, "blocks");
        if (blocks && blocks->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.blocks = blocks->valueint;
        } else if (!blocks) {
            g_Dbs.db[i].dbCfg.blocks = -1;
        } else {
            printf("ERROR: failed to read json, block not found\n");
            goto PARSE_OVER;
        }

        //cJSON* maxtablesPerVnode= cJSON_GetObjectItem(dbinfo, "maxtablesPerVnode");
        //if (maxtablesPerVnode && maxtablesPerVnode->type == cJSON_Number) {
        //  g_Dbs.db[i].dbCfg.maxtablesPerVnode = maxtablesPerVnode->valueint;
        //} else if (!maxtablesPerVnode) {
        //  g_Dbs.db[i].dbCfg.maxtablesPerVnode = TSDB_DEFAULT_TABLES;
        //} else {
        //  printf("failed to read json, maxtablesPerVnode not found");
        //  goto PARSE_OVER;
        //}

        cJSON* minRows= cJSON_GetObjectItem(dbinfo, "minRows");
        if (minRows && minRows->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.minRows = minRows->valueint;
        } else if (!minRows) {
            g_Dbs.db[i].dbCfg.minRows = 0;    // 0 means default
        } else {
            printf("ERROR: failed to read json, minRows not found\n");
            goto PARSE_OVER;
        }

        cJSON* maxRows= cJSON_GetObjectItem(dbinfo, "maxRows");
        if (maxRows && maxRows->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.maxRows = maxRows->valueint;
        } else if (!maxRows) {
            g_Dbs.db[i].dbCfg.maxRows = 0;    // 0 means default
        } else {
            printf("ERROR: failed to read json, maxRows not found\n");
            goto PARSE_OVER;
        }

        cJSON* comp= cJSON_GetObjectItem(dbinfo, "comp");
        if (comp && comp->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.comp = comp->valueint;
        } else if (!comp) {
            g_Dbs.db[i].dbCfg.comp = -1;
        } else {
            printf("ERROR: failed to read json, comp not found\n");
            goto PARSE_OVER;
        }

        cJSON* walLevel= cJSON_GetObjectItem(dbinfo, "walLevel");
        if (walLevel && walLevel->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.walLevel = walLevel->valueint;
        } else if (!walLevel) {
            g_Dbs.db[i].dbCfg.walLevel = -1;
        } else {
            printf("ERROR: failed to read json, walLevel not found\n");
            goto PARSE_OVER;
        }

        cJSON* cacheLast= cJSON_GetObjectItem(dbinfo, "cachelast");
        if (cacheLast && cacheLast->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.cacheLast = cacheLast->valueint;
        } else if (!cacheLast) {
            g_Dbs.db[i].dbCfg.cacheLast = -1;
        } else {
            printf("ERROR: failed to read json, cacheLast not found\n");
            goto PARSE_OVER;
        }

        cJSON* quorum= cJSON_GetObjectItem(dbinfo, "quorum");
        if (quorum && quorum->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.quorum = quorum->valueint;
        } else if (!quorum) {
            g_Dbs.db[i].dbCfg.quorum = 1;
        } else {
            printf("failed to read json, quorum input mistake");
            goto PARSE_OVER;
        }

        cJSON* fsync= cJSON_GetObjectItem(dbinfo, "fsync");
        if (fsync && fsync->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.fsync = fsync->valueint;
        } else if (!fsync) {
            g_Dbs.db[i].dbCfg.fsync = -1;
        } else {
            errorPrint("%s() LN%d, failed to read json, fsync input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }

        // super_talbes
        cJSON *stables = cJSON_GetObjectItem(dbinfos, "super_tables");
        if (!stables || stables->type != cJSON_Array) {
            errorPrint("%s() LN%d, failed to read json, super_tables not found\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }

        int stbSize = cJSON_GetArraySize(stables);
        if (stbSize > MAX_SUPER_TABLE_COUNT) {
            errorPrint(
                    "%s() LN%d, failed to read json, supertable size overflow, max supertable is %d\n",
                    __func__, __LINE__, MAX_SUPER_TABLE_COUNT);
            goto PARSE_OVER;
        }

        g_Dbs.db[i].superTblCount = stbSize;
        for (int j = 0; j < stbSize; ++j) {
            cJSON* stbInfo = cJSON_GetArrayItem(stables, j);
            if (stbInfo == NULL) continue;

            // dbinfo
            cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name");
            if (!stbName || stbName->type != cJSON_String
                    || stbName->valuestring == NULL) {
                errorPrint("%s() LN%d, failed to read json, stb name not found\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }
            tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring,
                    TSDB_TABLE_NAME_LEN);

            cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
            if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
                printf("ERROR: failed to read json, childtable_prefix not found\n");
                goto PARSE_OVER;
            }
            tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring,
                    TSDB_TABLE_NAME_LEN - 20);

            cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table");
            if (autoCreateTbl
                    && autoCreateTbl->type == cJSON_String
                    && autoCreateTbl->valuestring != NULL) {
                if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3))
                        && (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) {
                    g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
                } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
                    g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
                } else {
                    g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
                }
            } else if (!autoCreateTbl) {
                g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
            } else {
                printf("ERROR: failed to read json, auto_create_table not found\n");
                goto PARSE_OVER;
            }

            cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
            if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
                g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
            } else if (!batchCreateTbl) {
                g_Dbs.db[i].superTbls[j].batchCreateTableNum = 1000;
            } else {
                printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
                goto PARSE_OVER;
            }

            cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
            if (childTblExists
                    && childTblExists->type == cJSON_String
                    && childTblExists->valuestring != NULL) {
                if ((0 == strncasecmp(childTblExists->valuestring, "yes", 3))
                        && (g_Dbs.db[i].drop == false)) {
                    g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS;
                } else if ((0 == strncasecmp(childTblExists->valuestring, "no", 2)
                        || (g_Dbs.db[i].drop == true))) {
                    g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
                } else {
                    g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
                }
            } else if (!childTblExists) {
                g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
            } else {
                errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }

            if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
                g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
            }

            cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
            if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
                errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }
            g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;

            cJSON *dataSource = cJSON_GetObjectItem(stbInfo, "data_source");
            if (dataSource && dataSource->type == cJSON_String
                    && dataSource->valuestring != NULL) {
                tstrncpy(g_Dbs.db[i].superTbls[j].dataSource,
                        dataSource->valuestring, TSDB_DB_NAME_LEN);
            } else if (!dataSource) {
                tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", TSDB_DB_NAME_LEN);
            } else {
                errorPrint("%s() LN%d, failed to read json, data_source not found\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }

            cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
            if (stbIface && stbIface->type == cJSON_String
                    && stbIface->valuestring != NULL) {
                if (0 == strcasecmp(stbIface->valuestring, "taosc")) {
                    g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE;
                } else if (0 == strcasecmp(stbIface->valuestring, "rest")) {
                    g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
#if STMT_IFACE_ENABLED == 1
                } else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
                    g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
#endif
                } else {
                    errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
                            __func__, __LINE__, stbIface->valuestring);
                    goto PARSE_OVER;
                }
            } else if (!stbIface) {
                g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE;
            } else {
                errorPrint("%s", "failed to read json, insert_mode not found\n");
                goto PARSE_OVER;
            }

            cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit");
            if ((childTbl_limit) && (g_Dbs.db[i].drop != true)
                    && (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
                if (childTbl_limit->type != cJSON_Number) {
                    printf("ERROR: failed to read json, childtable_limit\n");
                    goto PARSE_OVER;
                }
                g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint;
            } else {
                g_Dbs.db[i].superTbls[j].childTblLimit = -1;    // select ... limit -1 means all query result, drop = yes mean all table need recreate, limit value is invalid.
            }

            cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
            if ((childTbl_offset) && (g_Dbs.db[i].drop != true)
                    && (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
                if ((childTbl_offset->type != cJSON_Number)
                        || (0 > childTbl_offset->valueint)) {
                    printf("ERROR: failed to read json, childtable_offset\n");
                    goto PARSE_OVER;
                }
                g_Dbs.db[i].superTbls[j].childTblOffset = childTbl_offset->valueint;
            } else {
                g_Dbs.db[i].superTbls[j].childTblOffset = 0;
            }

            cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
            if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
                tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
                        ts->valuestring, TSDB_DB_NAME_LEN);
            } else if (!ts) {
                tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
                        "now", TSDB_DB_NAME_LEN);
            } else {
                printf("ERROR: failed to read json, start_timestamp not found\n");
                goto PARSE_OVER;
            }

            cJSON* timestampStep = cJSON_GetObjectItem(stbInfo, "timestamp_step");
            if (timestampStep && timestampStep->type == cJSON_Number) {
                g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint;
            } else if (!timestampStep) {
                g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP;
            } else {
                printf("ERROR: failed to read json, timestamp_step not found\n");
                goto PARSE_OVER;
            }

            cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format");
            if (sampleFormat && sampleFormat->type
                    == cJSON_String && sampleFormat->valuestring != NULL) {
                tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
                        sampleFormat->valuestring, TSDB_DB_NAME_LEN);
            } else if (!sampleFormat) {
                tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", TSDB_DB_NAME_LEN);
            } else {
                printf("ERROR: failed to read json, sample_format not found\n");
                goto PARSE_OVER;
            }

            cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
            if (sampleFile && sampleFile->type == cJSON_String
                    && sampleFile->valuestring != NULL) {
                tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
                        sampleFile->valuestring, MAX_FILE_NAME_LEN);
            } else if (!sampleFile) {
                memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN);
            } else {
                printf("ERROR: failed to read json, sample_file not found\n");
                goto PARSE_OVER;
            }

            cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
            if ((tagsFile && tagsFile->type == cJSON_String)
                    && (tagsFile->valuestring != NULL)) {
                tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
                        tagsFile->valuestring, MAX_FILE_NAME_LEN);
                if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) {
                    g_Dbs.db[i].superTbls[j].tagSource = 0;
                } else {
                    g_Dbs.db[i].superTbls[j].tagSource = 1;
                }
            } else if (!tagsFile) {
                memset(g_Dbs.db[i].superTbls[j].tagsFile, 0, MAX_FILE_NAME_LEN);
                g_Dbs.db[i].superTbls[j].tagSource = 0;
            } else {
                printf("ERROR: failed to read json, tags_file not found\n");
                goto PARSE_OVER;
            }

            cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
            if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) {
                int32_t len = stbMaxSqlLen->valueint;
                if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
                    len = TSDB_MAX_ALLOWED_SQL_LEN;
                } else if (len < 5) {
                    len = 5;
                }
                g_Dbs.db[i].superTbls[j].maxSqlLen = len;
            } else if (!maxSqlLen) {
                g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
            } else {
                errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }
            /*
            cJSON *multiThreadWriteOneTbl =
                cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
            if (multiThreadWriteOneTbl
                    && multiThreadWriteOneTbl->type == cJSON_String
                    && multiThreadWriteOneTbl->valuestring != NULL) {
                if (0 == strncasecmp(multiThreadWriteOneTbl->valuestring, "yes", 3)) {
                    g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1;
                } else {
                    g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
                }
            } else if (!multiThreadWriteOneTbl) {
                g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
            } else {
                printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
                goto PARSE_OVER;
            }
            */
            cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
            if (insertRows && insertRows->type == cJSON_Number) {
                if (insertRows->valueint < 0) {
                    errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
                            __func__, __LINE__);
                    goto PARSE_OVER;
                }
                g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
            } else if (!insertRows) {
                g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
            } else {
                errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }

            cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
            if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
                if (stbInterlaceRows->valueint < 0) {
                    errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n",
                            __func__, __LINE__);
                    goto PARSE_OVER;
                }
                g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;

                if (g_Dbs.db[i].superTbls[j].interlaceRows > g_Dbs.db[i].superTbls[j].insertRows) {
                    printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > insert_rows %"PRId64"\n\n",
                            i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
                            g_Dbs.db[i].superTbls[j].insertRows);
                    printf("        interlace rows value will be set to insert_rows %"PRId64"\n\n",
g_Dbs.db[i].superTbls[j].insertRows); goto PARSE_OVER;
prompt();
g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows;
} }
} else if (!stbInterlaceRows) {
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
errorPrint(
"%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio"); g_Dbs.db[i].superTblCount = stbSize;
if (disorderRatio && disorderRatio->type == cJSON_Number) { for (int j = 0; j < stbSize; ++j) {
if (disorderRatio->valueint > 50) cJSON* stbInfo = cJSON_GetArrayItem(stables, j);
disorderRatio->valueint = 50; if (stbInfo == NULL) continue;
if (disorderRatio->valueint < 0) // dbinfo
disorderRatio->valueint = 0; cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name");
if (!stbName || stbName->type != cJSON_String
|| stbName->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, stb name not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring,
TSDB_TABLE_NAME_LEN);
g_Dbs.db[i].superTbls[j].disorderRatio = disorderRatio->valueint; cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
} else if (!disorderRatio) { if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
g_Dbs.db[i].superTbls[j].disorderRatio = 0; printf("ERROR: failed to read json, childtable_prefix not found\n");
} else { goto PARSE_OVER;
printf("ERROR: failed to read json, disorderRatio not found\n"); }
goto PARSE_OVER; tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring,
} TSDB_TABLE_NAME_LEN - 20);
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table");
if (autoCreateTbl
&& autoCreateTbl->type == cJSON_String
&& autoCreateTbl->valuestring != NULL) {
if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3))
&& (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
} else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} else {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
}
} else if (!autoCreateTbl) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} else {
printf("ERROR: failed to read json, auto_create_table not found\n");
goto PARSE_OVER;
}
cJSON* disorderRange = cJSON_GetObjectItem(stbInfo, "disorder_range"); cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (disorderRange && disorderRange->type == cJSON_Number) { if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].disorderRange = disorderRange->valueint; g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
} else if (!disorderRange) { } else if (!batchCreateTbl) {
g_Dbs.db[i].superTbls[j].disorderRange = 1000; g_Dbs.db[i].superTbls[j].batchCreateTableNum = 1000;
} else { } else {
printf("ERROR: failed to read json, disorderRange not found\n"); printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
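                /*
                 * The fields parsed above come from one entry of the insert
                 * JSON's "databases" array. A minimal illustrative fragment
                 * (field names are exactly what the parser looks up; the
                 * values are examples, not defaults):
                 *
                 *   "dbinfo": {
                 *       "name":      "db",
                 *       "minRows":   100,
                 *       "maxRows":   4096,
                 *       "comp":      2,
                 *       "walLevel":  1,
                 *       "cachelast": 0,
                 *       "quorum":    1,
                 *       "fsync":     3000
                 *   },
                 *   "super_tables": [{
                 *       "name": "stb",
                 *       "childtable_prefix": "stb_",
                 *       "auto_create_table": "no",
                 *       "batch_create_tbl_num": 1000
                 *   }]
                 */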
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists
&& childTblExists->type == cJSON_String
&& childTblExists->valuestring != NULL) {
if ((0 == strncasecmp(childTblExists->valuestring, "yes", 3))
&& (g_Dbs.db[i].drop == false)) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS;
} else if ((0 == strncasecmp(childTblExists->valuestring, "no", 2)
|| (g_Dbs.db[i].drop == true))) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
}
} else if (!childTblExists) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else {
errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
}
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
cJSON *dataSource = cJSON_GetObjectItem(stbInfo, "data_source");
if (dataSource && dataSource->type == cJSON_String
&& dataSource->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource,
dataSource->valuestring, TSDB_DB_NAME_LEN);
} else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", TSDB_DB_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, data_source not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
if (stbIface && stbIface->type == cJSON_String
&& stbIface->valuestring != NULL) {
if (0 == strcasecmp(stbIface->valuestring, "taosc")) {
g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE;
} else if (0 == strcasecmp(stbIface->valuestring, "rest")) {
g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
#if STMT_IFACE_ENABLED == 1
} else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
#endif
} else {
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
__func__, __LINE__, stbIface->valuestring);
goto PARSE_OVER;
}
} else if (!stbIface) {
g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE;
} else {
errorPrint("%s", "failed to read json, insert_mode not found\n");
goto PARSE_OVER;
}
cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit");
if ((childTbl_limit) && (g_Dbs.db[i].drop != true)
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
if (childTbl_limit->type != cJSON_Number) {
printf("ERROR: failed to read json, childtable_limit\n");
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint;
} else {
                    g_Dbs.db[i].superTbls[j].childTblLimit = -1;    // select ... limit -1 returns all query results; when drop = yes every table is recreated, so the limit value is invalid.
}
cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
if ((childTbl_offset) && (g_Dbs.db[i].drop != true)
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
if ((childTbl_offset->type != cJSON_Number)
|| (0 > childTbl_offset->valueint)) {
printf("ERROR: failed to read json, childtable_offset\n");
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblOffset = childTbl_offset->valueint;
} else {
g_Dbs.db[i].superTbls[j].childTblOffset = 0;
}
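                /*
                 * Note on the two options above: childtable_limit and
                 * childtable_offset select a slice of already existing child
                 * tables, and the guards show they only take effect when drop
                 * is disabled and child_table_exists is "yes". For example,
                 * limit = 100 with offset = 200 should confine this run to
                 * child tables 200..299 of the existing super table.
                 */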
cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
ts->valuestring, TSDB_DB_NAME_LEN);
} else if (!ts) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
"now", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, start_timestamp not found\n");
goto PARSE_OVER;
}
cJSON* timestampStep = cJSON_GetObjectItem(stbInfo, "timestamp_step");
if (timestampStep && timestampStep->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint;
} else if (!timestampStep) {
g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP;
} else {
printf("ERROR: failed to read json, timestamp_step not found\n");
goto PARSE_OVER;
}
cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format");
if (sampleFormat && sampleFormat->type
== cJSON_String && sampleFormat->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
sampleFormat->valuestring, TSDB_DB_NAME_LEN);
} else if (!sampleFormat) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, sample_format not found\n");
goto PARSE_OVER;
}
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String
&& sampleFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
sampleFile->valuestring, MAX_FILE_NAME_LEN);
} else if (!sampleFile) {
memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN);
} else {
printf("ERROR: failed to read json, sample_file not found\n");
goto PARSE_OVER;
}
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
if ((tagsFile && tagsFile->type == cJSON_String)
&& (tagsFile->valuestring != NULL)) {
tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
tagsFile->valuestring, MAX_FILE_NAME_LEN);
if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) {
g_Dbs.db[i].superTbls[j].tagSource = 0;
} else {
g_Dbs.db[i].superTbls[j].tagSource = 1;
}
} else if (!tagsFile) {
memset(g_Dbs.db[i].superTbls[j].tagsFile, 0, MAX_FILE_NAME_LEN);
g_Dbs.db[i].superTbls[j].tagSource = 0;
} else {
printf("ERROR: failed to read json, tags_file not found\n");
goto PARSE_OVER;
}
                cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
                if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) {
                    int32_t len = stbMaxSqlLen->valueint;
                    if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
                        len = TSDB_MAX_ALLOWED_SQL_LEN;
                    } else if (len < 5) {
                        len = 5;
                    }
                    g_Dbs.db[i].superTbls[j].maxSqlLen = len;
                } else if (!stbMaxSqlLen) {
                    g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
                } else {
                    errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n",
                            __func__, __LINE__);
                    goto PARSE_OVER;
                }
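                /*
                 * The per-super-table max_sql_len is clamped to the range
                 * [5, TSDB_MAX_ALLOWED_SQL_LEN] above; when the option is
                 * absent it falls back to the global g_args.max_sql_len taken
                 * from the command line.
                 */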
/*
cJSON *multiThreadWriteOneTbl =
cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
if (multiThreadWriteOneTbl
&& multiThreadWriteOneTbl->type == cJSON_String
&& multiThreadWriteOneTbl->valuestring != NULL) {
if (0 == strncasecmp(multiThreadWriteOneTbl->valuestring, "yes", 3)) {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1;
} else {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
}
} else if (!multiThreadWriteOneTbl) {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
} else {
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
goto PARSE_OVER;
}
*/
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
if (insertRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
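                /*
                 * When insert_rows is omitted it defaults to
                 * 0x7FFFFFFFFFFFFFFF (INT64_MAX), i.e. the writer keeps
                 * inserting effectively forever unless another limit stops it
                 * first.
                 */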
cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
if (stbInterlaceRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_Dbs.db[i].superTbls[j].insertRows) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > insert_rows %"PRId64"\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
g_Dbs.db[i].superTbls[j].insertRows);
printf(" interlace rows value will be set to insert_rows %"PRId64"\n\n",
g_Dbs.db[i].superTbls[j].insertRows);
prompt();
g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows;
}
} else if (!stbInterlaceRows) {
                    g_Dbs.db[i].superTbls[j].interlaceRows = 0;    // 0 means progressive mode, > 0 means interlace mode; the max value must be less than or equal to num_of_records_per_req
} else {
errorPrint(
"%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
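                /*
                 * Illustration of the interlace setting, following the comment
                 * above: interlace_rows = 0 writes each child table to
                 * completion in turn (progressive), while e.g.
                 * interlace_rows = 20 should rotate through the child tables,
                 * writing 20 rows per table per round.
                 */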
cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio");
if (disorderRatio && disorderRatio->type == cJSON_Number) {
if (disorderRatio->valueint > 50)
disorderRatio->valueint = 50;
if (disorderRatio->valueint < 0)
disorderRatio->valueint = 0;
g_Dbs.db[i].superTbls[j].disorderRatio = disorderRatio->valueint;
} else if (!disorderRatio) {
g_Dbs.db[i].superTbls[j].disorderRatio = 0;
} else {
printf("ERROR: failed to read json, disorderRatio not found\n");
goto PARSE_OVER;
}
cJSON* disorderRange = cJSON_GetObjectItem(stbInfo, "disorder_range");
if (disorderRange && disorderRange->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].disorderRange = disorderRange->valueint;
} else if (!disorderRange) {
g_Dbs.db[i].superTbls[j].disorderRange = 1000;
} else {
printf("ERROR: failed to read json, disorderRange not found\n");
goto PARSE_OVER;
}
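                /*
                 * disorder_ratio is clamped to 0..50 above and disorder_range
                 * defaults to 1000; the intent is that roughly disorder_ratio
                 * percent of the generated timestamps are shifted backwards by
                 * up to disorder_range time units to simulate out-of-order
                 * ingestion.
                 */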
cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint;
if (insertInterval->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else if (!insertInterval) {
                    verbosePrint("%s() LN%d: stable insert interval is overridden by the global %"PRIu64".\n",
__func__, __LINE__, g_args.insert_interval);
g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval;
} else {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
                int retVal = getColumnAndTagTypeFromInsertJsonFile(
                        stbInfo, &g_Dbs.db[i].superTbls[j]);
                if (false == retVal) {
                    goto PARSE_OVER;
                }
            }
        }

    ret = true;

PARSE_OVER:
    return ret;
}
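/*
 * A sketch of the remaining per-super-table options handled by the parser
 * above, again with purely illustrative values:
 *
 *   "child_table_exists": "no",
 *   "childtable_count":   10000,
 *   "data_source":        "rand",
 *   "insert_mode":        "taosc",
 *   "start_timestamp":    "2020-10-01 00:00:00.000",
 *   "timestamp_step":     1,
 *   "sample_format":      "csv",
 *   "sample_file":        "./sample.csv",
 *   "insert_rows":        100000,
 *   "interlace_rows":     0,
 *   "disorder_ratio":     0,
 *   "disorder_range":     1000,
 *   "insert_interval":    0
 */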
static bool getMetaFromQueryJsonFile(cJSON* root) {
...@@ -6536,347 +6551,351 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
static void startMultiThreadInsertData(int threads, char* db_name,
        char* precision, SSuperTable* superTblInfo) {

    int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
    if (0 != precision[0]) {
        if (0 == strncasecmp(precision, "ms", 2)) {
            timePrec = TSDB_TIME_PRECISION_MILLI;
        } else if (0 == strncasecmp(precision, "us", 2)) {
            timePrec = TSDB_TIME_PRECISION_MICRO;
#if NANO_SECOND_ENABLED == 1
        } else if (0 == strncasecmp(precision, "ns", 2)) {
            timePrec = TSDB_TIME_PRECISION_NANO;
#endif
        } else {
            errorPrint("Unsupported precision: %s\n", precision);
            exit(-1);
        }
    }

    int64_t start_time;
    if (superTblInfo) {
        if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
            start_time = taosGetTimestamp(timePrec);
        } else {
            if (TSDB_CODE_SUCCESS != taosParseTime(
                        superTblInfo->startTimestamp,
                        &start_time,
                        strlen(superTblInfo->startTimestamp),
                        timePrec, 0)) {
                ERROR_EXIT("failed to parse time!\n");
            }
        }
    } else {
        start_time = 1500000000000;
    }

    int64_t start = taosGetTimestampMs();

    // read sample data from file first
    if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
                    "sample", strlen("sample")))) {
        if (0 != prepareSampleDataForSTable(superTblInfo)) {
            errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
                    __func__, __LINE__);
            exit(-1);
        }
    }

    TAOS* taos0 = taos_connect(
            g_Dbs.host, g_Dbs.user,
            g_Dbs.password, db_name, g_Dbs.port);
    if (NULL == taos0) {
        errorPrint("%s() LN%d, connect to server failed, reason: %s\n",
                __func__, __LINE__, taos_errstr(NULL));
        exit(-1);
    }

    int64_t ntables = 0;
    uint64_t tableFrom;

    if (superTblInfo) {
        int64_t limit;
        uint64_t offset;

        if ((NULL != g_args.sqlFile) && (superTblInfo->childTblExists == TBL_NO_EXISTS) &&
                ((superTblInfo->childTblOffset != 0) || (superTblInfo->childTblLimit >= 0))) {
            printf("WARNING: offset and limit will not be used since the child tables do not exist!\n");
        }

        if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) {
            if ((superTblInfo->childTblLimit < 0)
                    || ((superTblInfo->childTblOffset + superTblInfo->childTblLimit)
                        > (superTblInfo->childTblCount))) {
                superTblInfo->childTblLimit =
                    superTblInfo->childTblCount - superTblInfo->childTblOffset;
            }

            offset = superTblInfo->childTblOffset;
            limit = superTblInfo->childTblLimit;
        } else {
            limit = superTblInfo->childTblCount;
            offset = 0;
        }

        ntables = limit;
        tableFrom = offset;

        if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
                && ((superTblInfo->childTblOffset + superTblInfo->childTblLimit)
                    > superTblInfo->childTblCount)) {
            printf("WARNING: specified offset + limit > child table count!\n");
            prompt();
        }

        if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
                && (0 == superTblInfo->childTblLimit)) {
            printf("WARNING: specified limit = 0, so no table name can be found to insert or query!\n");
            prompt();
        }

        superTblInfo->childTblName = (char*)calloc(1,
                limit * TSDB_TABLE_NAME_LEN);
        if (superTblInfo->childTblName == NULL) {
            errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
            taos_close(taos0);
            exit(-1);
        }

        int64_t childTblCount;
        getChildNameOfSuperTableWithLimitAndOffset(
                taos0,
                db_name, superTblInfo->sTblName,
                &superTblInfo->childTblName, &childTblCount,
                limit,
                offset);
    } else {
        ntables = g_args.num_of_tables;
        tableFrom = 0;
    }

    taos_close(taos0);

    int64_t a = ntables / threads;
    if (a < 1) {
        threads = ntables;
        a = 1;
    }

    int64_t b = 0;
    if (threads != 0) {
        b = ntables % threads;
    }

    if ((superTblInfo)
            && (superTblInfo->iface == REST_IFACE)) {
        if (convertHostToServAddr(
                    g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
            exit(-1);
        }
    }

    pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
    assert(pids != NULL);

    threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
    assert(infos != NULL);

    memset(pids, 0, threads * sizeof(pthread_t));
    memset(infos, 0, threads * sizeof(threadInfo));

    for (int i = 0; i < threads; i++) {
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID = i;
        tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
        pThreadInfo->time_precision = timePrec;
        pThreadInfo->superTblInfo = superTblInfo;

        pThreadInfo->start_time = start_time;
        pThreadInfo->minDelay = UINT64_MAX;

        if ((NULL == superTblInfo) ||
                (superTblInfo->iface != REST_IFACE)) {
            //t_info->taos = taos;
            pThreadInfo->taos = taos_connect(
                    g_Dbs.host, g_Dbs.user,
                    g_Dbs.password, db_name, g_Dbs.port);
            if (NULL == pThreadInfo->taos) {
                errorPrint(
                        "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
                        __func__, __LINE__,
                        taos_errstr(NULL));
                free(infos);
                exit(-1);
            }

#if STMT_IFACE_ENABLED == 1
            if ((g_args.iface == STMT_IFACE)
                    || ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) {

                int columnCount;
                if (superTblInfo) {
                    columnCount = superTblInfo->columnCount;
                } else {
                    columnCount = g_args.num_of_CPR;
                }

                pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
                if (NULL == pThreadInfo->stmt) {
                    errorPrint(
                            "%s() LN%d, failed init stmt, reason: %s\n",
                            __func__, __LINE__,
                            taos_errstr(NULL));
                    free(pids);
                    free(infos);
                    exit(-1);
                }

                // build the parameterized INSERT template, e.g.
                // "INSERT INTO ? USING stb TAGS(?,...) VALUES(?,...)"
                char buffer[3000];
                char *pstr = buffer;

                if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
                    pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
                            superTblInfo->sTblName);
                    for (int tag = 0; tag < (superTblInfo->tagCount - 1); tag ++ ) {
                        pstr += sprintf(pstr, ",?");
                    }
                    pstr += sprintf(pstr, ") VALUES(?");
                } else {
                    pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
                }

                for (int col = 0; col < columnCount; col ++) {
                    pstr += sprintf(pstr, ",?");
                }
                pstr += sprintf(pstr, ")");

                debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer);
                int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
                if (ret != 0){
                    errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
                            ret, taos_errstr(NULL));
                    free(pids);
                    free(infos);
                    exit(-1);
                }
            }
#endif
        } else {
            pThreadInfo->taos = NULL;
        }

        /* if ((NULL == superTblInfo)
           || (0 == superTblInfo->multiThreadWriteOneTbl)) {
           */
        pThreadInfo->start_table_from = tableFrom;
        pThreadInfo->ntables = i < b ? a + 1 : a;
        pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
        tableFrom = pThreadInfo->end_table_to + 1;
        /* } else {
           pThreadInfo->start_table_from = 0;
           pThreadInfo->ntables = superTblInfo->childTblCount;
           pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
           }
           */
        tsem_init(&(pThreadInfo->lock_sem), 0, 0);
        if (ASYNC_MODE == g_Dbs.asyncMode) {
            pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
        } else {
            pthread_create(pids + i, NULL, syncWrite, pThreadInfo);
        }
    }

    for (int i = 0; i < threads; i++) {
        pthread_join(pids[i], NULL);
    }

    uint64_t totalDelay = 0;
    uint64_t maxDelay = 0;
    uint64_t minDelay = UINT64_MAX;
    uint64_t cntDelay = 1;
    double avgDelay = 0;

    for (int i = 0; i < threads; i++) {
        threadInfo *pThreadInfo = infos + i;

        tsem_destroy(&(pThreadInfo->lock_sem));

#if STMT_IFACE_ENABLED == 1
        if (pThreadInfo->stmt) {
            taos_stmt_close(pThreadInfo->stmt);
        }
#endif
        taos_close(pThreadInfo->taos);

        debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
                __func__, __LINE__,
                pThreadInfo->threadID, pThreadInfo->totalInsertRows,
                pThreadInfo->totalAffectedRows);
        if (superTblInfo) {
            superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
            superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows;
        } else {
            g_args.totalAffectedRows += pThreadInfo->totalAffectedRows;
            g_args.totalInsertRows += pThreadInfo->totalInsertRows;
        }

        totalDelay += pThreadInfo->totalDelay;
        cntDelay += pThreadInfo->cntDelay;
        if (pThreadInfo->maxDelay > maxDelay) maxDelay = pThreadInfo->maxDelay;
        if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay;
    }
    cntDelay -= 1;

    if (cntDelay == 0) cntDelay = 1;
    avgDelay = (double)totalDelay / cntDelay;

    int64_t end = taosGetTimestampMs();
    int64_t t = end - start;
    double tInMs = t / 1000.0;

    if (superTblInfo) {
        fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
                tInMs, superTblInfo->totalInsertRows,
                superTblInfo->totalAffectedRows,
                threads, db_name, superTblInfo->sTblName,
                (tInMs)?
                (double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);

        if (g_fpOfInsertResult) {
            fprintf(g_fpOfInsertResult,
                    "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
                    tInMs, superTblInfo->totalInsertRows,
                    superTblInfo->totalAffectedRows,
                    threads, db_name, superTblInfo->sTblName,
                    (tInMs)?
                    (double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
        }
    } else {
        fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
                tInMs, g_args.totalInsertRows,
                g_args.totalAffectedRows,
                threads, db_name,
                (tInMs)?
                (double)(g_args.totalInsertRows/tInMs):FLT_MAX);
        if (g_fpOfInsertResult) {
            fprintf(g_fpOfInsertResult,
                    "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
                    tInMs, g_args.totalInsertRows,
                    g_args.totalAffectedRows,
                    threads, db_name,
                    (tInMs)?
                    (double)(g_args.totalInsertRows/tInMs):FLT_MAX);
        }
    }

    fprintf(stderr, "insert delay, avg: %10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
            avgDelay, maxDelay, minDelay);
    if (g_fpOfInsertResult) {
        fprintf(g_fpOfInsertResult, "insert delay, avg:%10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
                avgDelay, maxDelay, minDelay);
    }

    //taos_close(taos);

    free(pids);
    free(infos);
}
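/*
 * Worked example of the table partitioning above: with ntables = 10 and
 * threads = 4, a = 10 / 4 = 2 and b = 10 % 4 = 2, so the first b threads take
 * a + 1 tables each and the rest take a:
 *   thread 0 -> tables 0..2, thread 1 -> 3..5,
 *   thread 2 -> tables 6..7, thread 3 -> 8..9.
 */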
static void *readTable(void *sarg) {
...@@ -7034,98 +7053,98 @@ static void prompt()
static int insertTestProcess() {

    setupForAnsiEscape();
    int ret = printfInsertMeta();
    resetAfterAnsiEscape();

    if (ret == -1)
        exit(EXIT_FAILURE);

    debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile);
    g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
    if (NULL == g_fpOfInsertResult) {
        errorPrint("Failed to open %s to save result\n", g_Dbs.resultFile);
        return -1;
    }

    if (g_fpOfInsertResult)
        printfInsertMetaToFile(g_fpOfInsertResult);

    prompt();

    init_rand_data();

    // create database and super tables
    if (createDatabasesAndStables() != 0) {
        if (g_fpOfInsertResult)
            fclose(g_fpOfInsertResult);
        return -1;
    }

    // pretreatment
    if (prepareSampleData() != 0) {
        if (g_fpOfInsertResult)
            fclose(g_fpOfInsertResult);
        return -1;
    }

    double start;
    double end;

    // create child tables
    start = taosGetTimestampMs();
    createChildTables();
    end = taosGetTimestampMs();

    if (g_totalChildTables > 0) {
        fprintf(stderr, "Spent %.4f seconds to create %"PRId64" tables with %d thread(s)\n\n",
                (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
        if (g_fpOfInsertResult) {
            fprintf(g_fpOfInsertResult,
                    "Spent %.4f seconds to create %"PRId64" tables with %d thread(s)\n\n",
                    (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
        }
    }

    // create sub threads for inserting data
    //start = taosGetTimestampMs();
    for (int i = 0; i < g_Dbs.dbCount; i++) {
        if (g_Dbs.use_metric) {
            if (g_Dbs.db[i].superTblCount > 0) {
                for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {

                    SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];

                    if (superTblInfo && (superTblInfo->insertRows > 0)) {
                        startMultiThreadInsertData(
                                g_Dbs.threadCount,
                                g_Dbs.db[i].dbName,
                                g_Dbs.db[i].dbCfg.precision,
                                superTblInfo);
                    }
                }
            }
        } else {
            startMultiThreadInsertData(
                    g_Dbs.threadCount,
                    g_Dbs.db[i].dbName,
                    g_Dbs.db[i].dbCfg.precision,
                    NULL);
        }
    }
    //end = taosGetTimestampMs();

    //int64_t totalInsertRows = 0;
    //int64_t totalAffectedRows = 0;
    //for (int i = 0; i < g_Dbs.dbCount; i++) {
    //    for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
    //        totalInsertRows += g_Dbs.db[i].superTbls[j].totalInsertRows;
    //        totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows;
    //}
    //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalInsertRows, totalAffectedRows, g_Dbs.threadCount);
    postFreeResource();

    return 0;
}
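/*
 * insertTestProcess() above is the insert-mode driver: it prints and records
 * the meta data, creates the databases and super tables, pre-creates the
 * child tables, then runs startMultiThreadInsertData() once per super table
 * (or once with NULL when no super table schema is used).
 */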
static void *specifiedTableQuery(void *sarg) {
...@@ -7961,116 +7980,116 @@ static void initOfQueryMeta() {
}
static void setParaFromArg() {
    if (g_args.host) {
        tstrncpy(g_Dbs.host, g_args.host, MAX_HOSTNAME_SIZE);
    } else {
        tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
    }

    if (g_args.user) {
        tstrncpy(g_Dbs.user, g_args.user, MAX_USERNAME_SIZE);
    }

    if (g_args.password) {
        tstrncpy(g_Dbs.password, g_args.password, MAX_PASSWORD_SIZE);
    }

    if (g_args.port) {
        g_Dbs.port = g_args.port;
    }

    g_Dbs.threadCount = g_args.num_of_threads;
    g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;

    g_Dbs.dbCount = 1;
    g_Dbs.db[0].drop = true;

    tstrncpy(g_Dbs.db[0].dbName, g_args.database, TSDB_DB_NAME_LEN);
    g_Dbs.db[0].dbCfg.replica = g_args.replica;
    tstrncpy(g_Dbs.db[0].dbCfg.precision, "ms", 8);

    tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN);

    g_Dbs.use_metric = g_args.use_metric;
    g_Dbs.insert_only = g_args.insert_only;

    g_Dbs.do_aggreFunc = true;

    char dataString[STRING_LEN];
    char **data_type = g_args.datatype;

    memset(dataString, 0, STRING_LEN);

    if (strcasecmp(data_type[0], "BINARY") == 0
            || strcasecmp(data_type[0], "BOOL") == 0
            || strcasecmp(data_type[0], "NCHAR") == 0) {
        g_Dbs.do_aggreFunc = false;
    }

    if (g_args.use_metric) {
        g_Dbs.db[0].superTblCount = 1;
        tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", TSDB_TABLE_NAME_LEN);
        g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
        g_Dbs.threadCount = g_args.num_of_threads;
        g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
        g_Dbs.asyncMode = g_args.async_mode;

        g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
        g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
        g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
        g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio;
        tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
                g_args.tb_prefix, TSDB_TABLE_NAME_LEN - 20);
        tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);

        if (g_args.iface == INTERFACE_BUT) {
            g_Dbs.db[0].superTbls[0].iface = TAOSC_IFACE;
        } else {
            g_Dbs.db[0].superTbls[0].iface = g_args.iface;
        }
        tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
                "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
        g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;

        g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
        g_Dbs.db[0].superTbls[0].maxSqlLen = g_args.max_sql_len;

        g_Dbs.db[0].superTbls[0].columnCount = 0;
        for (int i = 0; i < MAX_NUM_COLUMNS; i++) {
            if (data_type[i] == NULL) {
                break;
            }

            tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
                    data_type[i], strlen(data_type[i]) + 1);
            g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary;
            g_Dbs.db[0].superTbls[0].columnCount++;
        }

        if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) {
            g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR;
        } else {
            for (int i = g_Dbs.db[0].superTbls[0].columnCount;
                    i < g_args.num_of_CPR; i++) {
                tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
                        "INT", strlen("INT") + 1);
                g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
                g_Dbs.db[0].superTbls[0].columnCount++;
            }
        }

        tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType,
                "INT", strlen("INT") + 1);
        g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;

        tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType,
                "BINARY", strlen("BINARY") + 1);
        g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
        g_Dbs.db[0].superTbls[0].tagCount = 2;
    } else {
        g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
        g_Dbs.db[0].superTbls[0].tagCount = 0;
    }
}
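/*
 * Column fill-up example for setParaFromArg() above: if the command line
 * supplies datatype = {"INT", "FLOAT"} while num_of_CPR is 3, the first loop
 * records INT and FLOAT and the else branch pads a third INT column, so the
 * generated super table should end up with columns (INT, FLOAT, INT) plus the
 * fixed INT and BINARY tags.
 */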
/* Function to do regular expression check */
...