未验证 提交 26821d99 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[DONT MERGE] Hotfix/sangshuduo/td 5872 taosdemo stmt improve (#7733)

* [TD-5872]<fix>: taosdemo stmt improve.

* refactor stmt functions.

* [TD-5872]<fix>: taosdemo stmt csv perf improve.

* rand func back to early impl.

* fix windows/mac compile error.

* fix empty tag sample.

* [TD-5873]<test>add stmt’performance taosdemo testcase

* add data_type enum and stmt_batch framework.

* use data type enum and fix test case limit/offset.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
Co-authored-by: haoranc's avatartomchon <haoran920c@163.com>
上级 8cfea7d4
......@@ -20,6 +20,7 @@
#include <stdint.h>
#include <taos.h>
#include <taoserror.h>
#define _GNU_SOURCE
#define CURL_STATICLIB
......@@ -109,6 +110,14 @@ extern char configDir[];
#define DEFAULT_CHILDTABLES 10000
#define STMT_BIND_PARAM_BATCH 0
char* g_sampleDataBuf = NULL;
#if STMT_BIND_PARAM_BATCH == 1
// bind param batch
char* g_sampleBindBatchArray = NULL;
#endif
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
......@@ -226,18 +235,20 @@ typedef struct SArguments_S {
bool performance_print;
char * output_file;
bool async_mode;
char * datatype[MAX_NUM_COLUMNS + 1];
char data_type[MAX_NUM_COLUMNS+1];
char *dataType[MAX_NUM_COLUMNS+1];
uint32_t binwidth;
uint32_t num_of_CPR;
uint32_t num_of_threads;
uint32_t columnCount;
uint64_t lenOfOneRow;
uint32_t nthreads;
uint64_t insert_interval;
uint64_t timestamp_step;
int64_t query_times;
uint32_t interlace_rows;
uint32_t num_of_RPR; // num_of_records_per_req
uint32_t reqPerReq; // num_of_records_per_req
uint64_t max_sql_len;
int64_t num_of_tables;
int64_t num_of_DPT;
int64_t ntables;
int64_t insertRows;
int abort;
uint32_t disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms, us or ns. accordig to database precision
......@@ -249,13 +260,14 @@ typedef struct SArguments_S {
typedef struct SColumn_S {
char field[TSDB_COL_NAME_LEN];
char data_type;
char dataType[DATATYPE_BUFF_LEN];
uint32_t dataLen;
char note[NOTE_BUFF_LEN];
} StrColumn;
typedef struct SSuperTable_S {
char sTblName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dataSource[SMALL_BUFF_LEN]; // rand_gen or sample
char childTblPrefix[TBNAME_PREFIX_LEN];
uint16_t childTblExists;
......@@ -291,14 +303,16 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow;
char* sampleDataBuf;
//int sampleRowCount;
//int sampleUsePos;
uint32_t tagSource; // 0: rand, 1: tag sample
char* tagDataBuf;
uint32_t tagSampleCount;
uint32_t tagUsePos;
#if STMT_BIND_PARAM_BATCH == 1
// bind param batch
char *sampleBindBatchArray;
#endif
// statistics
uint64_t totalInsertRows;
uint64_t totalAffectedRows;
......@@ -398,7 +412,7 @@ typedef struct SpecifiedQueryInfo_S {
} SpecifiedQueryInfo;
typedef struct SuperQueryInfo_S {
char sTblName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint32_t threadCnt;
uint32_t asyncMode; // 0: sync, 1: async
......@@ -437,8 +451,17 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
char* sampleBindArray;
#if STMT_BIND_PARAM_BATCH == 1
int64_t *bind_ts;
int64_t *bind_ts_array;
char *bindParams;
char *is_null;
#else
int64_t *bind_ts;
char* sampleBindArray;
#endif
int threadID;
char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision;
......@@ -614,22 +637,26 @@ SArguments g_args = {
false, // answer_yes;
"./output.txt", // output_file
0, // mode : sync or async
{TSDB_DATA_TYPE_FLOAT,
TSDB_DATA_TYPE_INT,
TSDB_DATA_TYPE_FLOAT},
{
"FLOAT", // datatype
"INT", // datatype
"FLOAT", // datatype. DEFAULT_DATATYPE_NUM is 3
"FLOAT", // dataType
"INT", // dataType
"FLOAT", // dataType. demo mode has 3 columns
},
64, // binwidth
4, // num_of_CPR
10, // num_of_connections/thread
4, // columnCount, timestamp + float + int + float
20 + FLOAT_BUFF_LEN + INT_BUFF_LEN + FLOAT_BUFF_LEN, // lenOfOneRow
8, // num_of_connections/thread
0, // insert_interval
DEFAULT_TIMESTAMP_STEP, // timestamp_step
1, // query_times
DEFAULT_INTERLACE_ROWS, // interlace_rows;
30000, // num_of_RPR
30000, // reqPerReq
(1024*1024), // max_sql_len
DEFAULT_CHILDTABLES, // num_of_tables
10000, // num_of_DPT
DEFAULT_CHILDTABLES, // ntables
10000, // insertRows
0, // abort
0, // disorderRatio
1000, // disorderRange
......@@ -1161,17 +1188,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "T");
exit(EXIT_FAILURE);
}
arguments->num_of_threads = atoi(argv[++i]);
arguments->nthreads = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--threads=", strlen("--threads="))) {
if (isStringNumber((char *)(argv[i] + strlen("--threads=")))) {
arguments->num_of_threads = atoi((char *)(argv[i]+strlen("--threads=")));
arguments->nthreads = atoi((char *)(argv[i]+strlen("--threads=")));
} else {
errorPrintReqArg2(argv[0], "--threads");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-T", strlen("-T"))) {
if (isStringNumber((char *)(argv[i] + strlen("-T")))) {
arguments->num_of_threads = atoi((char *)(argv[i]+strlen("-T")));
arguments->nthreads = atoi((char *)(argv[i]+strlen("-T")));
} else {
errorPrintReqArg2(argv[0], "-T");
exit(EXIT_FAILURE);
......@@ -1184,7 +1211,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "--threads");
exit(EXIT_FAILURE);
}
arguments->num_of_threads = atoi(argv[++i]);
arguments->nthreads = atoi(argv[++i]);
} else {
errorUnreconized(argv[0], argv[i]);
exit(EXIT_FAILURE);
......@@ -1321,17 +1348,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "r");
exit(EXIT_FAILURE);
}
arguments->num_of_RPR = atoi(argv[++i]);
arguments->reqPerReq = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--rec-per-req=", strlen("--rec-per-req="))) {
if (isStringNumber((char *)(argv[i] + strlen("--rec-per-req=")))) {
arguments->num_of_RPR = atoi((char *)(argv[i]+strlen("--rec-per-req=")));
arguments->reqPerReq = atoi((char *)(argv[i]+strlen("--rec-per-req=")));
} else {
errorPrintReqArg2(argv[0], "--rec-per-req");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-r", strlen("-r"))) {
if (isStringNumber((char *)(argv[i] + strlen("-r")))) {
arguments->num_of_RPR = atoi((char *)(argv[i]+strlen("-r")));
arguments->reqPerReq = atoi((char *)(argv[i]+strlen("-r")));
} else {
errorPrintReqArg2(argv[0], "-r");
exit(EXIT_FAILURE);
......@@ -1344,7 +1371,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "--rec-per-req");
exit(EXIT_FAILURE);
}
arguments->num_of_RPR = atoi(argv[++i]);
arguments->reqPerReq = atoi(argv[++i]);
} else {
errorUnreconized(argv[0], argv[i]);
exit(EXIT_FAILURE);
......@@ -1359,17 +1386,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "t");
exit(EXIT_FAILURE);
}
arguments->num_of_tables = atoi(argv[++i]);
arguments->ntables = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--tables=", strlen("--tables="))) {
if (isStringNumber((char *)(argv[i] + strlen("--tables=")))) {
arguments->num_of_tables = atoi((char *)(argv[i]+strlen("--tables=")));
arguments->ntables = atoi((char *)(argv[i]+strlen("--tables=")));
} else {
errorPrintReqArg2(argv[0], "--tables");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-t", strlen("-t"))) {
if (isStringNumber((char *)(argv[i] + strlen("-t")))) {
arguments->num_of_tables = atoi((char *)(argv[i]+strlen("-t")));
arguments->ntables = atoi((char *)(argv[i]+strlen("-t")));
} else {
errorPrintReqArg2(argv[0], "-t");
exit(EXIT_FAILURE);
......@@ -1382,13 +1409,13 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "--tables");
exit(EXIT_FAILURE);
}
arguments->num_of_tables = atoi(argv[++i]);
arguments->ntables = atoi(argv[++i]);
} else {
errorUnreconized(argv[0], argv[i]);
exit(EXIT_FAILURE);
}
g_totalChildTables = arguments->num_of_tables;
g_totalChildTables = arguments->ntables;
} else if ((0 == strncmp(argv[i], "-n", strlen("-n")))
|| (0 == strncmp(argv[i], "--records", strlen("--records")))) {
if (2 == strlen(argv[i])) {
......@@ -1399,17 +1426,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "n");
exit(EXIT_FAILURE);
}
arguments->num_of_DPT = atoi(argv[++i]);
arguments->insertRows = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--records=", strlen("--records="))) {
if (isStringNumber((char *)(argv[i] + strlen("--records=")))) {
arguments->num_of_DPT = atoi((char *)(argv[i]+strlen("--records=")));
arguments->insertRows = atoi((char *)(argv[i]+strlen("--records=")));
} else {
errorPrintReqArg2(argv[0], "--records");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-n", strlen("-n"))) {
if (isStringNumber((char *)(argv[i] + strlen("-n")))) {
arguments->num_of_DPT = atoi((char *)(argv[i]+strlen("-n")));
arguments->insertRows = atoi((char *)(argv[i]+strlen("-n")));
} else {
errorPrintReqArg2(argv[0], "-n");
exit(EXIT_FAILURE);
......@@ -1422,7 +1449,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "--records");
exit(EXIT_FAILURE);
}
arguments->num_of_DPT = atoi(argv[++i]);
arguments->insertRows = atoi(argv[++i]);
} else {
errorUnreconized(argv[0], argv[i]);
exit(EXIT_FAILURE);
......@@ -1460,17 +1487,17 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "l");
exit(EXIT_FAILURE);
}
arguments->num_of_CPR = atoi(argv[++i]);
arguments->columnCount = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--columns=", strlen("--columns="))) {
if (isStringNumber((char *)(argv[i] + strlen("--columns=")))) {
arguments->num_of_CPR = atoi((char *)(argv[i]+strlen("--columns=")));
arguments->columnCount = atoi((char *)(argv[i]+strlen("--columns=")));
} else {
errorPrintReqArg2(argv[0], "--columns");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-l", strlen("-l"))) {
if (isStringNumber((char *)(argv[i] + strlen("-l")))) {
arguments->num_of_CPR = atoi((char *)(argv[i]+strlen("-l")));
arguments->columnCount = atoi((char *)(argv[i]+strlen("-l")));
} else {
errorPrintReqArg2(argv[0], "-l");
exit(EXIT_FAILURE);
......@@ -1483,23 +1510,25 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrintReqArg2(argv[0], "--columns");
exit(EXIT_FAILURE);
}
arguments->num_of_CPR = atoi(argv[++i]);
arguments->columnCount = atoi(argv[++i]);
} else {
errorUnreconized(argv[0], argv[i]);
exit(EXIT_FAILURE);
}
if (arguments->num_of_CPR > MAX_NUM_COLUMNS) {
if (arguments->columnCount > MAX_NUM_COLUMNS) {
printf("WARNING: max acceptible columns count is %d\n", MAX_NUM_COLUMNS);
prompt();
arguments->num_of_CPR = MAX_NUM_COLUMNS;
arguments->columnCount = MAX_NUM_COLUMNS;
}
for (int col = DEFAULT_DATATYPE_NUM; col < arguments->num_of_CPR; col ++) {
arguments->datatype[col] = "INT";
for (int col = DEFAULT_DATATYPE_NUM; col < arguments->columnCount; col ++) {
arguments->dataType[col] = "INT";
arguments->data_type[col] = TSDB_DATA_TYPE_INT;
}
for (int col = arguments->num_of_CPR; col < MAX_NUM_COLUMNS; col++) {
arguments->datatype[col] = NULL;
for (int col = arguments->columnCount; col < MAX_NUM_COLUMNS; col++) {
arguments->dataType[col] = NULL;
arguments->data_type[col] = TSDB_DATA_TYPE_NULL;
}
} else if ((0 == strncmp(argv[i], "-b", strlen("-b")))
|| (0 == strncmp(argv[i], "--data-type", strlen("--data-type")))) {
......@@ -1543,8 +1572,32 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrint("%s", "-b: Invalid data_type!\n");
exit(EXIT_FAILURE);
}
arguments->datatype[0] = dataType;
arguments->datatype[1] = NULL;
arguments->dataType[0] = dataType;
if (0 == strcasecmp(dataType, "INT")) {
arguments->data_type[0] = TSDB_DATA_TYPE_INT;
} else if (0 == strcasecmp(dataType, "TINYINT")) {
arguments->data_type[0] = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strcasecmp(dataType, "SMALLINT")) {
arguments->data_type[0] = TSDB_DATA_TYPE_SMALLINT;
} else if (0 == strcasecmp(dataType, "BIGINT")) {
arguments->data_type[0] = TSDB_DATA_TYPE_BIGINT;
} else if (0 == strcasecmp(dataType, "FLOAT")) {
arguments->data_type[0] = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strcasecmp(dataType, "DOUBLE")) {
arguments->data_type[0] = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strcasecmp(dataType, "BINARY")) {
arguments->data_type[0] = TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(dataType, "NCHAR")) {
arguments->data_type[0] = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strcasecmp(dataType, "BOOL")) {
arguments->data_type[0] = TSDB_DATA_TYPE_BOOL;
} else if (0 == strcasecmp(dataType, "TIMESTAMP")) {
arguments->data_type[0] = TSDB_DATA_TYPE_TIMESTAMP;
} else {
arguments->data_type[0] = TSDB_DATA_TYPE_NULL;
}
arguments->dataType[1] = NULL;
arguments->data_type[1] = TSDB_DATA_TYPE_NULL;
} else {
// more than one col
int index = 0;
......@@ -1567,11 +1620,37 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
errorPrint("%s", "-b: Invalid data_type!\n");
exit(EXIT_FAILURE);
}
arguments->datatype[index++] = token;
if (0 == strcasecmp(token, "INT")) {
arguments->data_type[index] = TSDB_DATA_TYPE_INT;
} else if (0 == strcasecmp(token, "FLOAT")) {
arguments->data_type[index] = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strcasecmp(token, "SMALLINT")) {
arguments->data_type[index] = TSDB_DATA_TYPE_SMALLINT;
} else if (0 == strcasecmp(token, "BIGINT")) {
arguments->data_type[index] = TSDB_DATA_TYPE_BIGINT;
} else if (0 == strcasecmp(token, "DOUBLE")) {
arguments->data_type[index] = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strcasecmp(token, "TINYINT")) {
arguments->data_type[index] = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strcasecmp(token, "BINARY")) {
arguments->data_type[index] = TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(token, "NCHAR")) {
arguments->data_type[index] = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strcasecmp(token, "BOOL")) {
arguments->data_type[index] = TSDB_DATA_TYPE_BOOL;
} else if (0 == strcasecmp(token, "TIMESTAMP")) {
arguments->data_type[index] = TSDB_DATA_TYPE_TIMESTAMP;
} else {
arguments->data_type[index] = TSDB_DATA_TYPE_NULL;
}
arguments->dataType[index] = token;
index ++;
token = strsep(&running, ",");
if (index >= MAX_NUM_COLUMNS) break;
}
arguments->datatype[index] = NULL;
arguments->dataType[index] = NULL;
arguments->data_type[index] = TSDB_DATA_TYPE_NULL;
}
} else if ((0 == strncmp(argv[i], "-w", strlen("-w")))
|| (0 == strncmp(argv[i], "--binwidth", strlen("--binwidth")))) {
......@@ -1842,7 +1921,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
int columnCount;
for (columnCount = 0; columnCount < MAX_NUM_COLUMNS; columnCount ++) {
if (g_args.datatype[columnCount] == NULL) {
if (g_args.dataType[columnCount] == NULL) {
break;
}
}
......@@ -1850,7 +1929,56 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
if (0 == columnCount) {
ERROR_EXIT("data type error!");
}
g_args.num_of_CPR = columnCount;
g_args.columnCount = columnCount;
g_args.lenOfOneRow = 20; // timestamp
for (int c = 0; c < g_args.columnCount; c++) {
switch(g_args.data_type[c]) {
case TSDB_DATA_TYPE_BINARY:
g_args.lenOfOneRow += g_args.binwidth + 3;
break;
case TSDB_DATA_TYPE_NCHAR:
g_args.lenOfOneRow += g_args.binwidth + 3;
break;
case TSDB_DATA_TYPE_INT:
g_args.lenOfOneRow += INT_BUFF_LEN;
break;
case TSDB_DATA_TYPE_BIGINT:
g_args.lenOfOneRow += BIGINT_BUFF_LEN;
break;
case TSDB_DATA_TYPE_SMALLINT:
g_args.lenOfOneRow += SMALLINT_BUFF_LEN;
break;
case TSDB_DATA_TYPE_TINYINT:
g_args.lenOfOneRow += TINYINT_BUFF_LEN;
break;
case TSDB_DATA_TYPE_BOOL:
g_args.lenOfOneRow += BOOL_BUFF_LEN;
break;
case TSDB_DATA_TYPE_FLOAT:
g_args.lenOfOneRow += FLOAT_BUFF_LEN;
break;
case TSDB_DATA_TYPE_DOUBLE:
g_args.lenOfOneRow += DOUBLE_BUFF_LEN;
break;
case TSDB_DATA_TYPE_TIMESTAMP:
g_args.lenOfOneRow += TIMESTAMP_BUFF_LEN;
break;
default:
errorPrint2("get error data type : %s\n", g_args.dataType[c]);
exit(EXIT_FAILURE);
}
}
if (((arguments->debug_print) && (NULL != arguments->metaFile))
|| arguments->verbose_print) {
......@@ -1863,11 +1991,11 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("# Password: %s\n", arguments->password);
printf("# Use metric: %s\n",
arguments->use_metric ? "true" : "false");
if (*(arguments->datatype)) {
if (*(arguments->dataType)) {
printf("# Specified data type: ");
for (int c = 0; c < MAX_NUM_COLUMNS; c++)
if (arguments->datatype[c])
printf("%s,", arguments->datatype[c]);
if (arguments->dataType[c])
printf("%s,", arguments->dataType[c]);
else
break;
printf("\n");
......@@ -1875,15 +2003,15 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("# Insertion interval: %"PRIu64"\n",
arguments->insert_interval);
printf("# Number of records per req: %u\n",
arguments->num_of_RPR);
arguments->reqPerReq);
printf("# Max SQL length: %"PRIu64"\n",
arguments->max_sql_len);
printf("# Length of Binary: %d\n", arguments->binwidth);
printf("# Number of Threads: %d\n", arguments->num_of_threads);
printf("# Number of Threads: %d\n", arguments->nthreads);
printf("# Number of Tables: %"PRId64"\n",
arguments->num_of_tables);
arguments->ntables);
printf("# Number of Data per Table: %"PRId64"\n",
arguments->num_of_DPT);
arguments->insertRows);
printf("# Database name: %s\n", arguments->database);
printf("# Table prefix: %s\n", arguments->tb_prefix);
if (arguments->disorderRatio) {
......@@ -1909,31 +2037,20 @@ static void tmfclose(FILE *fp) {
static void tmfree(char *buf) {
if (NULL != buf) {
free(buf);
buf = NULL;
}
}
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
int i;
TAOS_RES *res = NULL;
int32_t code = -1;
for (i = 0; i < 5 /* retry */; i++) {
if (NULL != res) {
taos_free_result(res);
res = NULL;
}
verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
res = taos_query(taos, command);
code = taos_errno(res);
if (0 == code) {
break;
}
}
TAOS_RES *res = taos_query(taos, command);
int32_t code = taos_errno(res);
verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
if (code != 0) {
if (!quiet) {
errorPrint2("Failed to execute %s, reason: %s\n",
errorPrint2("Failed to execute <%s>, reason: %s\n",
command, taos_errstr(res));
}
taos_free_result(res);
......@@ -2338,7 +2455,7 @@ static int printfInsertMeta() {
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
g_args.insert_interval);
printf("number of records per req: \033[33m%u\033[0m\n",
g_args.num_of_RPR);
g_args.reqPerReq);
printf("max sql length: \033[33m%"PRIu64"\033[0m\n",
g_args.max_sql_len);
......@@ -2420,7 +2537,7 @@ static int printfInsertMeta() {
printf(" super table[\033[33m%"PRIu64"\033[0m]:\n", j);
printf(" stbName: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sTblName);
g_Dbs.db[i].superTbls[j].stbName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
......@@ -2543,7 +2660,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountForCreateTbl);
fprintf(fp, "number of records per req: %u\n", g_args.num_of_RPR);
fprintf(fp, "number of records per req: %u\n", g_args.reqPerReq);
fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len);
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
......@@ -2610,7 +2727,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " super table[%d]:\n", j);
fprintf(fp, " stbName: %s\n",
g_Dbs.db[i].superTbls[j].sTblName);
g_Dbs.db[i].superTbls[j].stbName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
fprintf(fp, " autoCreateTable: %s\n", "no");
......@@ -2769,7 +2886,7 @@ static void printfQueryMeta() {
printf("childTblCount: \033[33m%"PRId64"\033[0m\n",
g_queryInfo.superQueryInfo.childTblCount);
printf("stable name: \033[33m%s\033[0m\n",
g_queryInfo.superQueryInfo.sTblName);
g_queryInfo.superQueryInfo.stbName);
printf("stb query times:\033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.queryTimes);
......@@ -2840,36 +2957,45 @@ static void xDumpFieldToFile(FILE* fp, const char* val,
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
fprintf(fp, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
fprintf(fp, "%d", ((((int32_t)(*((int8_t*)val))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
fprintf(fp, "%d", *((int8_t *)val));
break;
case TSDB_DATA_TYPE_SMALLINT:
fprintf(fp, "%d", *((int16_t *)val));
break;
case TSDB_DATA_TYPE_INT:
fprintf(fp, "%d", *((int32_t *)val));
break;
case TSDB_DATA_TYPE_BIGINT:
fprintf(fp, "%"PRId64"", *((int64_t *)val));
break;
case TSDB_DATA_TYPE_FLOAT:
fprintf(fp, "%.5f", GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
fprintf(fp, "%.9f", GET_DOUBLE_VAL(val));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(buf, val, length);
buf[length] = 0;
fprintf(fp, "\'%s\'", buf);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(buf, *(int64_t*)val, precision);
fprintf(fp, "'%s'", buf);
break;
default:
break;
}
......@@ -3356,27 +3482,48 @@ static int calcRowLen(SSuperTable* superTbls) {
for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) {
char* dataType = superTbls->columns[colIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
switch(superTbls->columns[colIndex].data_type) {
case TSDB_DATA_TYPE_BINARY:
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
break;
case TSDB_DATA_TYPE_NCHAR:
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
break;
case TSDB_DATA_TYPE_INT:
lenOfOneRow += INT_BUFF_LEN;
} else if (strcasecmp(dataType, "BIGINT") == 0) {
break;
case TSDB_DATA_TYPE_BIGINT:
lenOfOneRow += BIGINT_BUFF_LEN;
} else if (strcasecmp(dataType, "SMALLINT") == 0) {
break;
case TSDB_DATA_TYPE_SMALLINT:
lenOfOneRow += SMALLINT_BUFF_LEN;
} else if (strcasecmp(dataType, "TINYINT") == 0) {
break;
case TSDB_DATA_TYPE_TINYINT:
lenOfOneRow += TINYINT_BUFF_LEN;
} else if (strcasecmp(dataType, "BOOL") == 0) {
break;
case TSDB_DATA_TYPE_BOOL:
lenOfOneRow += BOOL_BUFF_LEN;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
break;
case TSDB_DATA_TYPE_FLOAT:
lenOfOneRow += FLOAT_BUFF_LEN;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
break;
case TSDB_DATA_TYPE_DOUBLE:
lenOfOneRow += DOUBLE_BUFF_LEN;
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
break;
case TSDB_DATA_TYPE_TIMESTAMP:
lenOfOneRow += TIMESTAMP_BUFF_LEN;
} else {
break;
default:
errorPrint2("get error data type : %s\n", dataType);
exit(EXIT_FAILURE);
}
......@@ -3418,9 +3565,8 @@ static int calcRowLen(SSuperTable* superTbls) {
return 0;
}
static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
char* dbName, char* sTblName, char** childTblNameOfSuperTbl,
char* dbName, char* stbName, char** childTblNameOfSuperTbl,
int64_t* childTblCountOfSuperTbl, int64_t limit, uint64_t offset) {
char command[1024] = "\0";
......@@ -3438,7 +3584,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
//get all child table name use cmd: select tbname from superTblName;
snprintf(command, 1024, "select tbname from %s.%s %s",
dbName, sTblName, limitBuf);
dbName, stbName, limitBuf);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
......@@ -3489,7 +3635,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
taos_free_result(res);
taos_close(taos);
errorPrint2("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
__func__, __LINE__, dbName, stbName);
exit(EXIT_FAILURE);
}
}
......@@ -3504,10 +3650,10 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
}
static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName,
char* sTblName, char** childTblNameOfSuperTbl,
char* stbName, char** childTblNameOfSuperTbl,
int64_t* childTblCountOfSuperTbl) {
return getChildNameOfSuperTableWithLimitAndOffset(taos, dbName, sTblName,
return getChildNameOfSuperTableWithLimitAndOffset(taos, dbName, stbName,
childTblNameOfSuperTbl, childTblCountOfSuperTbl,
-1, 0);
}
......@@ -3521,7 +3667,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
int count = 0;
//get schema use cmd: describe superTblName;
snprintf(command, 1024, "describe %s.%s", dbName, superTbls->sTblName);
snprintf(command, 1024, "describe %s.%s", dbName, superTbls->stbName);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
......@@ -3547,6 +3693,39 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(DATATYPE_BUFF_LEN,
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes) + 1);
if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"INT", strlen("INT"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_INT;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"TINYINT", strlen("TINYINT"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"SMALLINT", strlen("SMALLINT"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_SMALLINT;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"BIGINT", strlen("BIGINT"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_BIGINT;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"FLOAT", strlen("FLOAT"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"DOUBLE", strlen("DOUBLE"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"BINARY", strlen("BINARY"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_BINARY;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"NCHAR", strlen("NCHAR"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"BOOL", strlen("BOOL"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_BOOL;
} else if (0 == strncasecmp(superTbls->tags[tagIndex].dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_TIMESTAMP;
} else {
superTbls->tags[tagIndex].data_type = TSDB_DATA_TYPE_NULL;
}
superTbls->tags[tagIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->tags[tagIndex].note,
......@@ -3558,16 +3737,51 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
tstrncpy(superTbls->columns[columnIndex].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->columns[columnIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(DATATYPE_BUFF_LEN,
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes) + 1);
if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"INT", strlen("INT"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_INT;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"TINYINT", strlen("TINYINT"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"SMALLINT", strlen("SMALLINT"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_SMALLINT;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"BIGINT", strlen("BIGINT"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_BIGINT;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"FLOAT", strlen("FLOAT"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"DOUBLE", strlen("DOUBLE"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"BINARY", strlen("BINARY"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_BINARY;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"NCHAR", strlen("NCHAR"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"BOOL", strlen("BOOL"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_BOOL;
} else if (0 == strncasecmp(superTbls->columns[columnIndex].dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_TIMESTAMP;
} else {
superTbls->columns[columnIndex].data_type = TSDB_DATA_TYPE_NULL;
}
superTbls->columns[columnIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->columns[columnIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
min(NOTE_BUFF_LEN,
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes) + 1);
columnIndex++;
}
count++;
......@@ -3589,7 +3803,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
return -1;
}
getAllChildNameOfSuperTable(taos, dbName,
superTbls->sTblName,
superTbls->stbName,
&superTbls->childTblName,
&superTbls->childTblCount);
}
......@@ -3605,7 +3819,6 @@ static int createSuperTable(
assert(command);
char cols[COL_BUFFER_LEN] = "\0";
int colIndex;
int len = 0;
int lenOfOneRow = 0;
......@@ -3617,20 +3830,24 @@ static int createSuperTable(
return -1;
}
for (colIndex = 0; colIndex < superTbl->columnCount; colIndex++) {
char* dataType = superTbl->columns[colIndex].dataType;
for (int colIndex = 0; colIndex < superTbl->columnCount; colIndex++) {
if (strcasecmp(dataType, "BINARY") == 0) {
switch(superTbl->columns[colIndex].data_type) {
case TSDB_DATA_TYPE_BINARY:
len += snprintf(cols + len, COL_BUFFER_LEN - len,
",C%d %s(%d)", colIndex, "BINARY",
superTbl->columns[colIndex].dataLen);
lenOfOneRow += superTbl->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
break;
case TSDB_DATA_TYPE_NCHAR:
len += snprintf(cols + len, COL_BUFFER_LEN - len,
",C%d %s(%d)", colIndex, "NCHAR",
superTbl->columns[colIndex].dataLen);
lenOfOneRow += superTbl->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
break;
case TSDB_DATA_TYPE_INT:
if ((g_args.demo_mode) && (colIndex == 1)) {
len += snprintf(cols + len, COL_BUFFER_LEN - len,
", VOLTAGE INT");
......@@ -3638,21 +3855,31 @@ static int createSuperTable(
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "INT");
}
lenOfOneRow += INT_BUFF_LEN;
} else if (strcasecmp(dataType, "BIGINT") == 0) {
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s",
colIndex, "BIGINT");
lenOfOneRow += BIGINT_BUFF_LEN;
} else if (strcasecmp(dataType, "SMALLINT") == 0) {
break;
case TSDB_DATA_TYPE_SMALLINT:
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s",
colIndex, "SMALLINT");
lenOfOneRow += SMALLINT_BUFF_LEN;
} else if (strcasecmp(dataType, "TINYINT") == 0) {
break;
case TSDB_DATA_TYPE_TINYINT:
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "TINYINT");
lenOfOneRow += TINYINT_BUFF_LEN;
} else if (strcasecmp(dataType, "BOOL") == 0) {
break;
case TSDB_DATA_TYPE_BOOL:
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s", colIndex, "BOOL");
lenOfOneRow += BOOL_BUFF_LEN;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
break;
case TSDB_DATA_TYPE_FLOAT:
if (g_args.demo_mode) {
if (colIndex == 0) {
len += snprintf(cols + len, COL_BUFFER_LEN - len, ", CURRENT FLOAT");
......@@ -3664,19 +3891,25 @@ static int createSuperTable(
}
lenOfOneRow += FLOAT_BUFF_LEN;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s",
colIndex, "DOUBLE");
lenOfOneRow += DOUBLE_BUFF_LEN;
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += snprintf(cols + len, COL_BUFFER_LEN - len, ",C%d %s",
colIndex, "TIMESTAMP");
lenOfOneRow += TIMESTAMP_BUFF_LEN;
} else {
break;
default:
taos_close(taos);
free(command);
errorPrint2("%s() LN%d, config error data type : %s\n",
__func__, __LINE__, dataType);
__func__, __LINE__, superTbl->columns[colIndex].dataType);
exit(EXIT_FAILURE);
}
}
......@@ -3777,16 +4010,16 @@ static int createSuperTable(
superTbl->lenOfTagOfOneRow = lenOfTagOfOneRow;
snprintf(command, BUFFER_SIZE,
"create table if not exists %s.%s (ts timestamp%s) tags %s",
dbName, superTbl->sTblName, cols, tags);
"CREATE TABLE IF NOT EXISTS %s.%s (ts TIMESTAMP%s) TAGS %s",
dbName, superTbl->stbName, cols, tags);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
errorPrint2("create supertable %s failed!\n\n",
superTbl->sTblName);
superTbl->stbName);
free(command);
return -1;
}
debugPrint("create supertable %s success!\n\n", superTbl->sTblName);
debugPrint("create supertable %s success!\n\n", superTbl->stbName);
free(command);
return 0;
}
......@@ -3810,42 +4043,42 @@ int createDatabasesAndStables(char *command) {
int dataLen = 0;
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "create database if not exists %s",
BUFFER_SIZE - dataLen, "CREATE DATABASE IF NOT EXISTS %s",
g_Dbs.db[i].dbName);
if (g_Dbs.db[i].dbCfg.blocks > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " blocks %d",
BUFFER_SIZE - dataLen, " BLOCKS %d",
g_Dbs.db[i].dbCfg.blocks);
}
if (g_Dbs.db[i].dbCfg.cache > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cache %d",
BUFFER_SIZE - dataLen, " CACHE %d",
g_Dbs.db[i].dbCfg.cache);
}
if (g_Dbs.db[i].dbCfg.days > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " days %d",
BUFFER_SIZE - dataLen, " DAYS %d",
g_Dbs.db[i].dbCfg.days);
}
if (g_Dbs.db[i].dbCfg.keep > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " keep %d",
BUFFER_SIZE - dataLen, " KEEP %d",
g_Dbs.db[i].dbCfg.keep);
}
if (g_Dbs.db[i].dbCfg.quorum > 1) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " quorum %d",
BUFFER_SIZE - dataLen, " QUORUM %d",
g_Dbs.db[i].dbCfg.quorum);
}
if (g_Dbs.db[i].dbCfg.replica > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " replica %d",
BUFFER_SIZE - dataLen, " REPLICA %d",
g_Dbs.db[i].dbCfg.replica);
}
if (g_Dbs.db[i].dbCfg.update > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " update %d",
BUFFER_SIZE - dataLen, " UPDATE %d",
g_Dbs.db[i].dbCfg.update);
}
//if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) {
......@@ -3854,17 +4087,17 @@ int createDatabasesAndStables(char *command) {
//}
if (g_Dbs.db[i].dbCfg.minRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " minrows %d",
BUFFER_SIZE - dataLen, " MINROWS %d",
g_Dbs.db[i].dbCfg.minRows);
}
if (g_Dbs.db[i].dbCfg.maxRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " maxrows %d",
BUFFER_SIZE - dataLen, " MAXROWS %d",
g_Dbs.db[i].dbCfg.maxRows);
}
if (g_Dbs.db[i].dbCfg.comp > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " comp %d",
BUFFER_SIZE - dataLen, " COMP %d",
g_Dbs.db[i].dbCfg.comp);
}
if (g_Dbs.db[i].dbCfg.walLevel > 0) {
......@@ -3874,12 +4107,12 @@ int createDatabasesAndStables(char *command) {
}
if (g_Dbs.db[i].dbCfg.cacheLast > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cachelast %d",
BUFFER_SIZE - dataLen, " CACHELAST %d",
g_Dbs.db[i].dbCfg.cacheLast);
}
if (g_Dbs.db[i].dbCfg.fsync > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" fsync %d", g_Dbs.db[i].dbCfg.fsync);
" FSYNC %d", g_Dbs.db[i].dbCfg.fsync);
}
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
......@@ -3906,7 +4139,7 @@ int createDatabasesAndStables(char *command) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].sTblName);
g_Dbs.db[i].superTbls[j].stbName);
ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);
if ((ret != 0) || (g_Dbs.db[i].drop)) {
......@@ -3923,7 +4156,7 @@ int createDatabasesAndStables(char *command) {
&g_Dbs.db[i].superTbls[j]);
if (0 != ret) {
errorPrint2("\nget super table %s.%s info failed!\n\n",
g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].stbName);
continue;
}
......@@ -3965,7 +4198,7 @@ static void* createTable(void *sarg)
i <= pThreadInfo->end_table_to; i++) {
if (0 == g_Dbs.use_metric) {
snprintf(pThreadInfo->buffer, buff_len,
"create table if not exists %s.%s%"PRIu64" %s;",
"CREATE TABLE IF NOT EXISTS %s.%s%"PRIu64" %s;",
pThreadInfo->db_name,
g_args.tb_prefix, i,
pThreadInfo->cols);
......@@ -3981,7 +4214,7 @@ static void* createTable(void *sarg)
batchNum = 0;
memset(pThreadInfo->buffer, 0, buff_len);
len += snprintf(pThreadInfo->buffer + len,
buff_len - len, "create table ");
buff_len - len, "CREATE TABLE ");
}
char* tagsValBuf = NULL;
......@@ -4006,7 +4239,7 @@ static void* createTable(void *sarg)
"if not exists %s.%s%"PRIu64" using %s.%s tags %s ",
pThreadInfo->db_name, stbInfo->childTblPrefix,
i, pThreadInfo->db_name,
stbInfo->sTblName, tagsValBuf);
stbInfo->stbName, tagsValBuf);
free(tagsValBuf);
batchNum++;
if ((batchNum < stbInfo->batchCreateTableNum)
......@@ -4151,15 +4384,15 @@ static void createChildTables() {
} else {
// normal table
len = snprintf(tblColsBuf, TSDB_MAX_BYTES_PER_ROW, "(TS TIMESTAMP");
for (int j = 0; j < g_args.num_of_CPR; j++) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j],
for (int j = 0; j < g_args.columnCount; j++) {
if ((strncasecmp(g_args.dataType[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.dataType[j],
"NCHAR", strlen("NCHAR")) == 0)) {
snprintf(tblColsBuf + len, TSDB_MAX_BYTES_PER_ROW - len,
",C%d %s(%d)", j, g_args.datatype[j], g_args.binwidth);
",C%d %s(%d)", j, g_args.dataType[j], g_args.binwidth);
} else {
snprintf(tblColsBuf + len, TSDB_MAX_BYTES_PER_ROW - len,
",C%d %s", j, g_args.datatype[j]);
",C%d %s", j, g_args.dataType[j]);
}
len = strlen(tblColsBuf);
}
......@@ -4168,12 +4401,12 @@ static void createChildTables() {
verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n",
__func__, __LINE__,
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
g_Dbs.db[i].dbName, g_args.ntables, tblColsBuf);
startMultiThreadCreateChildTable(
tblColsBuf,
g_Dbs.threadCountForCreateTbl,
0,
g_args.num_of_tables,
g_args.ntables,
g_Dbs.db[i].dbName,
NULL);
}
......@@ -4251,7 +4484,7 @@ static int readTagFromCsvFileToMem(SSuperTable * stbInfo) {
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
static int readSampleFromCsvFileToMem(
static int generateSampleFromCsvForStb(
SSuperTable* stbInfo) {
size_t n = 0;
ssize_t readLen = 0;
......@@ -4377,6 +4610,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
tstrncpy(superTbls->columns[index].dataType,
columnCase.dataType,
min(DATATYPE_BUFF_LEN, strlen(columnCase.dataType) + 1));
superTbls->columns[index].dataLen = columnCase.dataLen;
index++;
}
......@@ -4390,6 +4624,42 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
superTbls->columnCount = index;
for (int c = 0; c < superTbls->columnCount; c++) {
if (0 == strncasecmp(superTbls->columns[c].dataType,
"INT", strlen("INT"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_INT;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"TINYINT", strlen("TINYINT"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"SMALLINT", strlen("SMALLINT"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_SMALLINT;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"BIGINT", strlen("BIGINT"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_BIGINT;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"FLOAT", strlen("FLOAT"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"DOUBLE", strlen("DOUBLE"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"BINARY", strlen("BINARY"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_BINARY;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"NCHAR", strlen("NCHAR"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"BOOL", strlen("BOOL"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_BOOL;
} else if (0 == strncasecmp(superTbls->columns[c].dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_TIMESTAMP;
} else {
superTbls->columns[c].data_type = TSDB_DATA_TYPE_NULL;
}
}
count = 1;
index = 0;
// tags
......@@ -4459,6 +4729,42 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
superTbls->tagCount = index;
for (int t = 0; t < superTbls->tagCount; t++) {
if (0 == strncasecmp(superTbls->tags[t].dataType,
"INT", strlen("INT"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_INT;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"TINYINT", strlen("TINYINT"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"SMALLINT", strlen("SMALLINT"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_SMALLINT;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"BIGINT", strlen("BIGINT"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_BIGINT;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"FLOAT", strlen("FLOAT"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"DOUBLE", strlen("DOUBLE"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"BINARY", strlen("BINARY"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_BINARY;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"NCHAR", strlen("NCHAR"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"BOOL", strlen("BOOL"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_BOOL;
} else if (0 == strncasecmp(superTbls->tags[t].dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_TIMESTAMP;
} else {
superTbls->tags[t].data_type = TSDB_DATA_TYPE_NULL;
}
}
if ((superTbls->columnCount + superTbls->tagCount + 1 /* ts */) > TSDB_MAX_COLUMNS) {
errorPrint("columns + tags is more than allowed max columns count: %d\n",
TSDB_MAX_COLUMNS);
......@@ -4595,9 +4901,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
prompt();
numRecPerReq->valueint = MAX_RECORDS_PER_REQ;
}
g_args.num_of_RPR = numRecPerReq->valueint;
g_args.reqPerReq = numRecPerReq->valueint;
} else if (!numRecPerReq) {
g_args.num_of_RPR = MAX_RECORDS_PER_REQ;
g_args.reqPerReq = MAX_RECORDS_PER_REQ;
} else {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
__func__, __LINE__);
......@@ -4623,13 +4929,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
}
// rows per table need be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) {
if (g_args.interlace_rows > g_args.reqPerReq) {
printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n",
g_args.interlace_rows, g_args.num_of_RPR);
g_args.interlace_rows, g_args.reqPerReq);
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
g_args.num_of_RPR);
g_args.reqPerReq);
prompt();
g_args.interlace_rows = g_args.num_of_RPR;
g_args.interlace_rows = g_args.reqPerReq;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
......@@ -4858,7 +5164,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
errorPrint("%s", "failed to read json, stb name not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring,
tstrncpy(g_Dbs.db[i].superTbls[j].stbName, stbName->valuestring,
TSDB_TABLE_NAME_LEN);
cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
......@@ -5512,7 +5818,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String
&& stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
tstrncpy(g_queryInfo.superQueryInfo.stbName, stblname->valuestring,
TSDB_TABLE_NAME_LEN);
} else {
errorPrint("%s", "failed to read json, super table name input error\n");
......@@ -5734,23 +6040,37 @@ static int prepareSampleData() {
static void postFreeResource() {
tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
tmfree(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
tmfree(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
}
#if STMT_BIND_PARAM_BATCH == 1
for (int c = 0;
c < g_Dbs.db[i].superTbls[j].columnCount; c ++) {
if (g_Dbs.db[i].superTbls[j].sampleBindBatchArray) {
tmfree((char *)((uintptr_t)*(uintptr_t*)(
g_Dbs.db[i].superTbls[j].sampleBindBatchArray
+ sizeof(char*) * c)));
}
}
tmfree(g_Dbs.db[i].superTbls[j].sampleBindBatchArray);
#endif
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf);
tmfree(g_Dbs.db[i].superTbls[j].tagDataBuf);
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
}
if (0 != g_Dbs.db[i].superTbls[j].childTblName) {
free(g_Dbs.db[i].superTbls[j].childTblName);
tmfree(g_Dbs.db[i].superTbls[j].childTblName);
g_Dbs.db[i].superTbls[j].childTblName = NULL;
}
}
......@@ -5766,6 +6086,19 @@ static void postFreeResource() {
tmfree(g_rand_current_buff);
tmfree(g_rand_phase_buff);
tmfree(g_sampleDataBuf);
#if STMT_BIND_PARAM_BATCH == 1
for (int l = 0;
l < g_args.columnCount; l ++) {
if (g_sampleBindBatchArray) {
tmfree((char *)((uintptr_t)*(uintptr_t*)(
g_sampleBindBatchArray
+ sizeof(char*) * l)));
}
}
tmfree(g_sampleBindBatchArray);
#endif
}
static int getRowDataFromSample(
......@@ -5803,13 +6136,14 @@ static int64_t generateStbRowData(
int tmpLen;
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"(%" PRId64 ",", timestamp);
"(%" PRId64 "", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType,
"BINARY", 6))
|| (0 == strncasecmp(stbInfo->columns[i].dataType,
"NCHAR", 5))) {
tstrncpy(pstr + dataLen, ",", 2);
dataLen += 1;
if ((stbInfo->columns[i].data_type == TSDB_DATA_TYPE_BINARY)
|| (stbInfo->columns[i].data_type == TSDB_DATA_TYPE_NCHAR)) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2("binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
......@@ -5827,14 +6161,13 @@ static int64_t generateStbRowData(
return -1;
}
rand_string(buf, stbInfo->columns[i].dataLen);
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf);
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\'", buf);
tmfree(buf);
} else {
char *tmp;
if (0 == strncasecmp(stbInfo->columns[i].dataType,
"INT", 3)) {
char *tmp = NULL;
switch(stbInfo->columns[i].data_type) {
case TSDB_DATA_TYPE_INT:
if ((g_args.demo_mode) && (i == 1)) {
tmp = demo_voltage_int_str();
} else {
......@@ -5842,12 +6175,14 @@ static int64_t generateStbRowData(
}
tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp, min(tmpLen + 1, INT_BUFF_LEN));
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BIGINT", 6)) {
break;
case TSDB_DATA_TYPE_BIGINT:
tmp = rand_bigint_str();
tstrncpy(pstr + dataLen, tmp, BIGINT_BUFF_LEN);
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"FLOAT", 5)) {
break;
case TSDB_DATA_TYPE_FLOAT:
if (g_args.demo_mode) {
if (i == 0) {
tmp = demo_current_float_str();
......@@ -5859,48 +6194,58 @@ static int64_t generateStbRowData(
}
tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp, min(tmpLen +1, FLOAT_BUFF_LEN));
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"DOUBLE", 6)) {
break;
case TSDB_DATA_TYPE_DOUBLE:
tmp = rand_double_str();
tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp, min(tmpLen +1, DOUBLE_BUFF_LEN));
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"SMALLINT", 8)) {
break;
case TSDB_DATA_TYPE_SMALLINT:
tmp = rand_smallint_str();
tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp,
min(tmpLen + 1, SMALLINT_BUFF_LEN));
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"TINYINT", 7)) {
break;
case TSDB_DATA_TYPE_TINYINT:
tmp = rand_tinyint_str();
tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp, min(tmpLen +1, TINYINT_BUFF_LEN));
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BOOL", 4)) {
break;
case TSDB_DATA_TYPE_BOOL:
tmp = rand_bool_str();
tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp, min(tmpLen +1, BOOL_BUFF_LEN));
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"TIMESTAMP", 9)) {
break;
case TSDB_DATA_TYPE_TIMESTAMP:
tmp = rand_bigint_str();
tmpLen = strlen(tmp);
tstrncpy(pstr + dataLen, tmp, min(tmpLen +1, BIGINT_BUFF_LEN));
} else {
break;
case TSDB_DATA_TYPE_NULL:
break;
default:
errorPrint2("Not support data type: %s\n",
stbInfo->columns[i].dataType);
return -1;
exit(EXIT_FAILURE);
}
if (tmp) {
dataLen += strlen(tmp);
tstrncpy(pstr + dataLen, ",", 2);
dataLen += 1;
}
}
if (dataLen > (remainderBufLen - (128)))
return 0;
}
tstrncpy(pstr + dataLen - 1, ")", 2);
tstrncpy(pstr + dataLen, ")", 2);
verbosePrint("%s() LN%d, dataLen:%"PRId64"\n", __func__, __LINE__, dataLen);
verbosePrint("%s() LN%d, recBuf:\n\t%s\n", __func__, __LINE__, recBuf);
......@@ -5908,35 +6253,53 @@ static int64_t generateStbRowData(
return strlen(recBuf);
}
static int64_t generateData(char *recBuf, char **data_type,
static int64_t generateData(char *recBuf, char *data_type,
int64_t timestamp, int lenOfBinary) {
memset(recBuf, 0, MAX_DATA_SIZE);
char *pstr = recBuf;
pstr += sprintf(pstr, "(%"PRId64"", timestamp);
int columnCount = g_args.num_of_CPR;
int columnCount = g_args.columnCount;
bool b;
char *s;
for (int i = 0; i < columnCount; i++) {
if (strcasecmp(data_type[i % columnCount], "TINYINT") == 0) {
switch (data_type[i]) {
case TSDB_DATA_TYPE_TINYINT:
pstr += sprintf(pstr, ",%d", rand_tinyint() );
} else if (strcasecmp(data_type[i % columnCount], "SMALLINT") == 0) {
break;
case TSDB_DATA_TYPE_SMALLINT:
pstr += sprintf(pstr, ",%d", rand_smallint());
} else if (strcasecmp(data_type[i % columnCount], "INT") == 0) {
break;
case TSDB_DATA_TYPE_INT:
pstr += sprintf(pstr, ",%d", rand_int());
} else if (strcasecmp(data_type[i % columnCount], "BIGINT") == 0) {
break;
case TSDB_DATA_TYPE_BIGINT:
pstr += sprintf(pstr, ",%"PRId64"", rand_bigint());
} else if (strcasecmp(data_type[i % columnCount], "TIMESTAMP") == 0) {
break;
case TSDB_DATA_TYPE_TIMESTAMP:
pstr += sprintf(pstr, ",%"PRId64"", rand_bigint());
} else if (strcasecmp(data_type[i % columnCount], "FLOAT") == 0) {
break;
case TSDB_DATA_TYPE_FLOAT:
pstr += sprintf(pstr, ",%10.4f", rand_float());
} else if (strcasecmp(data_type[i % columnCount], "DOUBLE") == 0) {
double t = rand_double();
pstr += sprintf(pstr, ",%20.8f", t);
} else if (strcasecmp(data_type[i % columnCount], "BOOL") == 0) {
bool b = rand_bool() & 1;
break;
case TSDB_DATA_TYPE_DOUBLE:
pstr += sprintf(pstr, ",%20.8f", rand_double());
break;
case TSDB_DATA_TYPE_BOOL:
b = rand_bool() & 1;
pstr += sprintf(pstr, ",%s", b ? "true" : "false");
} else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) {
char *s = malloc(lenOfBinary + 1);
break;
case TSDB_DATA_TYPE_BINARY:
s = malloc(lenOfBinary + 1);
if (s == NULL) {
errorPrint2("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1);
......@@ -5945,8 +6308,10 @@ static int64_t generateData(char *recBuf, char **data_type,
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
free(s);
} else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) {
char *s = malloc(lenOfBinary + 1);
break;
case TSDB_DATA_TYPE_NCHAR:
s = malloc(lenOfBinary + 1);
if (s == NULL) {
errorPrint2("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1);
......@@ -5955,6 +6320,16 @@ static int64_t generateData(char *recBuf, char **data_type,
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
free(s);
break;
case TSDB_DATA_TYPE_NULL:
break;
default:
errorPrint2("%s() LN%d, Unknown data type %d\n",
__func__, __LINE__,
data_type[i]);
exit(EXIT_FAILURE);
}
if (strlen(recBuf) > MAX_DATA_SIZE) {
......@@ -5969,45 +6344,60 @@ static int64_t generateData(char *recBuf, char **data_type,
return (int32_t)strlen(recBuf);
}
static int generateSampleMemoryFromRand(SSuperTable *stbInfo)
static int generateSampleFromRand(
char *sampleDataBuf,
uint64_t lenOfOneRow,
int columnCount,
StrColumn *columns
)
{
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *buff = malloc(stbInfo->lenOfOneRow);
char *buff = malloc(lenOfOneRow);
if (NULL == buff) {
errorPrint2("%s() LN%d, memory allocation %"PRId64" bytes failed\n",
__func__, __LINE__, stbInfo->lenOfOneRow);
errorPrint2("%s() LN%d, memory allocation %"PRIu64" bytes failed\n",
__func__, __LINE__, lenOfOneRow);
exit(EXIT_FAILURE);
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
uint64_t pos = 0;
memset(buff, 0, stbInfo->lenOfOneRow);
memset(buff, 0, lenOfOneRow);
for (int c = 0; c < stbInfo->columnCount; c++) {
char *tmp;
if (0 == strncasecmp(stbInfo->columns[c].dataType,
"BINARY", strlen("BINARY"))) {
rand_string(data, stbInfo->columns[c].dataLen);
for (int c = 0; c < columnCount; c++) {
char *tmp = NULL;
uint32_t dataLen;
char data_type = (columns)?(columns[c].data_type):g_args.data_type[c];
switch(data_type) {
case TSDB_DATA_TYPE_BINARY:
dataLen = (columns)?columns[c].dataLen:g_args.binwidth;
rand_string(data, dataLen);
pos += sprintf(buff + pos, "%s,", data);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"NCHAR", strlen("NCHAR"))) {
rand_string(data, stbInfo->columns[c].dataLen);
break;
case TSDB_DATA_TYPE_NCHAR:
dataLen = (columns)?columns[c].dataLen:g_args.binwidth;
rand_string(data, dataLen);
pos += sprintf(buff + pos, "%s,", data);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"INT", strlen("INT"))) {
break;
case TSDB_DATA_TYPE_INT:
if ((g_args.demo_mode) && (c == 1)) {
tmp = demo_voltage_int_str();
} else {
tmp = rand_int_str();
}
pos += sprintf(buff + pos, "%s,", tmp);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"BIGINT", strlen("BIGINT"))) {
break;
case TSDB_DATA_TYPE_BIGINT:
pos += sprintf(buff + pos, "%s,", rand_bigint_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"FLOAT", strlen("FLOAT"))) {
break;
case TSDB_DATA_TYPE_FLOAT:
if (g_args.demo_mode) {
if (c == 0) {
tmp = demo_current_float_str();
......@@ -6018,32 +6408,79 @@ static int generateSampleMemoryFromRand(SSuperTable *stbInfo)
tmp = rand_float_str();
}
pos += sprintf(buff + pos, "%s,", tmp);
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"DOUBLE", strlen("DOUBLE"))) {
break;
case TSDB_DATA_TYPE_DOUBLE:
pos += sprintf(buff + pos, "%s,", rand_double_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"SMALLINT", strlen("SMALLINT"))) {
break;
case TSDB_DATA_TYPE_SMALLINT:
pos += sprintf(buff + pos, "%s,", rand_smallint_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"TINYINT", strlen("TINYINT"))) {
break;
case TSDB_DATA_TYPE_TINYINT:
pos += sprintf(buff + pos, "%s,", rand_tinyint_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"BOOL", strlen("BOOL"))) {
break;
case TSDB_DATA_TYPE_BOOL:
pos += sprintf(buff + pos, "%s,", rand_bool_str());
} else if (0 == strncasecmp(stbInfo->columns[c].dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
break;
case TSDB_DATA_TYPE_TIMESTAMP:
pos += sprintf(buff + pos, "%s,", rand_bigint_str());
break;
case TSDB_DATA_TYPE_NULL:
break;
default:
errorPrint2("%s() LN%d, Unknown data type %s\n",
__func__, __LINE__,
(columns)?(columns[c].dataType):g_args.dataType[c]);
exit(EXIT_FAILURE);
}
}
*(buff + pos - 1) = 0;
memcpy(stbInfo->sampleDataBuf + i * stbInfo->lenOfOneRow, buff, pos);
memcpy(sampleDataBuf + i * lenOfOneRow, buff, pos);
}
free(buff);
return 0;
}
static int prepareSampleDataForSTable(SSuperTable *stbInfo) {
static int generateSampleFromRandForNtb()
{
return generateSampleFromRand(
g_sampleDataBuf,
g_args.lenOfOneRow,
g_args.columnCount,
NULL);
}
static int generateSampleFromRandForStb(SSuperTable *stbInfo)
{
return generateSampleFromRand(
stbInfo->sampleDataBuf,
stbInfo->lenOfOneRow,
stbInfo->columnCount,
stbInfo->columns);
}
static int prepareSampleForNtb() {
g_sampleDataBuf = calloc(g_args.lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (NULL == g_sampleDataBuf) {
errorPrint2("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__,
g_args.lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
}
return generateSampleFromRandForNtb();
}
static int prepareSampleForStb(SSuperTable *stbInfo) {
stbInfo->sampleDataBuf = calloc(
stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
......@@ -6056,10 +6493,11 @@ static int prepareSampleDataForSTable(SSuperTable *stbInfo) {
}
int ret;
if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample")))
ret = readSampleFromCsvFileToMem(stbInfo);
else
ret = generateSampleMemoryFromRand(stbInfo);
if (0 == strncasecmp(stbInfo->dataSource, "sample", strlen("sample"))) {
ret = generateSampleFromCsvForStb(stbInfo);
} else {
ret = generateSampleFromRandForStb(stbInfo);
}
if (0 != ret) {
errorPrint2("%s() LN%d, read sample from csv file failed.\n",
......@@ -6184,7 +6622,7 @@ static int32_t generateDataTailWithoutStb(
int64_t retLen = 0;
char **data_type = g_args.datatype;
char *data_type = g_args.data_type;
int lenOfBinary = g_args.binwidth;
if (g_args.disorderRatio) {
......@@ -6370,7 +6808,7 @@ static int generateStbSQLHead(
dbName,
tableName,
dbName,
stbInfo->sTblName,
stbInfo->stbName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == stbInfo->childTblExists) {
......@@ -6502,12 +6940,21 @@ static int64_t generateInterlaceDataWithoutStb(
static int32_t prepareStmtBindArrayByType(
TAOS_BIND *bind,
char *dataType, int32_t dataLen,
char data_type, int32_t dataLen,
int32_t timePrec,
char *value)
{
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
int32_t *bind_int;
int64_t *bind_bigint;
float *bind_float;
double *bind_double;
int8_t *bind_bool;
int64_t *bind_ts2;
int16_t *bind_smallint;
int8_t *bind_tinyint;
switch(data_type) {
case TSDB_DATA_TYPE_BINARY:
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2("binary length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
......@@ -6529,8 +6976,9 @@ static int32_t prepareStmtBindArrayByType(
bind->length = &bind->buffer_length;
bind->buffer = bind_binary;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"NCHAR", strlen("NCHAR"))) {
break;
case TSDB_DATA_TYPE_NCHAR:
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2("nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
......@@ -6551,9 +6999,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = malloc(sizeof(int32_t));
break;
case TSDB_DATA_TYPE_INT:
bind_int = malloc(sizeof(int32_t));
assert(bind_int);
if (value) {
......@@ -6566,9 +7015,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_int;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = malloc(sizeof(int64_t));
break;
case TSDB_DATA_TYPE_BIGINT:
bind_bigint = malloc(sizeof(int64_t));
assert(bind_bigint);
if (value) {
......@@ -6581,9 +7031,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_bigint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = malloc(sizeof(float));
break;
case TSDB_DATA_TYPE_FLOAT:
bind_float = malloc(sizeof(float));
assert(bind_float);
if (value) {
......@@ -6596,9 +7047,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_float;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = malloc(sizeof(double));
break;
case TSDB_DATA_TYPE_DOUBLE:
bind_double = malloc(sizeof(double));
assert(bind_double);
if (value) {
......@@ -6611,9 +7063,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_double;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = malloc(sizeof(int16_t));
break;
case TSDB_DATA_TYPE_SMALLINT:
bind_smallint = malloc(sizeof(int16_t));
assert(bind_smallint);
if (value) {
......@@ -6626,9 +7079,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_smallint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = malloc(sizeof(int8_t));
break;
case TSDB_DATA_TYPE_TINYINT:
bind_tinyint = malloc(sizeof(int8_t));
assert(bind_tinyint);
if (value) {
......@@ -6641,9 +7095,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = malloc(sizeof(int8_t));
break;
case TSDB_DATA_TYPE_BOOL:
bind_bool = malloc(sizeof(int8_t));
assert(bind_bool);
if (value) {
......@@ -6660,10 +7115,10 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_bool;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
break;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = malloc(sizeof(int64_t));
case TSDB_DATA_TYPE_TIMESTAMP:
bind_ts2 = malloc(sizeof(int64_t));
assert(bind_ts2);
if (value) {
......@@ -6695,9 +7150,14 @@ static int32_t prepareStmtBindArrayByType(
bind->buffer = bind_ts2;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else {
errorPrint2("Not support data type: %s\n", dataType);
return -1;
break;
case TSDB_DATA_TYPE_NULL:
break;
default:
errorPrint2("Not support data type: %d\n", data_type);
exit(EXIT_FAILURE);
}
return 0;
......@@ -6705,13 +7165,23 @@ static int32_t prepareStmtBindArrayByType(
static int32_t prepareStmtBindArrayByTypeForRand(
TAOS_BIND *bind,
char *dataType, int32_t dataLen,
char data_type, int32_t dataLen,
int32_t timePrec,
char **ptr,
char *value)
{
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
int32_t *bind_int;
int64_t *bind_bigint;
float *bind_float;
double *bind_double;
int16_t *bind_smallint;
int8_t *bind_tinyint;
int8_t *bind_bool;
int64_t *bind_ts2;
switch(data_type) {
case TSDB_DATA_TYPE_BINARY:
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2("binary length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
......@@ -6733,8 +7203,9 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"NCHAR", strlen("NCHAR"))) {
break;
case TSDB_DATA_TYPE_NCHAR:
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint2("nchar length overflow, max size: %u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
......@@ -6755,9 +7226,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = (int32_t *)*ptr;
break;
case TSDB_DATA_TYPE_INT:
bind_int = (int32_t *)*ptr;
if (value) {
*bind_int = atoi(value);
......@@ -6771,9 +7243,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = (int64_t *)*ptr;
break;
case TSDB_DATA_TYPE_BIGINT:
bind_bigint = (int64_t *)*ptr;
if (value) {
*bind_bigint = atoll(value);
......@@ -6787,9 +7260,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *)*ptr;
break;
case TSDB_DATA_TYPE_FLOAT:
bind_float = (float *)*ptr;
if (value) {
*bind_float = (float)atof(value);
......@@ -6803,9 +7277,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = (double *)*ptr;
break;
case TSDB_DATA_TYPE_DOUBLE:
bind_double = (double *)*ptr;
if (value) {
*bind_double = atof(value);
......@@ -6819,9 +7294,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = (int16_t *)*ptr;
break;
case TSDB_DATA_TYPE_SMALLINT:
bind_smallint = (int16_t *)*ptr;
if (value) {
*bind_smallint = (int16_t)atoi(value);
......@@ -6835,9 +7311,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = (int8_t *)*ptr;
break;
case TSDB_DATA_TYPE_TINYINT:
bind_tinyint = (int8_t *)*ptr;
if (value) {
*bind_tinyint = (int8_t)atoi(value);
......@@ -6851,9 +7328,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = (int8_t *)*ptr;
break;
case TSDB_DATA_TYPE_BOOL:
bind_bool = (int8_t *)*ptr;
if (value) {
if (strncasecmp(value, "true", 4)) {
......@@ -6871,9 +7349,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *)*ptr;
break;
case TSDB_DATA_TYPE_TIMESTAMP:
bind_ts2 = (int64_t *)*ptr;
if (value) {
if (strchr(value, ':') && strchr(value, '-')) {
......@@ -6905,8 +7384,10 @@ static int32_t prepareStmtBindArrayByTypeForRand(
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else {
errorPrint2("No support data type: %s\n", dataType);
break;
default:
errorPrint2("No support data type: %d\n", data_type);
return -1;
}
......@@ -6929,12 +7410,12 @@ static int32_t prepareStmtWithoutStb(
return ret;
}
char **data_type = g_args.datatype;
char *data_type = g_args.data_type;
char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.num_of_CPR + 1));
char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.columnCount + 1));
if (bindArray == NULL) {
errorPrint2("Failed to allocate %d bind params\n",
(g_args.num_of_CPR + 1));
(g_args.columnCount + 1));
return -1;
}
......@@ -6961,7 +7442,7 @@ static int32_t prepareStmtWithoutStb(
bind->length = &bind->buffer_length;
bind->is_null = NULL;
for (int i = 0; i < g_args.num_of_CPR; i ++) {
for (int i = 0; i < g_args.columnCount; i ++) {
bind = (TAOS_BIND *)((char *)bindArray
+ (sizeof(TAOS_BIND) * (i + 1)));
if ( -1 == prepareStmtBindArrayByType(
......@@ -7001,29 +7482,20 @@ static int32_t prepareStbStmtBindTag(
char *tagsVal,
int32_t timePrec)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.binwidth);
if (bindBuffer == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, DOUBLE_BUFF_LEN);
return -1;
}
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].data_type,
stbInfo->tags[t].dataLen,
timePrec,
NULL)) {
free(bindBuffer);
return -1;
}
}
free(bindBuffer);
return 0;
}
......@@ -7033,13 +7505,6 @@ static int32_t prepareStbStmtBindRand(
int64_t startTime, int32_t recSeq,
int32_t timePrec)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.binwidth);
if (bindBuffer == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, DOUBLE_BUFF_LEN);
return -1;
}
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
......@@ -7069,51 +7534,15 @@ static int32_t prepareStbStmtBindRand(
ptr += bind->buffer_length;
} else if ( -1 == prepareStmtBindArrayByTypeForRand(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].data_type,
stbInfo->columns[i-1].dataLen,
timePrec,
&ptr,
NULL)) {
tmfree(bindBuffer);
return -1;
}
}
tmfree(bindBuffer);
return 0;
}
static int32_t prepareStbStmtBindStartTime(
char *tableName,
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
int32_t timePrec)
{
TAOS_BIND *bind;
bind = (TAOS_BIND *)bindArray;
int64_t *bind_ts = ts;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
verbosePrint("%s() LN%d, tableName: %s, bind_ts=%"PRId64"\n",
__func__, __LINE__, tableName, *bind_ts);
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
return 0;
}
......@@ -7229,7 +7658,8 @@ UNUSED_FUNC static int32_t prepareStbStmtRand(
return k;
}
static int32_t prepareStbStmtWithSample(
#if STMT_BIND_PARAM_BATCH == 1
static int execBindParamBatch(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
......@@ -7240,60 +7670,586 @@ static int32_t prepareStbStmtWithSample(
int64_t *pSamplePos)
{
int ret;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
TAOS_STMT *stmt = pThreadInfo->stmt;
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
uint32_t columnCount = (stbInfo)?pThreadInfo->stbInfo->columnCount:g_args.columnCount;
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
uint32_t thisBatch = MAX_SAMPLES_ONCE_FROM_FILE - (*pSamplePos);
if (NULL == tagsValBuf) {
errorPrint2("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
if (thisBatch > batch) {
thisBatch = batch;
}
verbosePrint("%s() LN%d, batch=%d pos=%"PRId64" thisBatch=%d\n",
__func__, __LINE__, batch, *pSamplePos, thisBatch);
char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount);
if (NULL == tagsArray) {
tmfree(tagsValBuf);
errorPrint2("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
memset(pThreadInfo->bindParams, 0,
(sizeof(TAOS_MULTI_BIND) * (columnCount + 1)));
memset(pThreadInfo->is_null, 0, thisBatch);
for (int c = 0; c < columnCount + 1; c ++) {
TAOS_MULTI_BIND *param = (TAOS_MULTI_BIND *)(pThreadInfo->bindParams + sizeof(TAOS_MULTI_BIND) * c);
char data_type;
if (c == 0) {
data_type = TSDB_DATA_TYPE_TIMESTAMP;
param->buffer_length = sizeof(int64_t);
param->buffer = pThreadInfo->bind_ts_array;
} else {
data_type = (stbInfo)?stbInfo->columns[c-1].data_type:g_args.data_type[c-1];
char *tmpP;
switch(data_type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
param->buffer_length =
((stbInfo)?stbInfo->columns[c-1].dataLen:g_args.binwidth);
tmpP =
(char *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray
+sizeof(char*)*(c-1)));
verbosePrint("%s() LN%d, tmpP=%p pos=%"PRId64" width=%d position=%"PRId64"\n",
__func__, __LINE__, tmpP, *pSamplePos,
(((stbInfo)?stbInfo->columns[c-1].dataLen:g_args.binwidth)),
(*pSamplePos) *
(((stbInfo)?stbInfo->columns[c-1].dataLen:g_args.binwidth)));
param->buffer = (void *)(tmpP + *pSamplePos *
(((stbInfo)?stbInfo->columns[c-1].dataLen:g_args.binwidth))
);
break;
case TSDB_DATA_TYPE_INT:
param->buffer_length = sizeof(int32_t);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen * (*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(int32_t)*(*pSamplePos));
break;
case TSDB_DATA_TYPE_TINYINT:
param->buffer_length = sizeof(int8_t);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(
stbInfo->sampleBindBatchArray
+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen*(*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(
g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(int8_t)*(*pSamplePos));
break;
case TSDB_DATA_TYPE_SMALLINT:
param->buffer_length = sizeof(int16_t);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen * (*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(int16_t)*(*pSamplePos));
break;
case TSDB_DATA_TYPE_BIGINT:
param->buffer_length = sizeof(int64_t);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen * (*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(int64_t)*(*pSamplePos));
break;
case TSDB_DATA_TYPE_BOOL:
param->buffer_length = sizeof(int8_t);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen * (*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(int8_t)*(*pSamplePos));
break;
case TSDB_DATA_TYPE_FLOAT:
param->buffer_length = sizeof(float);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen * (*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(float)*(*pSamplePos));
break;
case TSDB_DATA_TYPE_DOUBLE:
param->buffer_length = sizeof(double);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen * (*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(double)*(*pSamplePos));
break;
case TSDB_DATA_TYPE_TIMESTAMP:
param->buffer_length = sizeof(int64_t);
param->buffer = (stbInfo)?
(void *)((uintptr_t)*(uintptr_t*)(stbInfo->sampleBindBatchArray+sizeof(char*)*(c-1))
+ stbInfo->columns[c-1].dataLen * (*pSamplePos)):
(void *)((uintptr_t)*(uintptr_t*)(g_sampleBindBatchArray+sizeof(char*)*(c-1))
+ sizeof(int64_t)*(*pSamplePos));
break;
default:
errorPrint("%s() LN%d, wrong data type: %d\n",
__func__,
__LINE__,
data_type);
exit(EXIT_FAILURE);
}
}
param->buffer_type = data_type;
param->length = malloc(sizeof(int32_t) * thisBatch);
assert(param->length);
for (int b = 0; b < thisBatch; b++) {
if (param->buffer_type == TSDB_DATA_TYPE_NCHAR) {
param->length[b] = strlen(
(char *)param->buffer + b *
((stbInfo)?stbInfo->columns[c].dataLen:g_args.binwidth)
);
} else {
param->length[b] = param->buffer_length;
}
}
param->is_null = pThreadInfo->is_null;
param->num = thisBatch;
}
uint32_t k;
for (k = 0; k < thisBatch;) {
/* columnCount + 1 (ts) */
if (stbInfo->disorderRatio) {
*(pThreadInfo->bind_ts_array + k) = startTime + getTSRandTail(
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*(pThreadInfo->bind_ts_array + k) = startTime + stbInfo->timeStampStep * k;
}
debugPrint("%s() LN%d, k=%d ts=%"PRId64"\n",
__func__, __LINE__,
k, *(pThreadInfo->bind_ts_array +k));
k++;
recordFrom ++;
(*pSamplePos) ++;
if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*pSamplePos = 0;
}
if (recordFrom >= insertRows) {
break;
}
}
ret = taos_stmt_bind_param_batch(stmt, (TAOS_MULTI_BIND *)pThreadInfo->bindParams);
if (0 != ret) {
errorPrint2("%s() LN%d, stmt_bind_param() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
for (int c = 0; c < stbInfo->columnCount + 1; c ++) {
TAOS_MULTI_BIND *param = (TAOS_MULTI_BIND *)(pThreadInfo->bindParams + sizeof(TAOS_MULTI_BIND) * c);
free(param->length);
}
// if msg > 3MB, break
ret = taos_stmt_add_batch(stmt);
if (0 != ret) {
errorPrint2("%s() LN%d, stmt_add_batch() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
return k;
}
static int parseSamplefileToStmtBatch(
SSuperTable* stbInfo)
{
// char *sampleDataBuf = (stbInfo)?
// stbInfo->sampleDataBuf:g_sampleDataBuf;
int32_t columnCount = (stbInfo)?stbInfo->columnCount:g_args.columnCount;
char *sampleBindBatchArray = NULL;
if (stbInfo) {
stbInfo->sampleBindBatchArray = calloc(1, sizeof(uintptr_t *) * columnCount);
sampleBindBatchArray = stbInfo->sampleBindBatchArray;
} else {
g_sampleBindBatchArray = calloc(1, sizeof(uintptr_t *) * columnCount);
sampleBindBatchArray = g_sampleBindBatchArray;
}
assert(sampleBindBatchArray);
for (int c = 0; c < columnCount; c++) {
char data_type = (stbInfo)?stbInfo->columns[c].data_type:g_args.data_type[c];
char *tmpP = NULL;
switch(data_type) {
case TSDB_DATA_TYPE_INT:
tmpP = calloc(1, sizeof(int) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_TINYINT:
tmpP = calloc(1, sizeof(int8_t) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_SMALLINT:
tmpP = calloc(1, sizeof(int16_t) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_BIGINT:
tmpP = calloc(1, sizeof(int64_t) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_BOOL:
tmpP = calloc(1, sizeof(int8_t) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_FLOAT:
tmpP = calloc(1, sizeof(float) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_DOUBLE:
tmpP = calloc(1, sizeof(double) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
tmpP = calloc(1, MAX_SAMPLES_ONCE_FROM_FILE *
(((stbInfo)?stbInfo->columns[c].dataLen:g_args.binwidth)));
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
case TSDB_DATA_TYPE_TIMESTAMP:
tmpP = calloc(1, sizeof(int64_t) * MAX_SAMPLES_ONCE_FROM_FILE);
assert(tmpP);
*(uintptr_t*)(sampleBindBatchArray+ sizeof(uintptr_t*)*c) = (uintptr_t)tmpP;
break;
default:
errorPrint("Unknown data type: %s\n",
(stbInfo)?stbInfo->columns[c].dataType:g_args.dataType[c]);
exit(EXIT_FAILURE);
}
}
char *sampleDataBuf = (stbInfo)?stbInfo->sampleDataBuf:g_sampleDataBuf;
int64_t lenOfOneRow = (stbInfo)?stbInfo->lenOfOneRow:g_args.lenOfOneRow;
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
int cursor = 0;
for (int c = 0; c < columnCount; c++) {
char data_type = (stbInfo)?
stbInfo->columns[c].data_type:
g_args.data_type[c];
char *restStr = sampleDataBuf
+ lenOfOneRow * i + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
char *tmpStr = calloc(1, index + 1);
if (NULL == tmpStr) {
errorPrint2("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, index + 1);
return -1;
}
strncpy(tmpStr, restStr, index);
cursor += index + 1; // skip ',' too
char *tmpP;
switch(data_type) {
case TSDB_DATA_TYPE_INT:
*((int32_t*)((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(int32_t)*i)) =
atoi(tmpStr);
break;
case TSDB_DATA_TYPE_FLOAT:
*(float*)(((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(float)*i)) =
(float)atof(tmpStr);
break;
case TSDB_DATA_TYPE_DOUBLE:
*(double*)(((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(double)*i)) =
atof(tmpStr);
break;
case TSDB_DATA_TYPE_TINYINT:
*((int8_t*)((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(int8_t)*i)) =
(int8_t)atoi(tmpStr);
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t*)((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(int16_t)*i)) =
(int16_t)atoi(tmpStr);
break;
case TSDB_DATA_TYPE_BIGINT:
*((int64_t*)((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(int64_t)*i)) =
(int64_t)atol(tmpStr);
break;
case TSDB_DATA_TYPE_BOOL:
*((int8_t*)((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(int8_t)*i)) =
(int8_t)atoi(tmpStr);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
*((int64_t*)((uintptr_t)*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c)+sizeof(int64_t)*i)) =
(int64_t)atol(tmpStr);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
tmpP = (char *)(*(uintptr_t*)(sampleBindBatchArray
+sizeof(char*)*c));
strcpy(tmpP + i*
(((stbInfo)?stbInfo->columns[c].dataLen:g_args.binwidth))
, tmpStr);
break;
default:
break;
}
free(tmpStr);
}
}
return 0;
}
static int parseSampleToStmtBatchForThread(
threadInfo *pThreadInfo, SSuperTable *stbInfo,
uint32_t timePrec,
uint32_t batch)
{
uint32_t columnCount = (stbInfo)?stbInfo->columnCount:g_args.columnCount;
pThreadInfo->bind_ts_array = malloc(sizeof(int64_t) * batch);
assert(pThreadInfo->bind_ts_array);
pThreadInfo->bindParams = malloc(sizeof(TAOS_MULTI_BIND) * (columnCount + 1));
assert(pThreadInfo->bindParams);
pThreadInfo->is_null = malloc(batch);
assert(pThreadInfo->is_null);
return 0;
}
static int parseStbSampleToStmtBatchForThread(
threadInfo *pThreadInfo,
SSuperTable *stbInfo,
uint32_t timePrec,
uint32_t batch)
{
return parseSampleToStmtBatchForThread(
pThreadInfo, stbInfo, timePrec, batch);
}
static int parseNtbSampleToStmtBatchForThread(
threadInfo *pThreadInfo, uint32_t timePrec, uint32_t batch)
{
return parseSampleToStmtBatchForThread(
pThreadInfo, NULL, timePrec, batch);
}
#else
static int parseSampleToStmt(
threadInfo *pThreadInfo,
SSuperTable *stbInfo, uint32_t timePrec)
{
pThreadInfo->sampleBindArray =
calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (pThreadInfo->sampleBindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n",
__func__, __LINE__,
(uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
return -1;
}
if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf);
tmfree(tagsArray);
return -1;
}
int32_t columnCount = (stbInfo)?stbInfo->columnCount:g_args.columnCount;
char *sampleDataBuf = (stbInfo)?stbInfo->sampleDataBuf:g_sampleDataBuf;
int64_t lenOfOneRow = (stbInfo)?stbInfo->lenOfOneRow:g_args.lenOfOneRow;
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray =
calloc(1, sizeof(TAOS_BIND) * (columnCount + 1));
if (bindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (columnCount + 1));
return -1;
}
TAOS_BIND *bind;
int cursor = 0;
for (int c = 0; c < columnCount + 1; c++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c));
if (c == 0) {
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = NULL; //bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else {
char data_type = (stbInfo)?
stbInfo->columns[c-1].data_type:
g_args.data_type[c-1];
int32_t dataLen = (stbInfo)?
stbInfo->columns[c-1].dataLen:
g_args.binwidth;
char *restStr = sampleDataBuf
+ lenOfOneRow * i + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
char *bindBuffer = calloc(1, index + 1);
if (bindBuffer == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, index + 1);
return -1;
}
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if (-1 == prepareStmtBindArrayByType(
bind,
data_type,
dataLen,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
free(bindBuffer);
}
}
*((uintptr_t *)(pThreadInfo->sampleBindArray + (sizeof(char *)) * i)) =
(uintptr_t)bindArray;
}
return 0;
}
static int parseStbSampleToStmt(
threadInfo *pThreadInfo,
SSuperTable *stbInfo, uint32_t timePrec)
{
return parseSampleToStmt(
pThreadInfo,
stbInfo, timePrec);
}
static int parseNtbSampleToStmt(
threadInfo *pThreadInfo,
uint32_t timePrec)
{
return parseSampleToStmt(
pThreadInfo,
NULL,
timePrec);
}
static int32_t prepareStbStmtBindStartTime(
char *tableName,
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq)
{
TAOS_BIND *bind;
bind = (TAOS_BIND *)bindArray;
int64_t *bind_ts = ts;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
verbosePrint("%s() LN%d, tableName: %s, bind_ts=%"PRId64"\n",
__func__, __LINE__, tableName, *bind_ts);
ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
tmfree(tagsValBuf);
tmfree(tagsArray);
return 0;
}
if (0 != ret) {
errorPrint2("%s() LN%d, stmt_set_tbname_tags() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
} else {
ret = taos_stmt_set_tbname(stmt, tableName);
if (0 != ret) {
errorPrint2("%s() LN%d, stmt_set_tbname() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
}
static uint32_t execBindParam(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
int ret;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
TAOS_STMT *stmt = pThreadInfo->stmt;
uint32_t k;
for (k = 0; k < batch;) {
......@@ -7304,8 +8260,7 @@ static int32_t prepareStbStmtWithSample(
tableName,
pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision
startTime, k
/* is column */)) {
return -1;
}
......@@ -7338,6 +8293,96 @@ static int32_t prepareStbStmtWithSample(
return k;
}
#endif
static int32_t prepareStbStmtWithSample(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
int ret;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
TAOS_STMT *stmt = pThreadInfo->stmt;
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
errorPrint2("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount);
if (NULL == tagsArray) {
tmfree(tagsValBuf);
errorPrint2("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf);
tmfree(tagsArray);
return -1;
}
ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
tmfree(tagsValBuf);
tmfree(tagsArray);
if (0 != ret) {
errorPrint2("%s() LN%d, stmt_set_tbname_tags() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
} else {
ret = taos_stmt_set_tbname(stmt, tableName);
if (0 != ret) {
errorPrint2("%s() LN%d, stmt_set_tbname() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
}
#if STMT_BIND_PARAM_BATCH == 1
return execBindParamBatch(
pThreadInfo,
tableName,
tableSeq,
batch,
insertRows,
recordFrom,
startTime,
pSamplePos);
#else
return execBindParam(
pThreadInfo,
tableName,
tableSeq,
batch,
insertRows,
recordFrom,
startTime,
pSamplePos);
#endif
}
static int32_t generateStbProgressiveData(
SSuperTable *stbInfo,
......@@ -7367,7 +8412,7 @@ static int32_t generateStbProgressiveData(
int64_t dataLen;
return generateStbDataTail(stbInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen,
g_args.reqPerReq, pstr, *pRemainderBufLen,
insertRows, recordFrom,
startTime,
pSamplePos, &dataLen);
......@@ -7399,20 +8444,22 @@ static int32_t generateProgressiveDataWithoutStb(
int64_t dataLen;
return generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom,
g_args.reqPerReq, pstr, *pRemainderBufLen, insertRows, recordFrom,
startTime,
/*pSamplePos, */&dataLen);
}
static void printStatPerThread(threadInfo *pThreadInfo)
{
if (0 == pThreadInfo->totalDelay)
pThreadInfo->totalDelay = 1;
fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows,
(pThreadInfo->totalDelay)?
(double)(pThreadInfo->totalAffectedRows/((double)pThreadInfo->totalDelay/1000000.0)):
FLT_MAX);
(double)(pThreadInfo->totalAffectedRows/((double)pThreadInfo->totalDelay/1000000.0))
);
}
// sync write interlace data
......@@ -7441,7 +8488,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
nTimeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval;
} else {
insertRows = g_args.num_of_DPT;
insertRows = g_args.insertRows;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step;
......@@ -7456,15 +8503,15 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (interlaceRows > insertRows)
interlaceRows = insertRows;
if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR;
if (interlaceRows > g_args.reqPerReq)
interlaceRows = g_args.reqPerReq;
uint32_t batchPerTbl = interlaceRows;
uint32_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
g_args.reqPerReq / interlaceRows;
} else {
batchPerTblTimes = 1;
}
......@@ -7514,6 +8561,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint32_t recOfBatch = 0;
int32_t generated;
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN];
......@@ -7527,7 +8575,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t oldRemainderLen = remainderBufLen;
int32_t generated;
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
generated = prepareStbStmtWithSample(
......@@ -7608,7 +8655,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if ((remainRows > 0) && (batchPerTbl > remainRows))
batchPerTbl = remainRows;
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq)
break;
}
......@@ -7616,7 +8663,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__,
generatedRecPerTbl, insertRows);
if ((g_args.num_of_RPR - recOfBatch) < batchPerTbl)
if ((g_args.reqPerReq - recOfBatch) < batchPerTbl)
break;
}
......@@ -7708,7 +8755,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t timeStampStep =
stbInfo?stbInfo->timeStampStep:g_args.timestamp_step;
int64_t insertRows =
(stbInfo)?stbInfo->insertRows:g_args.num_of_DPT;
(stbInfo)?stbInfo->insertRows:g_args.insertRows;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
......@@ -7769,7 +8816,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
(g_args.reqPerReq>stbInfo->insertRows)?
stbInfo->insertRows:
g_args.reqPerReq,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
} else {
......@@ -7786,7 +8835,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
generated = prepareStmtWithoutStb(
pThreadInfo,
tableName,
g_args.num_of_RPR,
g_args.reqPerReq,
insertRows, i,
start_time);
} else {
......@@ -7854,7 +8903,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (i >= insertRows)
break;
} // num_of_DPT
} // insertRows
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (stbInfo)
......@@ -7865,8 +8914,10 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
__func__, __LINE__, pThreadInfo->samplePos);
}
} // tableSeq
if (percentComplete < 100)
if (percentComplete < 100) {
printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete);
}
free_of_progressive:
tmfree(pThreadInfo->buffer);
......@@ -7919,11 +8970,11 @@ static void callBack(void *param, TAOS_RES *res, int code) {
char *buffer = calloc(1, pThreadInfo->stbInfo->maxSqlLen);
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values",
pstr += sprintf(pstr, "INSERT INTO %s.%s%"PRId64" VALUES",
pThreadInfo->db_name, pThreadInfo->tb_prefix,
pThreadInfo->start_table_from);
// if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) {
if (pThreadInfo->counter >= g_args.num_of_RPR) {
if (pThreadInfo->counter >= g_args.reqPerReq) {
pThreadInfo->start_table_from++;
pThreadInfo->counter = 0;
}
......@@ -7934,7 +8985,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
return;
}
for (int i = 0; i < g_args.num_of_RPR; i++) {
for (int i = 0; i < g_args.reqPerReq; i++) {
int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->stbInfo->disorderRatio
&& rand_num < pThreadInfo->stbInfo->disorderRatio) {
......@@ -8014,81 +9065,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
return 0;
}
static int parseSampleFileToStmt(
threadInfo *pThreadInfo,
SSuperTable *stbInfo, uint32_t timePrec)
{
pThreadInfo->sampleBindArray =
calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (pThreadInfo->sampleBindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n",
__func__, __LINE__,
(uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
return -1;
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray =
calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
TAOS_BIND *bind;
int cursor = 0;
for (int c = 0; c < stbInfo->columnCount + 1; c++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c));
if (c == 0) {
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = NULL; //bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * i + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
char *bindBuffer = calloc(1, index + 1);
if (bindBuffer == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, DOUBLE_BUFF_LEN);
return -1;
}
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if (-1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[c-1].dataType,
stbInfo->columns[c-1].dataLen,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
free(bindBuffer);
}
}
*((uintptr_t *)(pThreadInfo->sampleBindArray + (sizeof(char *)) * i)) =
(uintptr_t)bindArray;
}
return 0;
}
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision, SSuperTable* stbInfo) {
......@@ -8126,13 +9102,18 @@ static void startMultiThreadInsertData(int threads, char* db_name,
__func__, __LINE__, start_time);
// read sample data from file first
int ret;
if (stbInfo) {
if (0 != prepareSampleDataForSTable(stbInfo)) {
ret = prepareSampleForStb(stbInfo);
} else {
ret = prepareSampleForNtb();
}
if (0 != ret) {
errorPrint2("%s() LN%d, prepare sample data for stable failed!\n",
__func__, __LINE__);
exit(EXIT_FAILURE);
}
}
TAOS* taos0 = taos_connect(
g_Dbs.host, g_Dbs.user,
......@@ -8162,6 +9143,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|| ((stbInfo->childTblOffset
+ stbInfo->childTblLimit)
> (stbInfo->childTblCount))) {
if (stbInfo->childTblCount < stbInfo->childTblOffset) {
printf("WARNING: offset will not be used since the child tables count is less then offset!\n");
stbInfo->childTblOffset = 0;
}
stbInfo->childTblLimit =
stbInfo->childTblCount - stbInfo->childTblOffset;
}
......@@ -8200,12 +9187,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos0,
db_name, stbInfo->sTblName,
db_name, stbInfo->stbName,
&stbInfo->childTblName, &childTblCount,
limit,
offset);
ntables = childTblCount; // CBD
} else {
ntables = g_args.num_of_tables;
ntables = g_args.ntables;
tableFrom = 0;
}
......@@ -8241,6 +9229,35 @@ static void startMultiThreadInsertData(int threads, char* db_name,
char *stmtBuffer = calloc(1, BUFFER_SIZE);
assert(stmtBuffer);
#if STMT_BIND_PARAM_BATCH == 1
uint32_t interlaceRows;
uint32_t batch;
if (stbInfo) {
if ((stbInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows;
if (interlaceRows > stbInfo->insertRows) {
interlaceRows = stbInfo->insertRows;
}
} else {
interlaceRows = stbInfo->interlaceRows;
}
} else {
interlaceRows = g_args.interlace_rows;
}
if (interlaceRows > 0) {
batch = interlaceRows;
} else {
batch = (g_args.reqPerReq>g_args.insertRows)?
g_args.insertRows:g_args.reqPerReq;
}
#endif
if ((g_args.iface == STMT_IFACE)
|| ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) {
......@@ -8250,7 +9267,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
&& (AUTO_CREATE_SUBTBL
== stbInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
stbInfo->sTblName);
stbInfo->stbName);
for (int tag = 0; tag < (stbInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
......@@ -8260,12 +9277,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
int columnCount;
if (stbInfo) {
columnCount = stbInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
int columnCount = (stbInfo)?
stbInfo->columnCount:
g_args.columnCount;
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
......@@ -8273,6 +9287,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
#if STMT_BIND_PARAM_BATCH == 1
parseSamplefileToStmtBatch(stbInfo);
#endif
}
for (int i = 0; i < threads; i++) {
......@@ -8316,8 +9333,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit(EXIT_FAILURE);
}
int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0);
if (ret != 0) {
if (0 != taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0)) {
free(pids);
free(infos);
free(stmtBuffer);
......@@ -8328,7 +9344,19 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->bind_ts = malloc(sizeof(int64_t));
if (stbInfo) {
parseSampleFileToStmt(pThreadInfo, stbInfo, timePrec);
#if STMT_BIND_PARAM_BATCH == 1
parseStbSampleToStmtBatchForThread(
pThreadInfo, stbInfo, timePrec, batch);
#else
parseStbSampleToStmt(pThreadInfo, stbInfo, timePrec);
#endif
} else {
#if STMT_BIND_PARAM_BATCH == 1
parseNtbSampleToStmtBatchForThread(
pThreadInfo, timePrec, batch);
#else
parseNtbSampleToStmt(pThreadInfo, timePrec);
#endif
}
}
} else {
......@@ -8373,19 +9401,29 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos);
if (pThreadInfo->stmt) {
taos_stmt_close(pThreadInfo->stmt);
tmfree((char *)pThreadInfo->bind_ts);
}
tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos);
#if STMT_BIND_PARAM_BATCH == 1
tmfree((char *)pThreadInfo->bind_ts);
tmfree((char *)pThreadInfo->bind_ts_array);
tmfree(pThreadInfo->bindParams);
tmfree(pThreadInfo->is_null);
#else
tmfree((char *)pThreadInfo->bind_ts);
if (pThreadInfo->sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
pThreadInfo->sampleBindArray
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < pThreadInfo->stbInfo->columnCount + 1; c++) {
int columnCount = (pThreadInfo->stbInfo)?
pThreadInfo->stbInfo->columnCount:
g_args.columnCount;
for (int c = 1; c < columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
......@@ -8394,6 +9432,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
tmfree(pThreadInfo->sampleBindArray);
}
#endif
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__,
......@@ -8412,7 +9451,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (pThreadInfo->maxDelay > maxDelay) maxDelay = pThreadInfo->maxDelay;
if (pThreadInfo->minDelay < minDelay) minDelay = pThreadInfo->minDelay;
}
cntDelay -= 1;
if (cntDelay == 0) cntDelay = 1;
avgDelay = (double)totalDelay / cntDelay;
......@@ -8427,7 +9465,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
fprintf(stderr, "Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
threads, db_name, stbInfo->stbName,
(double)(stbInfo->totalInsertRows/tInMs));
if (g_fpOfInsertResult) {
......@@ -8435,7 +9473,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
"Spent %.4f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
threads, db_name, stbInfo->stbName,
(double)(stbInfo->totalInsertRows/tInMs));
}
} else {
......@@ -8488,16 +9526,16 @@ static void *readTable(void *sarg) {
return NULL;
}
int64_t num_of_DPT;
int64_t insertRows;
/* if (pThreadInfo->stbInfo) {
num_of_DPT = pThreadInfo->stbInfo->insertRows; // nrecords_per_table;
insertRows = pThreadInfo->stbInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
insertRows = g_args.insertRows;
// }
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
int64_t ntables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = insertRows * ntables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
int n = do_aggreFunc ? (sizeof(g_aggreFunc) / sizeof(g_aggreFunc[0])) : 2;
......@@ -8510,8 +9548,8 @@ static void *readTable(void *sarg) {
for (int j = 0; j < n; j++) {
double totalT = 0;
uint64_t count = 0;
for (int64_t i = 0; i < num_of_tables; i++) {
sprintf(command, "select %s from %s%"PRId64" where ts>= %" PRIu64,
for (int64_t i = 0; i < ntables; i++) {
sprintf(command, "SELECT %s FROM %s%"PRId64" WHERE ts>= %" PRIu64,
g_aggreFunc[j], tb_prefix, i, sTime);
double t = taosGetTimestampMs();
......@@ -8539,7 +9577,7 @@ static void *readTable(void *sarg) {
fprintf(fp, "|%10s | %"PRId64" | %12.2f | %10.2f |\n",
g_aggreFunc[j][0] == '*' ? " * " : g_aggreFunc[j], totalData,
(double)(num_of_tables * num_of_DPT) / totalT, totalT * 1000);
(double)(ntables * insertRows) / totalT, totalT * 1000);
printf("select %10s took %.6f second(s)\n", g_aggreFunc[j], totalT * 1000);
}
fprintf(fp, "\n");
......@@ -8564,9 +9602,9 @@ static void *readMetric(void *sarg) {
return NULL;
}
int64_t num_of_DPT = pThreadInfo->stbInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
int64_t insertRows = pThreadInfo->stbInfo->insertRows;
int64_t ntables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = insertRows * ntables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
int n = do_aggreFunc ? (sizeof(g_aggreFunc) / sizeof(g_aggreFunc[0])) : 2;
......@@ -8580,7 +9618,7 @@ static void *readMetric(void *sarg) {
char condition[COND_BUF_LEN] = "\0";
char tempS[64] = "\0";
int64_t m = 10 < num_of_tables ? 10 : num_of_tables;
int64_t m = 10 < ntables ? 10 : ntables;
for (int64_t i = 1; i <= m; i++) {
if (i == 1) {
......@@ -8590,7 +9628,7 @@ static void *readMetric(void *sarg) {
}
strncat(condition, tempS, COND_BUF_LEN - 1);
sprintf(command, "select %s from meters where %s", g_aggreFunc[j], condition);
sprintf(command, "SELECT %s FROM meters WHERE %s", g_aggreFunc[j], condition);
printf("Where condition: %s\n", condition);
fprintf(fp, "%s\n", command);
......@@ -8615,7 +9653,7 @@ static void *readMetric(void *sarg) {
t = taosGetTimestampMs() - t;
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n",
num_of_tables * num_of_DPT / (t * 1000.0), t);
ntables * insertRows / (t * 1000.0), t);
printf("select %10s took %.6f second(s)\n\n", g_aggreFunc[j], t * 1000.0);
taos_free_result(pSql);
......@@ -8948,7 +9986,7 @@ static int queryTestProcess() {
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.sTblName,
g_queryInfo.superQueryInfo.stbName,
&g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.superQueryInfo.childTblCount);
}
......@@ -9004,7 +10042,7 @@ static int queryTestProcess() {
}
}
pThreadInfo->taos = NULL;// TODO: workaround to use separate taos connection;
pThreadInfo->taos = NULL;// workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedTableQuery,
pThreadInfo);
......@@ -9054,7 +10092,7 @@ static int queryTestProcess() {
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pThreadInfo->taos = NULL; // workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
......@@ -9081,7 +10119,7 @@ static int queryTestProcess() {
tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub);
// taos_close(taos);// TODO: workaround to use separate taos connection;
// taos_close(taos);// workaround to use separate taos connection;
uint64_t endTs = taosGetTimestampMs();
uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
......@@ -9441,12 +10479,12 @@ static int subscribeTestProcess() {
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.superQueryInfo.sTblName,
g_queryInfo.superQueryInfo.stbName,
&g_queryInfo.superQueryInfo.childTblName,
&g_queryInfo.superQueryInfo.childTblCount);
}
taos_close(taos); // TODO: workaround to use separate taos connection;
taos_close(taos); // workaround to use separate taos connection;
pthread_t *pids = NULL;
threadInfo *infos = NULL;
......@@ -9488,7 +10526,7 @@ static int subscribeTestProcess() {
threadInfo *pThreadInfo = infos + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pThreadInfo->taos = NULL; // workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, pThreadInfo);
}
}
......@@ -9545,7 +10583,7 @@ static int subscribeTestProcess() {
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?tableFrom+a:tableFrom+a-1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pThreadInfo->taos = NULL; // workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, pThreadInfo);
}
......@@ -9618,8 +10656,8 @@ static void setParaFromArg() {
g_Dbs.port = g_args.port;
}
g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountForCreateTbl = g_args.num_of_threads;
g_Dbs.threadCount = g_args.nthreads;
g_Dbs.threadCountForCreateTbl = g_args.nthreads;
g_Dbs.dbCount = 1;
g_Dbs.db[0].drop = true;
......@@ -9636,22 +10674,23 @@ static void setParaFromArg() {
g_Dbs.do_aggreFunc = true;
char dataString[TSDB_MAX_BYTES_PER_ROW];
char **data_type = g_args.datatype;
char *data_type = g_args.data_type;
char **dataType = g_args.dataType;
memset(dataString, 0, TSDB_MAX_BYTES_PER_ROW);
if (strcasecmp(data_type[0], "BINARY") == 0
|| strcasecmp(data_type[0], "BOOL") == 0
|| strcasecmp(data_type[0], "NCHAR") == 0 ) {
if ((data_type[0] == TSDB_DATA_TYPE_BINARY)
|| (data_type[0] == TSDB_DATA_TYPE_BOOL)
|| (data_type[0] == TSDB_DATA_TYPE_NCHAR)) {
g_Dbs.do_aggreFunc = false;
}
if (g_args.use_metric) {
g_Dbs.db[0].superTblCount = 1;
tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", TSDB_TABLE_NAME_LEN);
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountForCreateTbl = g_args.num_of_threads;
tstrncpy(g_Dbs.db[0].superTbls[0].stbName, "meters", TSDB_TABLE_NAME_LEN);
g_Dbs.db[0].superTbls[0].childTblCount = g_args.ntables;
g_Dbs.threadCount = g_args.nthreads;
g_Dbs.threadCountForCreateTbl = g_args.nthreads;
g_Dbs.asyncMode = g_args.async_mode;
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
......@@ -9671,26 +10710,28 @@ static void setParaFromArg() {
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = g_args.timestamp_step;
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
g_Dbs.db[0].superTbls[0].insertRows = g_args.insertRows;
g_Dbs.db[0].superTbls[0].maxSqlLen = g_args.max_sql_len;
g_Dbs.db[0].superTbls[0].columnCount = 0;
for (int i = 0; i < MAX_NUM_COLUMNS; i++) {
if (data_type[i] == NULL) {
if (data_type[i] == TSDB_DATA_TYPE_NULL) {
break;
}
g_Dbs.db[0].superTbls[0].columns[i].data_type = data_type[i];
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
data_type[i], min(DATATYPE_BUFF_LEN, strlen(data_type[i]) + 1));
dataType[i], min(DATATYPE_BUFF_LEN, strlen(dataType[i]) + 1));
g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.binwidth;
g_Dbs.db[0].superTbls[0].columnCount++;
}
if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) {
g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR;
if (g_Dbs.db[0].superTbls[0].columnCount > g_args.columnCount) {
g_Dbs.db[0].superTbls[0].columnCount = g_args.columnCount;
} else {
for (int i = g_Dbs.db[0].superTbls[0].columnCount;
i < g_args.num_of_CPR; i++) {
i < g_args.columnCount; i++) {
g_Dbs.db[0].superTbls[0].columns[i].data_type = TSDB_DATA_TYPE_INT;
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
"INT", min(DATATYPE_BUFF_LEN, strlen("INT") + 1));
g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
......@@ -9707,7 +10748,7 @@ static void setParaFromArg() {
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.binwidth;
g_Dbs.db[0].superTbls[0].tagCount = 2;
} else {
g_Dbs.threadCountForCreateTbl = g_args.num_of_threads;
g_Dbs.threadCountForCreateTbl = g_args.nthreads;
g_Dbs.db[0].superTbls[0].tagCount = 0;
}
}
......@@ -9840,8 +10881,8 @@ static void queryResult() {
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, TBNAME_PREFIX_LEN);
} else {
pThreadInfo->ntables = g_args.num_of_tables;
pThreadInfo->end_table_to = g_args.num_of_tables -1;
pThreadInfo->ntables = g_args.ntables;
pThreadInfo->end_table_to = g_args.ntables -1;
tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, TSDB_TABLE_NAME_LEN);
}
......
......@@ -5,7 +5,7 @@
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count": 1,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
......
......@@ -26,7 +26,7 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 10
self.numberOfTables = 8
self.numberOfRecords = 1000000
def getBuildPath(self):
......@@ -86,7 +86,7 @@ class TDTestCase:
while True:
print("query started")
try:
tdSql.query("select * from test.t9")
tdSql.query("select * from test.t7")
except Exception as e:
tdLog.info("select * test failed")
time.sleep(2)
......@@ -100,8 +100,8 @@ class TDTestCase:
print("alter table test.meters add column c10 int")
tdSql.execute("alter table test.meters add column c10 int")
print("insert into test.t9 values (now, 1, 2, 3, 4, 0)")
tdSql.execute("insert into test.t9 values (now, 1, 2, 3, 4, 0)")
print("insert into test.t7 values (now, 1, 2, 3, 4, 0)")
tdSql.execute("insert into test.t7 values (now, 1, 2, 3, 4, 0)")
def run(self):
tdSql.prepare()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册