Unverified commit 93396bca, authored by sangshuduo, committed by GitHub

[TD5065]<feature>: taosdemo nano second support (#6902)

Parent 91774fda
@@ -54,6 +54,7 @@
#include "tutil.h"
#define STMT_IFACE_ENABLED 1
#define NANO_SECOND_ENABLED 1
#define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096
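Both flags above are compile-time feature gates rather than runtime options: code specific to the prepared-statement interface or to nanosecond precision is wrapped in #if blocks, so a feature can only be compiled out by editing its define. A minimal standalone sketch of the pattern (illustrative only, not code from this commit):

#include <stdio.h>

#define NANO_SECOND_ENABLED 1   /* flip to 0 to compile the branch out */

int main(void) {
#if NANO_SECOND_ENABLED == 1
    puts("nanosecond branches compiled in");
#else
    puts("nanosecond branches compiled out");
#endif
    return 0;
}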
@@ -66,13 +67,6 @@ extern char configDir[];
#define STR_INSERT_INTO "INSERT INTO "
#define MAX_RECORDS_PER_REQ 32766
#define HEAD_BUFF_LEN TSDB_MAX_COLUMNS*24 // 16*MAX_COLUMNS + (192+32)*2 + insert into ..
@@ -105,6 +99,13 @@
#define DEFAULT_TIMESTAMP_STEP 1
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
SUBSCRIBE_TEST, // 2
INVAID_TEST
};
typedef enum CREATE_SUB_TALBE_MOD_EN {
PRE_CREATE_SUBTBL,
AUTO_CREATE_SUBTBL,
@@ -112,15 +113,15 @@ typedef enum CREATE_SUB_TALBE_MOD_EN {
} CREATE_SUB_TALBE_MOD_EN;
typedef enum TALBE_EXISTS_EN {
TBL_NO_EXISTS,
TBL_ALREADY_EXISTS,
TBL_EXISTS_BUTT
} TALBE_EXISTS_EN;
enum enumSYNC_MODE {
SYNC_MODE,
ASYNC_MODE,
MODE_BUT
};
enum enum_TAOS_INTERFACE {
@@ -143,52 +144,52 @@ typedef enum enum_PROGRESSIVE_OR_INTERLACE {
} PROG_OR_INTERLACE_MODE;
typedef enum enumQUERY_TYPE {
NO_INSERT_TYPE,
INSERT_TYPE,
QUERY_TYPE_BUT
} QUERY_TYPE;
enum _show_db_index {
TSDB_SHOW_DB_NAME_INDEX,
TSDB_SHOW_DB_CREATED_TIME_INDEX,
TSDB_SHOW_DB_NTABLES_INDEX,
TSDB_SHOW_DB_VGROUPS_INDEX,
TSDB_SHOW_DB_REPLICA_INDEX,
TSDB_SHOW_DB_QUORUM_INDEX,
TSDB_SHOW_DB_DAYS_INDEX,
TSDB_SHOW_DB_KEEP_INDEX,
TSDB_SHOW_DB_CACHE_INDEX,
TSDB_SHOW_DB_BLOCKS_INDEX,
TSDB_SHOW_DB_MINROWS_INDEX,
TSDB_SHOW_DB_MAXROWS_INDEX,
TSDB_SHOW_DB_WALLEVEL_INDEX,
TSDB_SHOW_DB_FSYNC_INDEX,
TSDB_SHOW_DB_COMP_INDEX,
TSDB_SHOW_DB_CACHELAST_INDEX,
TSDB_SHOW_DB_PRECISION_INDEX,
TSDB_SHOW_DB_UPDATE_INDEX,
TSDB_SHOW_DB_STATUS_INDEX,
TSDB_MAX_SHOW_DB
};
// -----------------------------------------SHOW TABLES CONFIGURE -------------------------------------
enum _show_stables_index {
TSDB_SHOW_STABLES_NAME_INDEX,
TSDB_SHOW_STABLES_CREATED_TIME_INDEX,
TSDB_SHOW_STABLES_COLUMNS_INDEX,
TSDB_SHOW_STABLES_METRIC_INDEX,
TSDB_SHOW_STABLES_UID_INDEX,
TSDB_SHOW_STABLES_TID_INDEX,
TSDB_SHOW_STABLES_VGID_INDEX,
TSDB_MAX_SHOW_STABLES
};
enum _describe_table_index {
TSDB_DESCRIBE_METRIC_FIELD_INDEX,
TSDB_DESCRIBE_METRIC_TYPE_INDEX,
TSDB_DESCRIBE_METRIC_LENGTH_INDEX,
TSDB_DESCRIBE_METRIC_NOTE_INDEX,
TSDB_MAX_DESCRIBE_METRIC
};
/* Used by main to communicate with parse_opt. */
@@ -228,7 +229,7 @@ typedef struct SArguments_S {
int64_t num_of_DPT;
int abort;
uint32_t disorderRatio; // 0: no disorder, >0: x%
    int      disorderRange;            // ms, us or ns. according to database precision
uint32_t method_of_delete;
char ** arg_list;
uint64_t totalInsertRows;
@@ -237,191 +238,191 @@
} SArguments;
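Note that disorderRange is a plain count of timestamp units, so the same value covers a very different wall-clock span at ms, us and ns precision. A sketch of how such a range is typically applied when generating out-of-order rows (illustrative only; maybe_disorder is a hypothetical helper, not this tool's exact code):

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>

/* Back-date a timestamp by up to `range` units in `ratio` percent of
 * cases; the unit (ms, us or ns) follows the database precision. */
static int64_t maybe_disorder(int64_t ts, uint32_t ratio, int range) {
    if (ratio > 0 && (uint32_t)(rand() % 100) < ratio)
        return ts - (rand() % range) - 1;   /* strictly earlier */
    return ts;                              /* in-order */
}

int main(void) {
    srand(7);
    int64_t base = 1625731200123456789LL;   /* a nanosecond timestamp */
    for (int i = 0; i < 3; i++)
        printf("%lld\n", (long long)maybe_disorder(base + i, 50, 1000));
    return 0;
}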
typedef struct SColumn_S {
char field[TSDB_COL_NAME_LEN];
char dataType[16];
uint32_t dataLen;
char note[128];
} StrColumn;
typedef struct SSuperTable_S {
char sTblName[TSDB_TABLE_NAME_LEN];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
char childTblPrefix[TSDB_TABLE_NAME_LEN - 20]; // 20 characters reserved for seq
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
uint16_t childTblExists;
int64_t childTblCount;
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
int64_t childTblLimit;
uint64_t childTblOffset;
// int multiThreadWriteOneTbl; // 0: no, 1: yes
uint32_t interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms, us or ns. according to database precision
uint64_t maxSqlLen; //
uint64_t insertInterval; // insert interval, will override global insert interval
int64_t insertRows;
int64_t timeStampStep;
char startTimestamp[MAX_TB_NAME_SIZE];
char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json
char sampleFile[MAX_FILE_NAME_LEN];
char tagsFile[MAX_FILE_NAME_LEN];
uint32_t columnCount;
StrColumn columns[TSDB_MAX_COLUMNS];
uint32_t tagCount;
StrColumn tags[TSDB_MAX_TAGS];
char* childTblName;
char* colsOfCreateChildTable;
uint64_t lenOfOneRow;
uint64_t lenOfTagOfOneRow;
char* sampleDataBuf;
//int sampleRowCount;
//int sampleUsePos;
uint32_t tagSource; // 0: rand, 1: tag sample
char* tagDataBuf;
uint32_t tagSampleCount;
uint32_t tagUsePos;
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
} SSuperTable;
typedef struct {
char name[TSDB_DB_NAME_LEN];
char create_time[32];
int64_t ntables;
int32_t vgroups;
int16_t replica;
int16_t quorum;
int16_t days;
char keeplist[32];
int32_t cache; //MB
int32_t blocks;
int32_t minrows;
int32_t maxrows;
int8_t wallevel;
int32_t fsync;
int8_t comp;
int8_t cachelast;
char precision[8]; // time resolution
int8_t update;
char status[16];
} SDbInfo;
typedef struct SDbCfg_S {
// int maxtablesPerVnode;
uint32_t minRows; // 0 means default
uint32_t maxRows; // 0 means default
int comp;
int walLevel;
int cacheLast;
int fsync;
int replica;
int update;
int keep;
int days;
int cache;
int blocks;
int quorum;
char precision[8];
} SDbCfg;
typedef struct SDataBase_S {
char dbName[TSDB_DB_NAME_LEN];
bool drop; // 0: use exists, 1: if exists, drop then new create
SDbCfg dbCfg;
uint64_t superTblCount;
SSuperTable superTbls[MAX_SUPER_TABLE_COUNT];
} SDataBase;
typedef struct SDbs_S {
char cfgDir[MAX_FILE_NAME_LEN];
char host[MAX_HOSTNAME_SIZE];
struct sockaddr_in serv_addr;
uint16_t port;
char user[MAX_USERNAME_SIZE];
char password[MAX_PASSWORD_SIZE];
char resultFile[MAX_FILE_NAME_LEN];
bool use_metric;
bool insert_only;
bool do_aggreFunc;
bool asyncMode;
uint32_t threadCount;
uint32_t threadCountByCreateTbl;
uint32_t dbCount;
SDataBase db[MAX_DB_COUNT];
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
} SDbs;
typedef struct SpecifiedQueryInfo_S {
uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint32_t concurrent;
int sqlCount;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
uint64_t queryTimes;
bool subscribeRestart;
int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
int endAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char topic[MAX_QUERY_SQL_COUNT][32];
int consumed[MAX_QUERY_SQL_COUNT];
TAOS_RES* res[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried;
} SpecifiedQueryInfo;
typedef struct SuperQueryInfo_S {
char sTblName[TSDB_TABLE_NAME_LEN];
uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint32_t threadCnt;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
bool subscribeRestart;
int subscribeKeepProgress;
uint64_t queryTimes;
int64_t childTblCount;
char childTblPrefix[TSDB_TABLE_NAME_LEN - 20]; // 20 characters reserved for seq
int sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
int resubAfterConsume;
int endAfterConsume;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName;
uint64_t totalQueried;
} SuperQueryInfo;
typedef struct SQueryMetaInfo_S {
char cfgDir[MAX_FILE_NAME_LEN];
char host[MAX_HOSTNAME_SIZE];
uint16_t port;
struct sockaddr_in serv_addr;
char user[MAX_USERNAME_SIZE];
char password[MAX_PASSWORD_SIZE];
char dbName[TSDB_DB_NAME_LEN];
char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest
SpecifiedQueryInfo specifiedQueryInfo;
SuperQueryInfo superQueryInfo;
uint64_t totalQueried;
} SQueryMetaInfo;
typedef struct SThreadInfo_S {
@@ -1513,7 +1514,10 @@ static int printfInsertMeta() {
}
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
#if NANO_SECOND_ENABLED == 1
            || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ns", 2))
#endif
            || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
printf(" precision: \033[33m%s\033[0m\n",
g_Dbs.db[i].dbCfg.precision);
} else {
@@ -1703,6 +1707,9 @@ static void printfInsertMetaToFile(FILE* fp) {
}
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
#if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ns", 2))
#endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
fprintf(fp, " precision: %s\n",
g_Dbs.db[i].dbCfg.precision);
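printfInsertMeta() and printfInsertMetaToFile() now repeat the same three-way whitelist. A shared helper would keep the two copies in sync; this is a sketch only (isValidPrecision is not part of this commit, and it assumes the NANO_SECOND_ENABLED define above):

#include <strings.h>   /* strncasecmp (POSIX) */

static int isValidPrecision(const char *prec) {
    return (0 == strncasecmp(prec, "ms", 2))
#if NANO_SECOND_ENABLED == 1
        || (0 == strncasecmp(prec, "ns", 2))
#endif
        || (0 == strncasecmp(prec, "us", 2));
}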
@@ -1903,10 +1910,12 @@ static void printfQueryMeta() {
static char* formatTimestamp(char* buf, int64_t val, int precision) {
time_t tt;
if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
#if NANO_SECOND_ENABLED == 1
    } else if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
#endif
} else {
tt = (time_t)(val / 1000);
}
@@ -1926,10 +1935,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
struct tm* ptm = localtime(&tt);
size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", (int)(val % 1000000));
#if NANO_SECOND_ENABLED == 1
} else if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
#endif
} else {
sprintf(buf + pos, ".%03d", (int)(val % 1000));
}
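Taken together, the two halves of formatTimestamp() pick the seconds divisor and the width of the fractional part from the same precision flag: 10^3 and %03d for ms, 10^6 and %06d for us, 10^9 and %09d for ns. A standalone re-creation for a quick check of the nanosecond path (the PRECISION_* constants are stand-ins for the TSDB_TIME_PRECISION_* values in the real headers):

#include <stdio.h>
#include <stdint.h>
#include <time.h>

enum { PRECISION_MILLI, PRECISION_MICRO, PRECISION_NANO };

static char* fmt_ts(char* buf, int64_t val, int precision) {
    time_t tt;
    if (precision == PRECISION_MICRO) {
        tt = (time_t)(val / 1000000);
    } else if (precision == PRECISION_NANO) {
        tt = (time_t)(val / 1000000000);
    } else {
        tt = (time_t)(val / 1000);
    }
    struct tm* ptm = localtime(&tt);
    size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm);
    if (precision == PRECISION_MICRO) {
        sprintf(buf + pos, ".%06d", (int)(val % 1000000));
    } else if (precision == PRECISION_NANO) {
        sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
    } else {
        sprintf(buf + pos, ".%03d", (int)(val % 1000));
    }
    return buf;
}

int main(void) {
    char buf[64];
    /* 1625731200123456789 ns since the epoch */
    printf("%s\n", fmt_ts(buf, 1625731200123456789LL, PRECISION_NANO));
    return 0;
}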
@@ -2875,134 +2886,138 @@ static int createSuperTable(
}
static int createDatabasesAndStables() {
TAOS * taos = NULL;
int ret = 0;
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port);
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
return -1;
}
char command[BUFFER_SIZE] = "\0";
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].drop) {
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
taos_close(taos);
return -1;
}
int dataLen = 0;
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "create database if not exists %s", g_Dbs.db[i].dbName);
if (g_Dbs.db[i].dbCfg.blocks > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " blocks %d", g_Dbs.db[i].dbCfg.blocks);
}
if (g_Dbs.db[i].dbCfg.cache > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cache %d", g_Dbs.db[i].dbCfg.cache);
}
if (g_Dbs.db[i].dbCfg.days > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " days %d", g_Dbs.db[i].dbCfg.days);
}
if (g_Dbs.db[i].dbCfg.keep > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " keep %d", g_Dbs.db[i].dbCfg.keep);
}
if (g_Dbs.db[i].dbCfg.quorum > 1) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " quorum %d", g_Dbs.db[i].dbCfg.quorum);
}
if (g_Dbs.db[i].dbCfg.replica > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " replica %d", g_Dbs.db[i].dbCfg.replica);
}
if (g_Dbs.db[i].dbCfg.update > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " update %d", g_Dbs.db[i].dbCfg.update);
}
//if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) {
// dataLen += snprintf(command + dataLen,
// BUFFER_SIZE - dataLen, "tables %d ", g_Dbs.db[i].dbCfg.maxtablesPerVnode);
//}
if (g_Dbs.db[i].dbCfg.minRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " minrows %d", g_Dbs.db[i].dbCfg.minRows);
}
if (g_Dbs.db[i].dbCfg.maxRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " maxrows %d", g_Dbs.db[i].dbCfg.maxRows);
}
if (g_Dbs.db[i].dbCfg.comp > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " comp %d", g_Dbs.db[i].dbCfg.comp);
}
if (g_Dbs.db[i].dbCfg.walLevel > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " wal %d", g_Dbs.db[i].dbCfg.walLevel);
}
if (g_Dbs.db[i].dbCfg.cacheLast > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cachelast %d", g_Dbs.db[i].dbCfg.cacheLast);
}
if (g_Dbs.db[i].dbCfg.fsync > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" fsync %d", g_Dbs.db[i].dbCfg.fsync);
}
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms")))
#if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"ns", strlen("ns")))
#endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"us", strlen("us")))) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
}
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
taos_close(taos);
errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
return -1;
}
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
}
debugPrint("%s() LN%d supertbl count:%"PRIu64"\n",
__func__, __LINE__, g_Dbs.db[i].superTblCount);
int validStbCount = 0;
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].sTblName);
ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);
if ((ret != 0) || (g_Dbs.db[i].drop)) {
ret = createSuperTable(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j]);
if (0 != ret) {
errorPrint("create super table %"PRIu64" failed!\n\n", j);
continue;
}
}
ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j]);
if (0 != ret) {
errorPrint("\nget super table %s.%s info failed!\n\n",
g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
continue;
}
validStbCount ++;
}
g_Dbs.db[i].superTblCount = validStbCount;
}
taos_close(taos);
return 0;
}
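For a nanosecond database, the command assembled above now ends with the new precision clause. A sketch of the resulting statement (the database name "test" is illustrative):

#include <stdio.h>

int main(void) {
    char command[256];
    snprintf(command, sizeof(command),
            "create database if not exists %s precision '%s';",
            "test", "ns");
    /* => create database if not exists test precision 'ns'; */
    printf("%s\n", command);
    return 0;
}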
static void* createTable(void *sarg)
@@ -3530,733 +3545,733 @@ PARSE_OVER:
}
static bool getMetaFromInsertJsonFile(cJSON* root) {
bool ret = false;
cJSON* cfgdir = cJSON_GetObjectItem(root, "cfgdir");
if (cfgdir && cfgdir->type == cJSON_String && cfgdir->valuestring != NULL) {
tstrncpy(g_Dbs.cfgDir, cfgdir->valuestring, MAX_FILE_NAME_LEN);
}
cJSON* host = cJSON_GetObjectItem(root, "host");
if (host && host->type == cJSON_String && host->valuestring != NULL) {
tstrncpy(g_Dbs.host, host->valuestring, MAX_HOSTNAME_SIZE);
} else if (!host) {
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
} else {
printf("ERROR: failed to read json, host not found\n");
goto PARSE_OVER;
}
cJSON* port = cJSON_GetObjectItem(root, "port");
if (port && port->type == cJSON_Number) {
g_Dbs.port = port->valueint;
} else if (!port) {
g_Dbs.port = 6030;
}
cJSON* user = cJSON_GetObjectItem(root, "user");
if (user && user->type == cJSON_String && user->valuestring != NULL) {
tstrncpy(g_Dbs.user, user->valuestring, MAX_USERNAME_SIZE);
} else if (!user) {
tstrncpy(g_Dbs.user, "root", MAX_USERNAME_SIZE);
}
cJSON* password = cJSON_GetObjectItem(root, "password");
if (password && password->type == cJSON_String && password->valuestring != NULL) {
tstrncpy(g_Dbs.password, password->valuestring, MAX_PASSWORD_SIZE);
} else if (!password) {
tstrncpy(g_Dbs.password, "taosdata", MAX_PASSWORD_SIZE);
}
cJSON* resultfile = cJSON_GetObjectItem(root, "result_file");
if (resultfile && resultfile->type == cJSON_String && resultfile->valuestring != NULL) {
tstrncpy(g_Dbs.resultFile, resultfile->valuestring, MAX_FILE_NAME_LEN);
} else if (!resultfile) {
tstrncpy(g_Dbs.resultFile, "./insert_res.txt", MAX_FILE_NAME_LEN);
}
cJSON* threads = cJSON_GetObjectItem(root, "thread_count");
if (threads && threads->type == cJSON_Number) {
g_Dbs.threadCount = threads->valueint;
} else if (!threads) {
g_Dbs.threadCount = 1;
} else {
printf("ERROR: failed to read json, threads not found\n");
goto PARSE_OVER;
}
cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl");
if (threads2 && threads2->type == cJSON_Number) {
g_Dbs.threadCountByCreateTbl = threads2->valueint;
} else if (!threads2) {
g_Dbs.threadCountByCreateTbl = 1;
} else {
errorPrint("%s() LN%d, failed to read json, threads2 not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval");
if (gInsertInterval && gInsertInterval->type == cJSON_Number) {
if (gInsertInterval->valueint <0) {
errorPrint("%s() LN%d, failed to read json, insert interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_args.insert_interval = gInsertInterval->valueint;
} else if (!gInsertInterval) {
g_args.insert_interval = 0;
} else {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* interlaceRows = cJSON_GetObjectItem(root, "interlace_rows");
if (interlaceRows && interlaceRows->type == cJSON_Number) {
if (interlaceRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_args.interlace_rows = interlaceRows->valueint;
} else if (!interlaceRows) {
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
if (maxSqlLen->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_args.max_sql_len = maxSqlLen->valueint;
} else if (!maxSqlLen) {
g_args.max_sql_len = (1024*1024);
} else {
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req");
if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
if (numRecPerReq->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
} else if (numRecPerReq->valueint > MAX_RECORDS_PER_REQ) {
printf("NOTICE: number of records per request value %"PRIu64" > %d\n\n",
numRecPerReq->valueint, MAX_RECORDS_PER_REQ);
printf(" number of records per request value will be set to %d\n\n",
MAX_RECORDS_PER_REQ);
prompt();
numRecPerReq->valueint = MAX_RECORDS_PER_REQ;
}
g_args.num_of_RPR = numRecPerReq->valueint;
} else if (!numRecPerReq) {
g_args.num_of_RPR = MAX_RECORDS_PER_REQ;
} else {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON *answerPrompt = cJSON_GetObjectItem(root, "confirm_parameter_prompt"); // yes, no,
if (answerPrompt
&& answerPrompt->type == cJSON_String
&& answerPrompt->valuestring != NULL) {
if (0 == strncasecmp(answerPrompt->valuestring, "yes", 3)) {
g_args.answer_yes = false;
} else if (0 == strncasecmp(answerPrompt->valuestring, "no", 2)) {
g_args.answer_yes = true;
} else {
g_args.answer_yes = false;
}
} else if (!answerPrompt) {
g_args.answer_yes = true; // default is no, mean answer_yes.
} else {
errorPrint("%s", "failed to read json, confirm_parameter_prompt input mistake\n");
goto PARSE_OVER;
}
// rows per table need be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) {
printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n",
g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
g_args.num_of_RPR);
prompt();
g_args.interlace_rows = g_args.num_of_RPR;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (!dbs || dbs->type != cJSON_Array) {
printf("ERROR: failed to read json, databases not found\n");
goto PARSE_OVER;
}
int dbSize = cJSON_GetArraySize(dbs);
if (dbSize > MAX_DB_COUNT) {
errorPrint(
"ERROR: failed to read json, databases size overflow, max database is %d\n",
MAX_DB_COUNT);
goto PARSE_OVER;
}
g_Dbs.dbCount = dbSize;
for (int i = 0; i < dbSize; ++i) {
cJSON* dbinfos = cJSON_GetArrayItem(dbs, i);
if (dbinfos == NULL) continue;
// dbinfo
cJSON *dbinfo = cJSON_GetObjectItem(dbinfos, "dbinfo");
if (!dbinfo || dbinfo->type != cJSON_Object) {
printf("ERROR: failed to read json, dbinfo not found\n");
goto PARSE_OVER;
}
cJSON *dbName = cJSON_GetObjectItem(dbinfo, "name");
if (!dbName || dbName->type != cJSON_String || dbName->valuestring == NULL) {
printf("ERROR: failed to read json, db name not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].dbName, dbName->valuestring, TSDB_DB_NAME_LEN);
cJSON *drop = cJSON_GetObjectItem(dbinfo, "drop");
if (drop && drop->type == cJSON_String && drop->valuestring != NULL) {
if (0 == strncasecmp(drop->valuestring, "yes", strlen("yes"))) {
g_Dbs.db[i].drop = true;
} else {
g_Dbs.db[i].drop = false;
}
} else if (!drop) {
g_Dbs.db[i].drop = g_args.drop_database;
} else {
errorPrint("%s() LN%d, failed to read json, drop input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON *precision = cJSON_GetObjectItem(dbinfo, "precision");
if (precision && precision->type == cJSON_String
&& precision->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].dbCfg.precision, precision->valuestring,
8);
} else if (!precision) {
memset(g_Dbs.db[i].dbCfg.precision, 0, 8);
} else {
printf("ERROR: failed to read json, precision not found\n");
goto PARSE_OVER;
}
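In a taosdemo meta file this corresponds to dbinfo.precision, which may now be "ms", "us" or "ns" and falls back to the server default when absent. A minimal standalone read of that field with the same cJSON calls used above (the JSON document is illustrative):

#include <stdio.h>
#include <string.h>
#include "cJSON.h"

int main(void) {
    const char *doc = "{\"dbinfo\": {\"precision\": \"ns\"}}";
    cJSON *root = cJSON_Parse(doc);
    if (root == NULL) return 1;
    cJSON *dbinfo = cJSON_GetObjectItem(root, "dbinfo");
    cJSON *precision = dbinfo ? cJSON_GetObjectItem(dbinfo, "precision") : NULL;
    char prec[8] = {0};
    if (precision && precision->type == cJSON_String
            && precision->valuestring != NULL) {
        strncpy(prec, precision->valuestring, sizeof(prec) - 1);
    }
    printf("precision: %s\n", prec[0] ? prec : "(server default)");
    cJSON_Delete(root);
    return 0;
}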
        cJSON* update = cJSON_GetObjectItem(dbinfo, "update");
        if (update && update->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.update = update->valueint;
        } else if (!update) {
            g_Dbs.db[i].dbCfg.update = -1;
        } else {
            printf("ERROR: failed to read json, update not found\n");
            goto PARSE_OVER;
        }

        cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
        if (childTblExists
                && childTblExists->type == cJSON_String
                && childTblExists->valuestring != NULL) {
            if ((0 == strncasecmp(childTblExists->valuestring, "yes", 3))
                    && (g_Dbs.db[i].drop == false)) {
                g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS;
            } else if ((0 == strncasecmp(childTblExists->valuestring, "no", 2)
                    || (g_Dbs.db[i].drop == true))) {
                g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
            } else {
                g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
            }
        } else if (!childTblExists) {
            g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
        } else {
            errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }

        if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
            g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
        }
cJSON* replica = cJSON_GetObjectItem(dbinfo, "replica");
if (replica && replica->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.replica = replica->valueint;
} else if (!replica) {
g_Dbs.db[i].dbCfg.replica = -1;
} else {
printf("ERROR: failed to read json, replica not found\n");
goto PARSE_OVER;
}
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
cJSON *dataSource = cJSON_GetObjectItem(stbInfo, "data_source");
if (dataSource && dataSource->type == cJSON_String
&& dataSource->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource,
dataSource->valuestring, TSDB_DB_NAME_LEN);
} else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", TSDB_DB_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, data_source not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* keep = cJSON_GetObjectItem(dbinfo, "keep");
if (keep && keep->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.keep = keep->valueint;
} else if (!keep) {
g_Dbs.db[i].dbCfg.keep = -1;
} else {
printf("ERROR: failed to read json, keep not found\n");
goto PARSE_OVER;
}
        cJSON* days = cJSON_GetObjectItem(dbinfo, "days");
        if (days && days->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.days = days->valueint;
        } else if (!days) {
            g_Dbs.db[i].dbCfg.days = -1;
        } else {
            printf("ERROR: failed to read json, days not found\n");
            goto PARSE_OVER;
        }

        cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
        if (stbIface && stbIface->type == cJSON_String
                && stbIface->valuestring != NULL) {
            if (0 == strcasecmp(stbIface->valuestring, "taosc")) {
                g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE;
            } else if (0 == strcasecmp(stbIface->valuestring, "rest")) {
                g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
#if STMT_IFACE_ENABLED == 1
            } else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
                g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
#endif
            } else {
                errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
                        __func__, __LINE__, stbIface->valuestring);
                goto PARSE_OVER;
            }
        } else if (!stbIface) {
            g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE;
        } else {
            errorPrint("%s", "failed to read json, insert_mode not found\n");
            goto PARSE_OVER;
        }
        cJSON* cache = cJSON_GetObjectItem(dbinfo, "cache");
        if (cache && cache->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.cache = cache->valueint;
        } else if (!cache) {
            g_Dbs.db[i].dbCfg.cache = -1;
        } else {
            printf("ERROR: failed to read json, cache not found\n");
            goto PARSE_OVER;
        }

        cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit");
        if ((childTbl_limit) && (g_Dbs.db[i].drop != true)
                && (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
            if (childTbl_limit->type != cJSON_Number) {
                printf("ERROR: failed to read json, childtable_limit\n");
                goto PARSE_OVER;
            }
            g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint;
        } else {
            g_Dbs.db[i].superTbls[j].childTblLimit = -1; // select ... limit -1 means all query result, drop = yes mean all table need recreate, limit value is invalid.
        }
        cJSON* blocks= cJSON_GetObjectItem(dbinfo, "blocks");
        if (blocks && blocks->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.blocks = blocks->valueint;
        } else if (!blocks) {
            g_Dbs.db[i].dbCfg.blocks = -1;
        } else {
            printf("ERROR: failed to read json, block not found\n");
            goto PARSE_OVER;
        }

        cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
        if ((childTbl_offset) && (g_Dbs.db[i].drop != true)
                && (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
            if ((childTbl_offset->type != cJSON_Number)
                    || (0 > childTbl_offset->valueint)) {
                printf("ERROR: failed to read json, childtable_offset\n");
                goto PARSE_OVER;
            }
            g_Dbs.db[i].superTbls[j].childTblOffset = childTbl_offset->valueint;
        } else {
            g_Dbs.db[i].superTbls[j].childTblOffset = 0;
        }
cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
ts->valuestring, TSDB_DB_NAME_LEN);
} else if (!ts) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
"now", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, start_timestamp not found\n");
goto PARSE_OVER;
}
//cJSON* maxtablesPerVnode= cJSON_GetObjectItem(dbinfo, "maxtablesPerVnode");
//if (maxtablesPerVnode && maxtablesPerVnode->type == cJSON_Number) {
// g_Dbs.db[i].dbCfg.maxtablesPerVnode = maxtablesPerVnode->valueint;
//} else if (!maxtablesPerVnode) {
// g_Dbs.db[i].dbCfg.maxtablesPerVnode = TSDB_DEFAULT_TABLES;
//} else {
// printf("failed to read json, maxtablesPerVnode not found");
// goto PARSE_OVER;
//}
cJSON* minRows= cJSON_GetObjectItem(dbinfo, "minRows");
if (minRows && minRows->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.minRows = minRows->valueint;
} else if (!minRows) {
g_Dbs.db[i].dbCfg.minRows = 0; // 0 means default
} else {
printf("ERROR: failed to read json, minRows not found\n");
goto PARSE_OVER;
}
cJSON* timestampStep = cJSON_GetObjectItem(stbInfo, "timestamp_step");
if (timestampStep && timestampStep->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint;
} else if (!timestampStep) {
g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP;
} else {
printf("ERROR: failed to read json, timestamp_step not found\n");
goto PARSE_OVER;
}
cJSON* maxRows= cJSON_GetObjectItem(dbinfo, "maxRows");
if (maxRows && maxRows->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.maxRows = maxRows->valueint;
} else if (!maxRows) {
g_Dbs.db[i].dbCfg.maxRows = 0; // 0 means default
} else {
printf("ERROR: failed to read json, maxRows not found\n");
goto PARSE_OVER;
}
cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format");
if (sampleFormat && sampleFormat->type
== cJSON_String && sampleFormat->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
sampleFormat->valuestring, TSDB_DB_NAME_LEN);
} else if (!sampleFormat) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, sample_format not found\n");
goto PARSE_OVER;
}
cJSON* comp= cJSON_GetObjectItem(dbinfo, "comp");
if (comp && comp->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.comp = comp->valueint;
} else if (!comp) {
g_Dbs.db[i].dbCfg.comp = -1;
} else {
printf("ERROR: failed to read json, comp not found\n");
goto PARSE_OVER;
}
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String
&& sampleFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
sampleFile->valuestring, MAX_FILE_NAME_LEN);
} else if (!sampleFile) {
memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN);
} else {
printf("ERROR: failed to read json, sample_file not found\n");
goto PARSE_OVER;
}
cJSON* walLevel= cJSON_GetObjectItem(dbinfo, "walLevel");
if (walLevel && walLevel->type == cJSON_Number) {
g_Dbs.db[i].dbCfg.walLevel = walLevel->valueint;
} else if (!walLevel) {
g_Dbs.db[i].dbCfg.walLevel = -1;
} else {
printf("ERROR: failed to read json, walLevel not found\n");
goto PARSE_OVER;
}
        cJSON* cacheLast= cJSON_GetObjectItem(dbinfo, "cachelast");
        if (cacheLast && cacheLast->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.cacheLast = cacheLast->valueint;
        } else if (!cacheLast) {
            g_Dbs.db[i].dbCfg.cacheLast = -1;
        } else {
            printf("ERROR: failed to read json, cacheLast not found\n");
            goto PARSE_OVER;
        }

        cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
        if ((tagsFile && tagsFile->type == cJSON_String)
                && (tagsFile->valuestring != NULL)) {
            tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
                    tagsFile->valuestring, MAX_FILE_NAME_LEN);
            if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) {
                g_Dbs.db[i].superTbls[j].tagSource = 0;
            } else {
                g_Dbs.db[i].superTbls[j].tagSource = 1;
            }
        } else if (!tagsFile) {
            memset(g_Dbs.db[i].superTbls[j].tagsFile, 0, MAX_FILE_NAME_LEN);
            g_Dbs.db[i].superTbls[j].tagSource = 0;
        } else {
            printf("ERROR: failed to read json, tags_file not found\n");
            goto PARSE_OVER;
        }
cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) {
int32_t len = stbMaxSqlLen->valueint;
if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
len = TSDB_MAX_ALLOWED_SQL_LEN;
} else if (len < 5) {
len = 5;
}
g_Dbs.db[i].superTbls[j].maxSqlLen = len;
} else if (!maxSqlLen) {
g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
} else {
errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
        cJSON* quorum= cJSON_GetObjectItem(dbinfo, "quorum");
        if (quorum && quorum->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.quorum = quorum->valueint;
        } else if (!quorum) {
            g_Dbs.db[i].dbCfg.quorum = 1;
        } else {
            printf("failed to read json, quorum input mistake");
            goto PARSE_OVER;
        }

        /*
        cJSON *multiThreadWriteOneTbl =
            cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
        if (multiThreadWriteOneTbl
                && multiThreadWriteOneTbl->type == cJSON_String
                && multiThreadWriteOneTbl->valuestring != NULL) {
            if (0 == strncasecmp(multiThreadWriteOneTbl->valuestring, "yes", 3)) {
                g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1;
            } else {
                g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
            }
        } else if (!multiThreadWriteOneTbl) {
            g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
        } else {
            printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
            goto PARSE_OVER;
        }
        */
        cJSON* fsync= cJSON_GetObjectItem(dbinfo, "fsync");
        if (fsync && fsync->type == cJSON_Number) {
            g_Dbs.db[i].dbCfg.fsync = fsync->valueint;
        } else if (!fsync) {
            g_Dbs.db[i].dbCfg.fsync = -1;
        } else {
            errorPrint("%s() LN%d, failed to read json, fsync input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }

        cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
        if (insertRows && insertRows->type == cJSON_Number) {
            if (insertRows->valueint < 0) {
                errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }
            g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
        } else if (!insertRows) {
            g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
        } else {
            errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }
        // super_tables
        cJSON *stables = cJSON_GetObjectItem(dbinfos, "super_tables");
        if (!stables || stables->type != cJSON_Array) {
            errorPrint("%s() LN%d, failed to read json, super_tables not found\n",
                    __func__, __LINE__);
            goto PARSE_OVER;
        }

        int stbSize = cJSON_GetArraySize(stables);
        if (stbSize > MAX_SUPER_TABLE_COUNT) {
            errorPrint(
                "%s() LN%d, failed to read json, supertable size overflow, max supertable is %d\n",
                __func__, __LINE__, MAX_SUPER_TABLE_COUNT);
            goto PARSE_OVER;
        }

        cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
        if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
            if (stbInterlaceRows->valueint < 0) {
                errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n",
                        __func__, __LINE__);
                goto PARSE_OVER;
            }
            g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;

            if (g_Dbs.db[i].superTbls[j].interlaceRows > g_Dbs.db[i].superTbls[j].insertRows) {
                printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > insert_rows %"PRId64"\n\n",
                        i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
                        g_Dbs.db[i].superTbls[j].insertRows);
                printf("        interlace rows value will be set to insert_rows %"PRId64"\n\n",
                        g_Dbs.db[i].superTbls[j].insertRows);
                prompt();
                g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows;
            }
        } else if (!stbInterlaceRows) {
            g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
        } else {
            errorPrint(
                "%s() LN%d, failed to read json, interlace rows input mistake\n",
                __func__, __LINE__);
            goto PARSE_OVER;
        }
cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio");
if (disorderRatio && disorderRatio->type == cJSON_Number) {
if (disorderRatio->valueint > 50)
disorderRatio->valueint = 50;
g_Dbs.db[i].superTblCount = stbSize;
for (int j = 0; j < stbSize; ++j) {
cJSON* stbInfo = cJSON_GetArrayItem(stables, j);
if (stbInfo == NULL) continue;
if (disorderRatio->valueint < 0)
disorderRatio->valueint = 0;
// dbinfo
cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name");
if (!stbName || stbName->type != cJSON_String
|| stbName->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, stb name not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring,
TSDB_TABLE_NAME_LEN);
g_Dbs.db[i].superTbls[j].disorderRatio = disorderRatio->valueint;
} else if (!disorderRatio) {
g_Dbs.db[i].superTbls[j].disorderRatio = 0;
} else {
printf("ERROR: failed to read json, disorderRatio not found\n");
goto PARSE_OVER;
}
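            // Illustrative example (assumed field values, not from this commit) of a
            // "super_tables" entry consumed by the field-by-field parsing below:
            //   { "name": "stb0", "childtable_prefix": "stb0_", "childtable_count": 100,
            //     "insert_rows": 1000, "timestamp_step": 10, "disorder_ratio": 0 }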
cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
printf("ERROR: failed to read json, childtable_prefix not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring,
TSDB_TABLE_NAME_LEN - 20);
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table");
if (autoCreateTbl
&& autoCreateTbl->type == cJSON_String
&& autoCreateTbl->valuestring != NULL) {
if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3))
&& (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
} else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} else {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
}
} else if (!autoCreateTbl) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} else {
printf("ERROR: failed to read json, auto_create_table not found\n");
goto PARSE_OVER;
}
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
} else if (!batchCreateTbl) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = 1000;
} else {
printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
goto PARSE_OVER;
}
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists
&& childTblExists->type == cJSON_String
&& childTblExists->valuestring != NULL) {
if ((0 == strncasecmp(childTblExists->valuestring, "yes", 3))
&& (g_Dbs.db[i].drop == false)) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS;
} else if ((0 == strncasecmp(childTblExists->valuestring, "no", 2)
|| (g_Dbs.db[i].drop == true))) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
}
} else if (!childTblExists) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else {
errorPrint("%s() LN%d, failed to read json, child_table_exists not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
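            // Child tables that already exist can never be auto-created again,
            // so an existing set forces pre-created mode below.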
if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
}
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
cJSON *dataSource = cJSON_GetObjectItem(stbInfo, "data_source");
if (dataSource && dataSource->type == cJSON_String
&& dataSource->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource,
dataSource->valuestring, TSDB_DB_NAME_LEN);
} else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", TSDB_DB_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, data_source not found\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
if (stbIface && stbIface->type == cJSON_String
&& stbIface->valuestring != NULL) {
if (0 == strcasecmp(stbIface->valuestring, "taosc")) {
g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE;
} else if (0 == strcasecmp(stbIface->valuestring, "rest")) {
g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
#if STMT_IFACE_ENABLED == 1
} else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
#endif
} else {
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
__func__, __LINE__, stbIface->valuestring);
goto PARSE_OVER;
}
} else if (!stbIface) {
g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE;
} else {
errorPrint("%s", "failed to read json, insert_mode not found\n");
goto PARSE_OVER;
}
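            // Note: "stmt" is accepted only when built with STMT_IFACE_ENABLED;
            // otherwise the string falls through to the error branch above.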
cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit");
if ((childTbl_limit) && (g_Dbs.db[i].drop != true)
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
if (childTbl_limit->type != cJSON_Number) {
printf("ERROR: failed to read json, childtable_limit\n");
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint;
} else {
                g_Dbs.db[i].superTbls[j].childTblLimit = -1;    // select ... limit -1 returns all results; when drop = yes every table is recreated, so the limit value is ignored.
}
cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
if ((childTbl_offset) && (g_Dbs.db[i].drop != true)
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
if ((childTbl_offset->type != cJSON_Number)
|| (0 > childTbl_offset->valueint)) {
printf("ERROR: failed to read json, childtable_offset\n");
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblOffset = childTbl_offset->valueint;
} else {
g_Dbs.db[i].superTbls[j].childTblOffset = 0;
}
cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
ts->valuestring, TSDB_DB_NAME_LEN);
} else if (!ts) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
"now", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, start_timestamp not found\n");
goto PARSE_OVER;
}
cJSON* timestampStep = cJSON_GetObjectItem(stbInfo, "timestamp_step");
if (timestampStep && timestampStep->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint;
} else if (!timestampStep) {
g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP;
} else {
printf("ERROR: failed to read json, timestamp_step not found\n");
goto PARSE_OVER;
}
cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format");
if (sampleFormat && sampleFormat->type
== cJSON_String && sampleFormat->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
sampleFormat->valuestring, TSDB_DB_NAME_LEN);
} else if (!sampleFormat) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, sample_format not found\n");
goto PARSE_OVER;
}
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String
&& sampleFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
sampleFile->valuestring, MAX_FILE_NAME_LEN);
} else if (!sampleFile) {
memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN);
} else {
printf("ERROR: failed to read json, sample_file not found\n");
goto PARSE_OVER;
}
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
if ((tagsFile && tagsFile->type == cJSON_String)
&& (tagsFile->valuestring != NULL)) {
tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
tagsFile->valuestring, MAX_FILE_NAME_LEN);
if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) {
g_Dbs.db[i].superTbls[j].tagSource = 0;
} else {
g_Dbs.db[i].superTbls[j].tagSource = 1;
}
} else if (!tagsFile) {
memset(g_Dbs.db[i].superTbls[j].tagsFile, 0, MAX_FILE_NAME_LEN);
g_Dbs.db[i].superTbls[j].tagSource = 0;
} else {
printf("ERROR: failed to read json, tags_file not found\n");
goto PARSE_OVER;
}
cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) {
int32_t len = stbMaxSqlLen->valueint;
if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
len = TSDB_MAX_ALLOWED_SQL_LEN;
} else if (len < 5) {
len = 5;
}
g_Dbs.db[i].superTbls[j].maxSqlLen = len;
} else if (!maxSqlLen) {
g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
} else {
errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
/*
cJSON *multiThreadWriteOneTbl =
cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
if (multiThreadWriteOneTbl
&& multiThreadWriteOneTbl->type == cJSON_String
&& multiThreadWriteOneTbl->valuestring != NULL) {
if (0 == strncasecmp(multiThreadWriteOneTbl->valuestring, "yes", 3)) {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 1;
} else {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
}
} else if (!multiThreadWriteOneTbl) {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
} else {
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
goto PARSE_OVER;
}
*/
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
if (insertRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
if (stbInterlaceRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_Dbs.db[i].superTbls[j].insertRows) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > insert_rows %"PRId64"\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
g_Dbs.db[i].superTbls[j].insertRows);
printf(" interlace rows value will be set to insert_rows %"PRId64"\n\n",
g_Dbs.db[i].superTbls[j].insertRows);
prompt();
g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows;
}
} else if (!stbInterlaceRows) {
                g_Dbs.db[i].superTbls[j].interlaceRows = 0;     // 0 means progressive mode, > 0 means interlace mode; the max value must not exceed num_of_records_per_req
} else {
errorPrint(
"%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* disorderRatio = cJSON_GetObjectItem(stbInfo, "disorder_ratio");
if (disorderRatio && disorderRatio->type == cJSON_Number) {
if (disorderRatio->valueint > 50)
disorderRatio->valueint = 50;
if (disorderRatio->valueint < 0)
disorderRatio->valueint = 0;
g_Dbs.db[i].superTbls[j].disorderRatio = disorderRatio->valueint;
} else if (!disorderRatio) {
g_Dbs.db[i].superTbls[j].disorderRatio = 0;
} else {
printf("ERROR: failed to read json, disorderRatio not found\n");
goto PARSE_OVER;
}
cJSON* disorderRange = cJSON_GetObjectItem(stbInfo, "disorder_range");
if (disorderRange && disorderRange->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].disorderRange = disorderRange->valueint;
} else if (!disorderRange) {
g_Dbs.db[i].superTbls[j].disorderRange = 1000;
} else {
printf("ERROR: failed to read json, disorderRange not found\n");
goto PARSE_OVER;
}
cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint;
if (insertInterval->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
} else if (!insertInterval) {
verbosePrint("%s() LN%d: stable insert interval be overrided by global %"PRIu64".\n",
__func__, __LINE__, g_args.insert_interval);
g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval;
} else {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
int retVal = getColumnAndTagTypeFromInsertJsonFile(
stbInfo, &g_Dbs.db[i].superTbls[j]);
if (false == retVal) {
goto PARSE_OVER;
}
}
}
    ret = true;
PARSE_OVER:
    return ret;
}
static bool getMetaFromQueryJsonFile(cJSON* root) {
......@@ -6535,349 +6550,351 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision, SSuperTable* superTblInfo) {
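    // Map the database precision string onto the client time precision.
    // "ns" is accepted only when built with NANO_SECOND_ENABLED, the
    // nanosecond support this change introduces.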
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) {
if (0 == strncasecmp(precision, "ms", 2)) {
timePrec = TSDB_TIME_PRECISION_MILLI;
} else if (0 == strncasecmp(precision, "us", 2)) {
timePrec = TSDB_TIME_PRECISION_MICRO;
#if NANO_SECOND_ENABLED == 1
} else if (0 == strncasecmp(precision, "ns", 2)) {
timePrec = TSDB_TIME_PRECISION_NANO;
#endif
} else {
errorPrint("Not support precision: %s\n", precision);
exit(-1);
}
}
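    // Resolve the starting timestamp: "now" takes the current time in the
    // selected precision, any other string is parsed by taosParseTime().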
    int64_t start_time;
    if (superTblInfo) {
        if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
            start_time = taosGetTimestamp(timePrec);
        } else {
            if (TSDB_CODE_SUCCESS != taosParseTime(
                    superTblInfo->startTimestamp,
                    &start_time,
                    strlen(superTblInfo->startTimestamp),
                    timePrec, 0)) {
                ERROR_EXIT("failed to parse time!\n");
            }
        }
    } else {
        start_time = 1500000000000;
    }
    int64_t start = taosGetTimestampMs();

    // read sample data from file first
    if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
                    "sample", strlen("sample")))) {
        if (0 != prepareSampleDataForSTable(superTblInfo)) {
            errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
                    __func__, __LINE__);
            exit(-1);
        }
    }

    TAOS* taos0 = taos_connect(
            g_Dbs.host, g_Dbs.user,
            g_Dbs.password, db_name, g_Dbs.port);
    if (NULL == taos0) {
        errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
                __func__, __LINE__, taos_errstr(NULL));
        exit(-1);
    }

    int64_t ntables = 0;
    uint64_t tableFrom;

    if (superTblInfo) {
        int64_t limit;
        uint64_t offset;

        if ((NULL != g_args.sqlFile) && (superTblInfo->childTblExists == TBL_NO_EXISTS) &&
                ((superTblInfo->childTblOffset != 0) || (superTblInfo->childTblLimit >= 0))) {
            printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
        }

        if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) {
            if ((superTblInfo->childTblLimit < 0)
                    || ((superTblInfo->childTblOffset + superTblInfo->childTblLimit)
                        > (superTblInfo->childTblCount))) {
                superTblInfo->childTblLimit =
                    superTblInfo->childTblCount - superTblInfo->childTblOffset;
            }

            offset = superTblInfo->childTblOffset;
            limit = superTblInfo->childTblLimit;
        } else {
            limit = superTblInfo->childTblCount;
            offset = 0;
        }

        ntables = limit;
        tableFrom = offset;

        if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
                && ((superTblInfo->childTblOffset + superTblInfo->childTblLimit)
                    > superTblInfo->childTblCount)) {
            printf("WARNING: specified offset + limit > child table count!\n");
            prompt();
        }

        if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
                && (0 == superTblInfo->childTblLimit)) {
            printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n");
            prompt();
        }

        superTblInfo->childTblName = (char*)calloc(1,
                limit * TSDB_TABLE_NAME_LEN);
        if (superTblInfo->childTblName == NULL) {
            errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
            taos_close(taos0);
            exit(-1);
        }

        int64_t childTblCount;
        getChildNameOfSuperTableWithLimitAndOffset(
                taos0,
                db_name, superTblInfo->sTblName,
                &superTblInfo->childTblName, &childTblCount,
                limit,
                offset);
    } else {
        ntables = g_args.num_of_tables;
        tableFrom = 0;
    }
    taos_close(taos0);
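    // Distribute ntables across the worker threads: each thread receives a
    // contiguous range of roughly ntables / threads child tables.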
    int64_t a = ntables / threads;
    if (a < 1) {
        threads = ntables;
        a = 1;
    }

    int64_t b = 0;
    if (threads != 0) {
        b = ntables % threads;
    }

    if ((superTblInfo)
            && (superTblInfo->iface == REST_IFACE)) {
        if (convertHostToServAddr(
                    g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
            exit(-1);
        }
    }

    pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
    assert(pids != NULL);

    threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
    assert(infos != NULL);

    memset(pids, 0, threads * sizeof(pthread_t));
    memset(infos, 0, threads * sizeof(threadInfo));

    for (int i = 0; i < threads; i++) {
        threadInfo *pThreadInfo = infos + i;
        pThreadInfo->threadID = i;
        tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
        pThreadInfo->time_precision = timePrec;
        pThreadInfo->superTblInfo = superTblInfo;
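        // Each worker owns its own connection; for the stmt interface it also
        // prepares one parameterized statement of the form
        // INSERT INTO ? [USING <stb> TAGS(?,...)] VALUES(?,...), with one "?"
        // per tag and column.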
        pThreadInfo->start_time = start_time;
        pThreadInfo->minDelay = UINT64_MAX;

        if ((NULL == superTblInfo) ||
                (superTblInfo->iface != REST_IFACE)) {
            //t_info->taos = taos;
            pThreadInfo->taos = taos_connect(
                    g_Dbs.host, g_Dbs.user,
                    g_Dbs.password, db_name, g_Dbs.port);
            if (NULL == pThreadInfo->taos) {
                errorPrint(
                        "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
                        __func__, __LINE__,
                        taos_errstr(NULL));
                free(infos);
                exit(-1);
            }

#if STMT_IFACE_ENABLED == 1
            if ((g_args.iface == STMT_IFACE)
                    || ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) {

                int columnCount;
                if (superTblInfo) {
                    columnCount = superTblInfo->columnCount;
                } else {
                    columnCount = g_args.num_of_CPR;
                }

                pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
                if (NULL == pThreadInfo->stmt) {
                    errorPrint(
                            "%s() LN%d, failed init stmt, reason: %s\n",
                            __func__, __LINE__,
                            taos_errstr(NULL));
                    free(pids);
                    free(infos);
                    exit(-1);
                }

                char buffer[3000];
                char *pstr = buffer;

                if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
                    pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
                            superTblInfo->sTblName);
                    for (int tag = 0; tag < (superTblInfo->tagCount - 1); tag ++ ) {
                        pstr += sprintf(pstr, ",?");
                    }
                    pstr += sprintf(pstr, ") VALUES(?");
                } else {
                    pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
                }

                for (int col = 0; col < columnCount; col ++) {
                    pstr += sprintf(pstr, ",?");
                }
                pstr += sprintf(pstr, ")");

                debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer);
                int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
                if (ret != 0){
                    errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
                            ret, taos_errstr(NULL));
                    free(pids);
                    free(infos);
                    exit(-1);
                }
            }
#endif
        } else {
            pThreadInfo->taos = NULL;
        }

        /* if ((NULL == superTblInfo)
           || (0 == superTblInfo->multiThreadWriteOneTbl)) {
           */
        pThreadInfo->start_table_from = tableFrom;
        pThreadInfo->ntables = i < b ? a + 1 : a;
        pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
        tableFrom = pThreadInfo->end_table_to + 1;
        /* } else {
           pThreadInfo->start_table_from = 0;
           pThreadInfo->ntables = superTblInfo->childTblCount;
           pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
           }
           */
        tsem_init(&(pThreadInfo->lock_sem), 0, 0);
        if (ASYNC_MODE == g_Dbs.asyncMode) {
            pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
        } else {
            pthread_create(pids + i, NULL, syncWrite, pThreadInfo);
        }
    }

    for (int i = 0; i < threads; i++) {
        pthread_join(pids[i], NULL);
    }
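    // After all writers have joined, aggregate the per-thread latency and row
    // counters for the summary report.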
    uint64_t totalDelay = 0;
    uint64_t maxDelay = 0;
    uint64_t minDelay = UINT64_MAX;
    uint64_t cntDelay = 1;
    double avgDelay = 0;

    for (int i = 0; i < threads; i++) {
        threadInfo *pThreadInfo = infos + i;

        tsem_destroy(&(pThreadInfo->lock_sem));

#if STMT_IFACE_ENABLED == 1
        if (pThreadInfo->stmt) {
            taos_stmt_close(pThreadInfo->stmt);
        }
#endif
        taos_close(pThreadInfo->taos);

        debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
                __func__, __LINE__,
                pThreadInfo->threadID, pThreadInfo->totalInsertRows,
                pThreadInfo->totalAffectedRows);
        if (superTblInfo) {
            superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
            superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows;
        } else {
            g_args.totalAffectedRows += pThreadInfo->totalAffectedRows;
            g_args.totalInsertRows += pThreadInfo->totalInsertRows;
        }

        totalDelay += pThreadInfo->totalDelay;
        cntDelay += pThreadInfo->cntDelay;
        if (pThreadInfo->maxDelay > maxDelay) maxDelay = pThreadInfo->maxDelay;
        if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay;
    }
    cntDelay -= 1;

    if (cntDelay == 0) cntDelay = 1;
    avgDelay = (double)totalDelay / cntDelay;

    int64_t end = taosGetTimestampMs();
    int64_t t = end - start;

    double tInMs = t/1000.0;

    if (superTblInfo) {
        fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
                tInMs, superTblInfo->totalInsertRows,
                superTblInfo->totalAffectedRows,
                threads, db_name, superTblInfo->sTblName,
                (tInMs)?
                (double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);

        if (g_fpOfInsertResult) {
            fprintf(g_fpOfInsertResult,
                    "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
                    tInMs, superTblInfo->totalInsertRows,
                    superTblInfo->totalAffectedRows,
                    threads, db_name, superTblInfo->sTblName,
                    (tInMs)?
                    (double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
        }
    } else {
        fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
                tInMs, g_args.totalInsertRows,
                g_args.totalAffectedRows,
                threads, db_name,
                (tInMs)?
                (double)(g_args.totalInsertRows/tInMs):FLT_MAX);
        if (g_fpOfInsertResult) {
            fprintf(g_fpOfInsertResult,
                    "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
                    tInMs, g_args.totalInsertRows,
                    g_args.totalAffectedRows,
                    threads, db_name,
                    (tInMs)?
                    (double)(g_args.totalInsertRows/tInMs):FLT_MAX);
        }
    }

    fprintf(stderr, "insert delay, avg: %10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
            avgDelay, maxDelay, minDelay);
    if (g_fpOfInsertResult) {
        fprintf(g_fpOfInsertResult, "insert delay, avg: %10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
                avgDelay, maxDelay, minDelay);
    }

    //taos_close(taos);

    free(pids);
    free(infos);
}
static void *readTable(void *sarg) {
......@@ -7035,98 +7052,98 @@ static void prompt()
static int insertTestProcess() {
    setupForAnsiEscape();
    int ret = printfInsertMeta();
    resetAfterAnsiEscape();

    if (ret == -1)
        exit(EXIT_FAILURE);

    debugPrint("%d result file: %s\n", __LINE__, g_Dbs.resultFile);
    g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
    if (NULL == g_fpOfInsertResult) {
        errorPrint("Failed to open %s for save result\n", g_Dbs.resultFile);
        return -1;
    }

    if (g_fpOfInsertResult)
        printfInsertMetaToFile(g_fpOfInsertResult);

    prompt();

    init_rand_data();

    // create database and super tables
    if (createDatabasesAndStables() != 0) {
        if (g_fpOfInsertResult)
            fclose(g_fpOfInsertResult);
        return -1;
    }

    // pretreatment
    if (prepareSampleData() != 0) {
        if (g_fpOfInsertResult)
            fclose(g_fpOfInsertResult);
        return -1;
    }

    double start;
    double end;

    // create child tables
    start = taosGetTimestampMs();
    createChildTables();
    end = taosGetTimestampMs();

    if (g_totalChildTables > 0) {
        fprintf(stderr, "Spent %.4f seconds to create %"PRId64" tables with %d thread(s)\n\n",
                (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
        if (g_fpOfInsertResult) {
            fprintf(g_fpOfInsertResult,
                    "Spent %.4f seconds to create %"PRId64" tables with %d thread(s)\n\n",
                    (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
        }
    }
    // create sub threads for inserting data
    //start = taosGetTimestampMs();
    for (int i = 0; i < g_Dbs.dbCount; i++) {
        if (g_Dbs.use_metric) {
            if (g_Dbs.db[i].superTblCount > 0) {
                for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {

                    SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];

                    if (superTblInfo && (superTblInfo->insertRows > 0)) {
                        startMultiThreadInsertData(
                                g_Dbs.threadCount,
                                g_Dbs.db[i].dbName,
                                g_Dbs.db[i].dbCfg.precision,
                                superTblInfo);
                    }
                }
            }
        } else {
            startMultiThreadInsertData(
                    g_Dbs.threadCount,
                    g_Dbs.db[i].dbName,
                    g_Dbs.db[i].dbCfg.precision,
                    NULL);
        }
    }
    //end = taosGetTimestampMs();

    //int64_t totalInsertRows = 0;
    //int64_t totalAffectedRows = 0;
    //for (int i = 0; i < g_Dbs.dbCount; i++) {
    //    for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
    //        totalInsertRows += g_Dbs.db[i].superTbls[j].totalInsertRows;
    //        totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows;
    //}
    //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalInsertRows, totalAffectedRows, g_Dbs.threadCount);

    postFreeResource();

    return 0;
}
static void *specifiedTableQuery(void *sarg) {
......@@ -7962,116 +7979,116 @@ static void initOfQueryMeta() {
}
static void setParaFromArg(){
    if (g_args.host) {
        tstrncpy(g_Dbs.host, g_args.host, MAX_HOSTNAME_SIZE);
    } else {
        tstrncpy(g_Dbs.host, "127.0.0.1", MAX_HOSTNAME_SIZE);
    }

    if (g_args.user) {
        tstrncpy(g_Dbs.user, g_args.user, MAX_USERNAME_SIZE);
    }

    if (g_args.password) {
        tstrncpy(g_Dbs.password, g_args.password, MAX_PASSWORD_SIZE);
    }

    if (g_args.port) {
        g_Dbs.port = g_args.port;
    }

    g_Dbs.threadCount = g_args.num_of_threads;
    g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;

    g_Dbs.dbCount = 1;
    g_Dbs.db[0].drop = true;

    tstrncpy(g_Dbs.db[0].dbName, g_args.database, TSDB_DB_NAME_LEN);
    g_Dbs.db[0].dbCfg.replica = g_args.replica;
    tstrncpy(g_Dbs.db[0].dbCfg.precision, "ms", 8);

    tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN);

    g_Dbs.use_metric = g_args.use_metric;
    g_Dbs.insert_only = g_args.insert_only;

    g_Dbs.do_aggreFunc = true;

    char dataString[STRING_LEN];
    char **data_type = g_args.datatype;

    memset(dataString, 0, STRING_LEN);

    if (strcasecmp(data_type[0], "BINARY") == 0
            || strcasecmp(data_type[0], "BOOL") == 0
            || strcasecmp(data_type[0], "NCHAR") == 0 ) {
        g_Dbs.do_aggreFunc = false;
    }

    if (g_args.use_metric) {
        g_Dbs.db[0].superTblCount = 1;
        tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", TSDB_TABLE_NAME_LEN);
        g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
        g_Dbs.threadCount = g_args.num_of_threads;
        g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
        g_Dbs.asyncMode = g_args.async_mode;

        g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
        g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
        g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
        g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio;
        tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
                g_args.tb_prefix, TSDB_TABLE_NAME_LEN - 20);
        tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);

        if (g_args.iface == INTERFACE_BUT) {
            g_Dbs.db[0].superTbls[0].iface = TAOSC_IFACE;
        } else {
            g_Dbs.db[0].superTbls[0].iface = g_args.iface;
        }
        tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
                "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
        g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;

        g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
        g_Dbs.db[0].superTbls[0].maxSqlLen = g_args.max_sql_len;

        g_Dbs.db[0].superTbls[0].columnCount = 0;
        for (int i = 0; i < MAX_NUM_COLUMNS; i++) {
            if (data_type[i] == NULL) {
                break;
            }

            tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
                    data_type[i], strlen(data_type[i]) + 1);
            g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary;
            g_Dbs.db[0].superTbls[0].columnCount++;
        }

        if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) {
            g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR;
        } else {
            for (int i = g_Dbs.db[0].superTbls[0].columnCount;
                    i < g_args.num_of_CPR; i++) {
                tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
                        "INT", strlen("INT") + 1);
                g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
                g_Dbs.db[0].superTbls[0].columnCount++;
            }
        }

        tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType,
                "INT", strlen("INT") + 1);
        g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;

        tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType,
                "BINARY", strlen("BINARY") + 1);
        g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
        g_Dbs.db[0].superTbls[0].tagCount = 2;
    } else {
        g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
        g_Dbs.db[0].superTbls[0].tagCount = 0;
    }
}
/* Function to do regular expression check */
......