未验证 提交 72e2c66d 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-4068]<feature>: taosdemo support stmt. (#6270)

* [TD-4068]<feature>: taosdemo support stmt.

for easy merge purpose. disabled in master.

* fix clang compile error.
上级 339b8077
......@@ -19,6 +19,7 @@
*/
#include <stdint.h>
#include <taos.h>
#define _GNU_SOURCE
#define CURL_STATICLIB
......@@ -52,6 +53,8 @@
#include "taoserror.h"
#include "tutil.h"
#define STMT_IFACE_ENABLED 0
#define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096
......@@ -61,6 +64,8 @@ extern char configDir[];
#define QUERY_JSON_NAME "query.json"
#define SUBSCRIBE_JSON_NAME "subscribe.json"
#define STR_INSERT_INTO "INSERT INTO "
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
......@@ -70,6 +75,8 @@ enum TEST_MODE {
#define MAX_RECORDS_PER_REQ 32766
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
#define COND_BUF_LEN BUFFER_SIZE - 30
......@@ -120,17 +127,24 @@ enum enumSYNC_MODE {
MODE_BUT
};
enum enum_TAOS_INTERFACE {
TAOSC_IFACE,
REST_IFACE,
STMT_IFACE,
INTERFACE_BUT
};
typedef enum enumQUERY_CLASS {
SPECIFIED_CLASS,
STABLE_CLASS,
CLASS_BUT
} QUERY_CLASS;
typedef enum enum_INSERT_MODE {
typedef enum enum_PROGRESSIVE_OR_INTERLACE {
PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE,
INVALID_INSERT_MODE
} INSERT_MODE;
} PROG_OR_INTERLACE_MODE;
typedef enum enumQUERY_TYPE {
NO_INSERT_TYPE,
......@@ -196,6 +210,7 @@ typedef struct SArguments_S {
uint32_t test_mode;
char * host;
uint16_t port;
uint16_t iface;
char * user;
char * password;
char * database;
......@@ -217,13 +232,13 @@ typedef struct SArguments_S {
uint32_t num_of_threads;
uint64_t insert_interval;
int64_t query_times;
uint64_t interlace_rows;
uint64_t num_of_RPR; // num_of_records_per_req
uint32_t interlace_rows;
uint32_t num_of_RPR; // num_of_records_per_req
uint64_t max_sql_len;
int64_t num_of_tables;
int64_t num_of_DPT;
int abort;
int disorderRatio; // 0: no disorder, >0: x%
uint32_t disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision
uint32_t method_of_delete;
char ** arg_list;
......@@ -246,12 +261,12 @@ typedef struct SSuperTable_S {
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
int64_t childTblLimit;
uint64_t childTblOffset;
// int multiThreadWriteOneTbl; // 0: no, 1: yes
uint64_t interlaceRows; //
uint32_t interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision
uint64_t maxSqlLen; //
......@@ -364,7 +379,7 @@ typedef struct SDbs_S {
typedef struct SpecifiedQueryInfo_S {
uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint32_t concurrent;
int sqlCount;
uint64_t sqlCount;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
uint64_t queryTimes;
......@@ -392,7 +407,7 @@ typedef struct SuperQueryInfo_S {
uint64_t queryTimes;
int64_t childTblCount;
char childTblPrefix[MAX_TB_NAME_SIZE];
int sqlCount;
uint64_t sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume;
......@@ -420,6 +435,7 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int threadID;
char db_name[MAX_DB_NAME_SIZE+1];
uint32_t time_precision;
......@@ -434,6 +450,7 @@ typedef struct SThreadInfo_S {
char* cols;
bool use_metric;
SSuperTable* superTblInfo;
char *buffer; // sql cmd buffer
// for async insert
tsem_t lock_sem;
......@@ -557,6 +574,7 @@ SArguments g_args = {
0, // test_mode
"127.0.0.1", // host
6030, // port
TAOSC_IFACE, // iface
"root", // user
#ifdef _TD_POWER_
"powerdb", // password
......@@ -673,6 +691,8 @@ static void printHelp() {
"The host to connect to TDengine. Default is localhost.");
printf("%s%s%s%s\n", indent, "-p", indent,
"The TCP/IP port number to use for the connection. Default is 0.");
printf("%s%s%s%s\n", indent, "-I", indent,
"The interface (taosc, rest, and stmt) taosdemo uses. Default is 'taosc'.");
printf("%s%s%s%s\n", indent, "-d", indent,
"Destination database. Default is 'test'.");
printf("%s%s%s%s\n", indent, "-a", indent,
......@@ -761,6 +781,23 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
exit(EXIT_FAILURE);
}
arguments->port = atoi(argv[++i]);
} else if (strcmp(argv[i], "-I") == 0) {
if (argc == i+1) {
printHelp();
errorPrint("%s", "\n\t-I need a valid string following!\n");
exit(EXIT_FAILURE);
}
++i;
if (0 == strcasecmp(argv[i], "taosc")) {
arguments->iface = TAOSC_IFACE;
} else if (0 == strcasecmp(argv[i], "rest")) {
arguments->iface = REST_IFACE;
} else if (0 == strcasecmp(argv[i], "stmt")) {
arguments->iface = STMT_IFACE;
} else {
errorPrint("%s", "\n\t-I need a valid string following!\n");
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-u") == 0) {
if (argc == i+1) {
printHelp();
......@@ -897,6 +934,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(argv[i], "BIGINT")
&& strcasecmp(argv[i], "DOUBLE")
&& strcasecmp(argv[i], "BINARY")
&& strcasecmp(argv[i], "TIMESTAMP")
&& strcasecmp(argv[i], "NCHAR")) {
printHelp();
errorPrint("%s", "-b: Invalid data_type!\n");
......@@ -918,6 +956,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(token, "BIGINT")
&& strcasecmp(token, "DOUBLE")
&& strcasecmp(token, "BINARY")
&& strcasecmp(token, "TIMESTAMP")
&& strcasecmp(token, "NCHAR")) {
printHelp();
free(g_dupstr);
......@@ -1019,6 +1058,19 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
}
int columnCount;
for (columnCount = 0; columnCount < MAX_NUM_DATATYPE; columnCount ++) {
if (g_args.datatype[columnCount] == NULL) {
break;
}
}
if (0 == columnCount) {
perror("data type error!");
exit(-1);
}
g_args.num_of_CPR = columnCount;
if (((arguments->debug_print) && (arguments->metaFile == NULL))
|| arguments->verbose_print) {
printf("###################################################################\n");
......@@ -1028,7 +1080,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->port );
printf("# User: %s\n", arguments->user);
printf("# Password: %s\n", arguments->password);
printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false");
printf("# Use metric: %s\n",
arguments->use_metric ? "true" : "false");
if (*(arguments->datatype)) {
printf("# Specified data type: ");
for (int i = 0; i < MAX_NUM_DATATYPE; i++)
......@@ -1040,7 +1093,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
printf("# Insertion interval: %"PRIu64"\n",
arguments->insert_interval);
printf("# Number of records per req: %"PRIu64"\n",
printf("# Number of records per req: %u\n",
arguments->num_of_RPR);
printf("# Max SQL length: %"PRIu64"\n",
arguments->max_sql_len);
......@@ -1068,8 +1121,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
static bool getInfoFromJsonFile(char* file);
//static int generateOneRowDataForStb(SSuperTable* stbInfo);
//static int getDataIntoMemForStb(SSuperTable* stbInfo);
static void init_rand_data();
static void tmfclose(FILE *fp) {
if (NULL != fp) {
......@@ -1088,7 +1139,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
TAOS_RES *res = NULL;
int32_t code = -1;
for (i = 0; i < 5; i++) {
for (i = 0; i < 5 /* retry */; i++) {
if (NULL != res) {
taos_free_result(res);
res = NULL;
......@@ -1104,7 +1155,8 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command);
if (code != 0) {
if (!quiet) {
errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res));
errorPrint("Failed to execute %s, reason: %s\n",
command, taos_errstr(res));
}
taos_free_result(res);
//taos_close(taos);
......@@ -1320,6 +1372,8 @@ static void init_rand_data() {
static int printfInsertMeta() {
SHOW_PARSE_RESULT_START();
printf("interface: \033[33m%s\033[0m\n",
(g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt");
printf("host: \033[33m%s:%u\033[0m\n",
g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
......@@ -1331,7 +1385,7 @@ static int printfInsertMeta() {
g_Dbs.threadCountByCreateTbl);
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
g_args.insert_interval);
printf("number of records per req: \033[33m%"PRIu64"\033[0m\n",
printf("number of records per req: \033[33m%u\033[0m\n",
g_args.num_of_RPR);
printf("max sql length: \033[33m%"PRIu64"\033[0m\n",
g_args.max_sql_len);
......@@ -1437,8 +1491,9 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].insertMode);
printf(" iface: \033[33m%s\033[0m\n",
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n",
g_Dbs.db[i].superTbls[j].childTblLimit);
......@@ -1456,7 +1511,7 @@ static int printfInsertMeta() {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
}
*/
printf(" interlaceRows: \033[33m%"PRIu64"\033[0m\n",
printf(" interlaceRows: \033[33m%u\033[0m\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
......@@ -1534,7 +1589,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
fprintf(fp, "number of records per req: %"PRIu64"\n", g_args.num_of_RPR);
fprintf(fp, "number of records per req: %u\n", g_args.num_of_RPR);
fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len);
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
......@@ -1626,11 +1681,12 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs.db[i].superTbls[j].childTblPrefix);
fprintf(fp, " dataSource: %s\n",
g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n",
g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " iface: %s\n",
(g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc":
(g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt");
fprintf(fp, " insertRows: %"PRId64"\n",
g_Dbs.db[i].superTbls[j].insertRows);
fprintf(fp, " interlace rows: %"PRIu64"\n",
fprintf(fp, " interlace rows: %u\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) {
fprintf(fp, " stable insert interval: %"PRIu64"\n",
......@@ -1643,7 +1699,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " multiThreadWriteOneTbl: yes\n");
}
*/
fprintf(fp, " interlaceRows: %"PRIu64"\n",
fprintf(fp, " interlaceRows: %u\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
fprintf(fp, " disorderRange: %d\n",
g_Dbs.db[i].superTbls[j].disorderRange);
......@@ -1719,7 +1775,7 @@ static void printfQueryMeta() {
if ((SUBSCRIBE_TEST == g_args.test_mode) || (QUERY_TEST == g_args.test_mode)) {
printf("specified table query info: \n");
printf("sqlCount: \033[33m%d\033[0m\n",
printf("sqlCount: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.specifiedQueryInfo.sqlCount);
if (g_queryInfo.specifiedQueryInfo.sqlCount > 0) {
printf("specified tbl query times:\n");
......@@ -1739,15 +1795,15 @@ static void printfQueryMeta() {
printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n",
for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n",
i, g_queryInfo.specifiedQueryInfo.sql[i]);
}
printf("\n");
}
printf("super table query info:\n");
printf("sqlCount: \033[33m%d\033[0m\n",
printf("sqlCount: \033[33m%"PRIu64"\033[0m\n",
g_queryInfo.superQueryInfo.sqlCount);
if (g_queryInfo.superQueryInfo.sqlCount > 0) {
......@@ -2803,7 +2859,7 @@ static int createDatabasesAndStables() {
int validStbCount = 0;
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].sTblName);
ret = queryDbExec(taos, command, NO_INSERT_TYPE, true);
......@@ -2813,7 +2869,7 @@ static int createDatabasesAndStables() {
&g_Dbs.db[i].superTbls[j]);
if (0 != ret) {
errorPrint("create super table %d failed!\n\n", j);
errorPrint("create super table %"PRIu64" failed!\n\n", j);
continue;
}
}
......@@ -2841,7 +2897,7 @@ static void* createTable(void *sarg)
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int64_t lastPrintTime = taosGetTimestampMs();
uint64_t lastPrintTime = taosGetTimestampMs();
int buff_len;
buff_len = BUFFER_SIZE / 8;
......@@ -2915,7 +2971,7 @@ static void* createTable(void *sarg)
return NULL;
}
int64_t currentPrintTime = taosGetTimestampMs();
uint64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
pThreadInfo->threadID, pThreadInfo->start_table_from, i);
......@@ -2934,11 +2990,11 @@ static void* createTable(void *sarg)
}
static int startMultiThreadCreateChildTable(
char* cols, int threads, uint64_t startFrom, int64_t ntables,
char* cols, int threads, uint64_t tableFrom, int64_t ntables,
char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo));
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed\n");
......@@ -2978,10 +3034,10 @@ static int startMultiThreadCreateChildTable(
return -1;
}
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->use_metric = true;
pThreadInfo->cols = cols;
pThreadInfo->minDelay = UINT64_MAX;
......@@ -3011,7 +3067,7 @@ static void createChildTables() {
if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) {
// with super table
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue;
......@@ -3019,15 +3075,15 @@ static void createChildTables() {
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
__func__, __LINE__, g_totalChildTables, startFrom);
__func__, __LINE__, g_totalChildTables, tableFrom);
startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl,
startFrom,
tableFrom,
g_Dbs.db[i].superTbls[j].childTblCount,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
}
......@@ -3036,11 +3092,14 @@ static void createChildTables() {
// normal table
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
for (int j = 0; j < g_args.num_of_CPR; j++) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
if (g_args.datatype[j]
&& ((strncasecmp(g_args.datatype[j],
"BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j],
"NCHAR", strlen("NCHAR")) == 0)) {
"NCHAR", strlen("NCHAR")) == 0))) {
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary);
", COL%d %s(%d)", j, g_args.datatype[j],
g_args.len_of_binary);
} else {
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s", j, g_args.datatype[j]);
......@@ -3132,10 +3191,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
return 0;
}
#if 0
int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
// TODO
return 0;
}
#endif
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
......@@ -3520,9 +3581,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// rows per table need be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) {
printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n",
g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
g_args.num_of_RPR);
prompt();
g_args.interlace_rows = g_args.num_of_RPR;
......@@ -3837,15 +3898,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest
if (insertMode && insertMode->type == cJSON_String
&& insertMode->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode,
insertMode->valuestring, MAX_DB_NAME_SIZE);
} else if (!insertMode) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE);
cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt
if (stbIface && stbIface->type == cJSON_String
&& stbIface->valuestring != NULL) {
if (0 == strcasecmp(stbIface->valuestring, "taosc")) {
g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE;
} else if (0 == strcasecmp(stbIface->valuestring, "rest")) {
g_Dbs.db[i].superTbls[j].iface= REST_IFACE;
} else if (0 == strcasecmp(stbIface->valuestring, "stmt")) {
g_Dbs.db[i].superTbls[j].iface= STMT_IFACE;
} else {
errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n",
__func__, __LINE__, stbIface->valuestring);
goto PARSE_OVER;
}
} else if (!stbIface) {
g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE;
} else {
printf("ERROR: failed to read json, insert_mode not found\n");
errorPrint("%s", "failed to read json, insert_mode not found\n");
goto PARSE_OVER;
}
......@@ -3936,9 +4006,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
cJSON* maxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
int32_t len = maxSqlLen->valueint;
cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) {
int32_t len = stbMaxSqlLen->valueint;
if (len > TSDB_MAX_ALLOWED_SQL_LEN) {
len = TSDB_MAX_ALLOWED_SQL_LEN;
} else if (len < 5) {
......@@ -3948,7 +4018,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) {
g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len;
} else {
errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n",
errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
......@@ -3970,24 +4040,25 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
*/
cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (interlaceRows && interlaceRows->type == cJSON_Number) {
if (interlaceRows->valueint < 0) {
cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
if (stbInterlaceRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint;
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
// rows per table need be less than insert batch
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n",
g_args.num_of_RPR);
prompt();
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR;
}
} else if (!interlaceRows) {
} else if (!stbInterlaceRows) {
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else {
errorPrint(
......@@ -4199,7 +4270,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) {
errorPrint(
"%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %d is not correct.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
......@@ -4613,7 +4684,7 @@ static void prepareSampleData() {
static void postFreeResource() {
tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
......@@ -4661,16 +4732,22 @@ static int getRowDataFromSample(
return dataLen;
}
static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) {
static int64_t generateStbRowData(
SSuperTable* stbInfo,
char* recBuf, int64_t timestamp)
{
int64_t dataLen = 0;
char *pstr = recBuf;
int64_t maxLen = MAX_DATA_SIZE;
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ",", timestamp);
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"(%" PRId64 ",", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "BINARY", strlen("BINARY")))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "NCHAR", strlen("NCHAR")))) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType,
"BINARY", strlen("BINARY")))
|| (0 == strncasecmp(stbInfo->columns[i].dataType,
"NCHAR", strlen("NCHAR")))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
......@@ -4686,23 +4763,23 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf);
tmfree(buf);
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"INT", 3)) {
"INT", strlen("INT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%d,", rand_int());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BIGINT", 6)) {
"BIGINT", strlen("BIGINT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%"PRId64",", rand_bigint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"FLOAT", 5)) {
"FLOAT", strlen("FLOAT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%f,", rand_float());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"DOUBLE", 6)) {
"DOUBLE", strlen("DOUBLE"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%f,", rand_double());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"SMALLINT", 8)) {
"SMALLINT", strlen("SMALLINT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%d,", rand_smallint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
......@@ -4732,46 +4809,38 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb
}
static int64_t generateData(char *recBuf, char **data_type,
int num_of_cols, int64_t timestamp, int lenOfBinary) {
int64_t timestamp, int lenOfBinary) {
memset(recBuf, 0, MAX_DATA_SIZE);
char *pstr = recBuf;
pstr += sprintf(pstr, "(%" PRId64, timestamp);
int c = 0;
for (; c < MAX_NUM_DATATYPE; c++) {
if (data_type[c] == NULL) {
break;
}
}
int columnCount = g_args.num_of_CPR;
if (0 == c) {
perror("data type error!");
exit(-1);
}
for (int i = 0; i < c; i++) {
if (strcasecmp(data_type[i % c], "TINYINT") == 0) {
for (int i = 0; i < columnCount; i++) {
if (strcasecmp(data_type[i % columnCount], "TINYINT") == 0) {
pstr += sprintf(pstr, ",%d", rand_tinyint() );
} else if (strcasecmp(data_type[i % c], "SMALLINT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "SMALLINT") == 0) {
pstr += sprintf(pstr, ",%d", rand_smallint());
} else if (strcasecmp(data_type[i % c], "INT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "INT") == 0) {
pstr += sprintf(pstr, ",%d", rand_int());
} else if (strcasecmp(data_type[i % c], "BIGINT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "BIGINT") == 0) {
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
} else if (strcasecmp(data_type[i % c], "FLOAT") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "TIMESTAMP") == 0) {
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
} else if (strcasecmp(data_type[i % columnCount], "FLOAT") == 0) {
pstr += sprintf(pstr, ",%10.4f", rand_float());
} else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "DOUBLE") == 0) {
double t = rand_double();
pstr += sprintf(pstr, ",%20.8f", t);
} else if (strcasecmp(data_type[i % c], "BOOL") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "BOOL") == 0) {
bool b = rand_bool() & 1;
pstr += sprintf(pstr, ",%s", b ? "true" : "false");
} else if (strcasecmp(data_type[i % c], "BINARY") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) {
char *s = malloc(lenOfBinary);
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
free(s);
} else if (strcasecmp(data_type[i % c], "NCHAR") == 0) {
} else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) {
char *s = malloc(lenOfBinary);
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
......@@ -4818,32 +4887,57 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0;
}
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
{
int affectedRows;
int32_t affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, buffer);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
} else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) {
__func__, __LINE__, pThreadInfo->buffer);
uint16_t iface;
if (superTblInfo)
iface = superTblInfo->iface;
else
iface = g_args.iface;
debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__,
(g_args.iface==TAOSC_IFACE)?
"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt");
switch(iface) {
case TAOSC_IFACE:
affectedRows = queryDbExec(
pThreadInfo->taos,
pThreadInfo->buffer, INSERT_TYPE, false);
break;
case REST_IFACE:
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
buffer, NULL /* not set result file */)) {
pThreadInfo->buffer, NULL /* not set result file */)) {
affectedRows = -1;
printf("========restful return fail, threadID[%d]\n",
pThreadInfo->threadID);
} else {
affectedRows = k;
}
} else {
errorPrint("%s() LN%d: unknown insert mode: %s\n",
__func__, __LINE__, superTblInfo->insertMode);
affectedRows = 0;
break;
case STMT_IFACE:
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
errorPrint("%s() LN%d, failied to execute insert statement\n",
__func__, __LINE__);
exit(-1);
}
} else {
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
affectedRows = k;
break;
default:
errorPrint("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, superTblInfo->iface);
affectedRows = 0;
}
return affectedRows;
......@@ -4874,49 +4968,104 @@ static void getTableName(char *pTblName,
}
}
static int64_t generateDataTail(
SSuperTable* superTblInfo,
uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0;
uint32_t ncols_per_record = 1; // count first col ts
static int32_t generateDataTailWithoutStb(
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime,
/* int64_t *pSamplePos, */int64_t *dataLen) {
uint64_t len = 0;
char *pstr = buffer;
if (superTblInfo == NULL) {
uint32_t datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch);
int32_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
retLen = generateData(data, data_type,
startTime + getTSRandTail(
(int64_t) DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
if (len > remainderBufLen)
break;
pstr += sprintf(pstr, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
recordFrom ++;
if (recordFrom >= insertRows) {
break;
}
}
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
*dataLen = len;
return k;
}
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange)
{
int64_t randTail = timeStampStep * seq;
if (disorderRatio > 0) {
int rand_num = taosRandom() % 100;
if(rand_num < disorderRatio) {
randTail = (randTail +
(taosRandom() % disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %"PRId64"\n", randTail);
}
}
return randTail;
}
static int32_t generateStbDataTail(
SSuperTable* superTblInfo,
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime,
int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0;
char *pstr = buffer;
bool tsRand;
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand")))) {
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true;
} else {
tsRand = false;
}
verbosePrint("%s() LN%d batch=%u\n", __func__, __LINE__, batch);
uint64_t k = 0;
int32_t k = 0;
for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
int64_t retLen = 0;
if (superTblInfo) {
if (tsRand) {
retLen = generateRowData(
data,
retLen = generateStbRowData(superTblInfo, data,
startTime + getTSRandTail(
superTblInfo->timeStampStep, k,
superTblInfo->disorderRatio,
superTblInfo->disorderRange),
superTblInfo);
superTblInfo->disorderRange)
);
} else {
retLen = getRowDataFromSample(
data,
......@@ -4925,6 +5074,7 @@ static int64_t generateDataTail(
superTblInfo,
pSamplePos);
}
if (retLen > remainderBufLen) {
break;
}
......@@ -4933,31 +5083,13 @@ static int64_t generateDataTail(
k++;
len += retLen;
remainderBufLen -= retLen;
} else {
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
retLen = generateData(data, data_type,
ncols_per_record,
startTime + getTSRandTail(
DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
if (len > remainderBufLen)
break;
pstr += sprintf(pstr, "%s", data);
k++;
len += retLen;
remainderBufLen -= retLen;
}
verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
startFrom ++;
recordFrom ++;
if (startFrom >= insertRows) {
if (recordFrom >= insertRows) {
break;
}
}
......@@ -4966,16 +5098,40 @@ static int64_t generateDataTail(
return k;
}
static int generateSQLHead(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo,
static int generateSQLHeadWithoutStb(char *tableName,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
char headBuf[HEAD_BUFF_LEN];
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
dbName,
tableName);
if (len > remainderBufLen)
return -1;
tstrncpy(buffer, headBuf, len + 1);
return len;
}
static int generateStbSQLHead(
SSuperTable* superTblInfo,
char *tableName, int32_t tableSeq,
char *dbName,
char *buffer, int remainderBufLen)
{
int len;
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into ..
char headBuf[HEAD_BUFF_LEN];
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
......@@ -4995,9 +5151,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf,
HEAD_BUFF_LEN,
"%s.%s using %s.%s tags %s values",
pThreadInfo->db_name,
dbName,
tableName,
pThreadInfo->db_name,
dbName,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
......@@ -5006,22 +5162,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
dbName,
tableName);
} else {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
tableName);
}
} else {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
"%s.%s values",
pThreadInfo->db_name,
dbName,
tableName);
}
......@@ -5033,8 +5181,11 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
return len;
}
static int64_t generateInterlaceDataBuffer(
char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes,
static int32_t generateStbInterlaceData(
SSuperTable *superTblInfo,
char *tableName, uint32_t batchPerTbl,
uint64_t i,
uint32_t batchPerTblTimes,
uint64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
......@@ -5043,10 +5194,11 @@ static int64_t generateInterlaceDataBuffer(
{
assert(buffer);
char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr, *pRemainderBufLen);
int headLen = generateStbSQLHead(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name,
pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
......@@ -5060,19 +5212,15 @@ static int64_t generateInterlaceDataBuffer(
int64_t dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%"PRIu64" batchPerTbl = %"PRIu64"\n",
verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int64_t k = generateDataTail(
int32_t k = generateStbDataTail(
superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
......@@ -5082,7 +5230,7 @@ static int64_t generateInterlaceDataBuffer(
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n",
debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n",
__func__, __LINE__, k, batchPerTbl);
pstr -= headLen;
pstr[0] = '\0';
......@@ -5092,50 +5240,361 @@ static int64_t generateInterlaceDataBuffer(
return k;
}
static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange)
static int64_t generateInterlaceDataWithoutStb(
char *tableName, uint32_t batch,
uint64_t tableSeq,
char *dbName, char *buffer,
int64_t insertRows,
int64_t startTime,
uint64_t *pRemainderBufLen)
{
int64_t randTail = timeStampStep * seq;
if (disorderRatio > 0) {
int rand_num = taosRandom() % 100;
if(rand_num < disorderRatio) {
randTail = (randTail +
(taosRandom() % disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %"PRId64"\n", randTail);
assert(buffer);
char *pstr = buffer;
int headLen = generateSQLHeadWithoutStb(
tableName, dbName,
pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t dataLen = 0;
int32_t k = generateDataTailWithoutStb(
batch, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&dataLen);
if (k == batch) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n",
__func__, __LINE__, k, batch);
pstr -= headLen;
pstr[0] = '\0';
k = 0;
}
return k;
}
#if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
char *dataType, int32_t dataLen, char **ptr)
{
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_binary = (char *)*ptr;
rand_string(bind_binary, dataLen);
return randTail;
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
bind->buffer_length = dataLen;
bind->buffer = bind_binary;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"NCHAR", strlen("NCHAR"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_nchar = (char *)*ptr;
rand_string(bind_nchar, dataLen);
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
bind->buffer_length = strlen(bind_nchar);
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = (int32_t *)*ptr;
*bind_int = rand_int();
bind->buffer_type = TSDB_DATA_TYPE_INT;
bind->buffer_length = sizeof(int32_t);
bind->buffer = bind_int;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = (int64_t *)*ptr;
*bind_bigint = rand_bigint();
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_bigint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *) *ptr;
*bind_float = rand_float();
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
bind->buffer_length = sizeof(float);
bind->buffer = bind_float;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = (double *)*ptr;
*bind_double = rand_double();
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
bind->buffer_length = sizeof(double);
bind->buffer = bind_double;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = (int16_t *)*ptr;
*bind_smallint = rand_smallint();
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
bind->buffer_length = sizeof(int16_t);
bind->buffer = bind_smallint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = (int8_t *)*ptr;
*bind_tinyint = rand_tinyint();
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = (int8_t *)*ptr;
*bind_bool = rand_bool();
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *) *ptr;
*bind_ts2 = rand_bigint();
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts2;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else {
errorPrint( "No support data type: %s\n",
dataType);
return -1;
}
return 0;
}
static int64_t generateProgressiveDataBuffer(
static int32_t prepareStmtWithoutStb(
TAOS_STMT *stmt,
char *tableName,
int64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
uint32_t batch,
int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen)
int64_t recordFrom,
int64_t startTime)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
tableName, ret, taos_errstr(NULL));
return ret;
}
int ncols_per_record = 1; // count first col ts
char **data_type = g_args.datatype;
if (superTblInfo == NULL) {
int datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.num_of_CPR + 1));
if (bindArray == NULL) {
errorPrint("Failed to allocate %d bind params\n",
(g_args.num_of_CPR + 1));
return -1;
}
int32_t k = 0;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
*bind_ts = startTime + getTSRandTail(
(int64_t)DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio,
g_args.disorderRange);
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < g_args.num_of_CPR; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
if ( -1 == prepareStmtBindArrayByType(
bind,
data_type[i],
g_args.len_of_binary,
&ptr)) {
return -1;
}
}
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
// if msg > 3MB, break
taos_stmt_add_batch(stmt);
k++;
recordFrom ++;
if (recordFrom >= insertRows) {
break;
}
}
return k;
}
static int32_t prepareStbStmt(SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime, char *buffer)
{
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
tableName, ret, taos_errstr(NULL));
return ret;
}
char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("Failed to allocate %d bind params\n",
(stbInfo->columnCount + 1));
return -1;
}
bool tsRand;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true;
} else {
tsRand = false;
}
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (tsRand) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * k;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < stbInfo->columnCount; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i].dataType,
stbInfo->columns[i].dataLen,
&ptr)) {
return -1;
}
}
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
// if msg > 3MB, break
taos_stmt_add_batch(stmt);
k++;
recordFrom ++;
if (recordFrom >= insertRows) {
break;
}
}
return k;
}
#endif
static int32_t generateStbProgressiveData(
SSuperTable *superTblInfo,
char *tableName,
int64_t tableSeq,
char *dbName, char *buffer,
int64_t insertRows,
uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
char *pstr = buffer;
int64_t k = 0;
memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
int64_t headLen = generateStbSQLHead(
superTblInfo,
tableName, tableSeq, dbName,
buffer, *pRemainderBufLen);
if (headLen <= 0) {
......@@ -5145,12 +5604,43 @@ static int64_t generateProgressiveDataBuffer(
*pRemainderBufLen -= headLen;
int64_t dataLen;
k = generateDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
return generateStbDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, recordFrom,
startTime,
pSamplePos, &dataLen);
}
return k;
static int32_t generateProgressiveDataWithoutStb(
char *tableName,
/* int64_t tableSeq, */
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
char *pstr = buffer;
memset(buffer, 0, *pRemainderBufLen);
int64_t headLen = generateSQLHeadWithoutStb(
tableName, pThreadInfo->db_name,
buffer, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen;
*pRemainderBufLen -= headLen;
int64_t dataLen;
return generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom,
startTime,
/*pSamplePos, */&dataLen);
}
static void printStatPerThread(threadInfo *pThreadInfo)
......@@ -5162,12 +5652,16 @@ static void printStatPerThread(threadInfo *pThreadInfo)
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0)));
}
// sync write interlace data
static void* syncWriteInterlace(threadInfo *pThreadInfo) {
debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows;
uint64_t interlaceRows;
uint32_t interlaceRows;
uint64_t maxSqlLen;
int64_t nTimeStampStep;
uint64_t insert_interval;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
......@@ -5180,45 +5674,48 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} else {
interlaceRows = superTblInfo->interlaceRows;
}
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else {
insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = DEFAULT_TIMESTAMP_STEP;
insert_interval = g_args.insert_interval;
}
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
if (interlaceRows > insertRows)
interlaceRows = insertRows;
if (interlaceRows > g_args.num_of_RPR)
interlaceRows = g_args.num_of_RPR;
int insertMode;
uint32_t batchPerTbl = interlaceRows;
uint32_t batchPerTblTimes;
if (interlaceRows > 0) {
insertMode = INTERLACE_INSERT_MODE;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
} else {
insertMode = PROGRESSIVE_INSERT_MODE;
batchPerTblTimes = 1;
}
// TODO: prompt tbl count multple interlace rows and batch
//
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) {
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__, maxSqlLen, strerror(errno));
return NULL;
}
char tableName[TSDB_TABLE_NAME_LEN];
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
int64_t nTimeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
uint64_t insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = UINT64_MAX;
......@@ -5227,69 +5724,98 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t endTs;
uint64_t tableSeq = pThreadInfo->start_table_from;
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from,
pThreadInfo->ntables, insertRows);
int64_t startTime = pThreadInfo->start_time;
uint64_t batchPerTbl = interlaceRows;
uint64_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes =
g_args.num_of_RPR / interlaceRows;
} else {
batchPerTblTimes = 1;
}
uint64_t generatedRecPerTbl = 0;
bool flagSleep = true;
uint64_t sleepTimeTotal = 0;
char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto);
while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) {
if ((flagSleep) && (insert_interval)) {
st = taosGetTimestampMs();
flagSleep = false;
}
// generate data
memset(buffer, 0, maxSqlLen);
memset(pThreadInfo->buffer, 0, maxSqlLen);
uint64_t remainderBufLen = maxSqlLen;
char *pstr = buffer;
char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto);
int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len;
remainderBufLen -= len;
uint64_t recOfBatch = 0;
uint32_t recOfBatch = 0;
for (uint32_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN];
for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n",
pThreadInfo->threadID, __func__, __LINE__);
free(buffer);
free(pThreadInfo->buffer);
return NULL;
}
uint64_t oldRemainderLen = remainderBufLen;
int64_t generated = generateInterlaceDataBuffer(
tableName, batchPerTbl, i, batchPerTblTimes,
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt(superTblInfo,
pThreadInfo->stmt,
tableName,
batchPerTbl,
insertRows, i,
startTime,
pThreadInfo->buffer);
#else
generated = -1;
#endif
} else {
generated = generateStbInterlaceData(
superTblInfo,
tableName, batchPerTbl, i,
batchPerTblTimes,
tableSeq,
pThreadInfo, pstr,
insertRows,
startTime,
&remainderBufLen);
}
} else {
if (g_args.iface == STMT_IFACE) {
debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__,
tableName, batchPerTbl, startTime);
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt, tableName,
batchPerTbl,
insertRows, i,
startTime);
#else
generated = -1;
#endif
} else {
generated = generateInterlaceDataWithoutStb(
tableName, batchPerTbl,
tableSeq,
pThreadInfo->db_name, pstr,
insertRows,
startTime,
&remainderBufLen);
}
}
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
debugPrint("[%d] %s() LN%d, generated records is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
if (generated < 0) {
errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
errorPrint("[%d] %s() LN%d, generated records is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_of_interlace;
} else if (generated == 0) {
......@@ -5298,14 +5824,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableSeq ++;
recOfBatch += batchPerTbl;
pstr += (oldRemainderLen - remainderBufLen);
// startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo->totalInsertRows += batchPerTbl;
verbosePrint("[%d] %s() LN%d batchPerTbl=%"PRId64" recOfBatch=%"PRId64"\n",
verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
tableSeq = pThreadInfo->start_table_from;
......@@ -5318,14 +5844,13 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (generatedRecPerTbl >= insertRows)
break;
int remainRows = insertRows - generatedRecPerTbl;
int64_t remainRows = insertRows - generatedRecPerTbl;
if ((remainRows > 0) && (batchPerTbl > remainRows))
batchPerTbl = remainRows;
if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR)
break;
}
}
verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__,
......@@ -5335,22 +5860,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break;
}
verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n",
verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
pThreadInfo->threadID, __func__, __LINE__, buffer);
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
startTs = taosGetTimestampMs();
if (recOfBatch == 0) {
errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n",
errorPrint("[%d] %s() LN%d try inserting records of batch is %d\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch);
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
goto free_of_interlace;
}
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs;
......@@ -5366,9 +5891,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->totalDelay += delay;
if (recOfBatch != affectedRows) {
errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, buffer);
recOfBatch, affectedRows, pThreadInfo->buffer);
goto free_of_interlace;
}
......@@ -5387,8 +5912,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
et = taosGetTimestampMs();
if (insert_interval > (et - st) ) {
int sleepTime = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
uint64_t sleepTime = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n",
__func__, __LINE__, sleepTime);
taosMsleep(sleepTime); // ms
sleepTimeTotal += insert_interval;
......@@ -5397,27 +5922,26 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
}
free_of_interlace:
tmfree(buffer);
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
}
// sync insertion
/*
1 thread: 100 tables * 2000 rows/s
1 thread: 10 tables * 20000 rows/s
6 thread: 300 tables * 2000 rows/s
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
// sync insertion progressive data
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) {
pThreadInfo->buffer = calloc(maxSqlLen, 1);
if (NULL == pThreadInfo->buffer) {
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen,
strerror(errno));
......@@ -5428,35 +5952,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
uint64_t startTs = taosGetTimestampMs();
uint64_t endTs;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
/* int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0xffffffff;
*/
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
pThreadInfo->samplePos = 0;
for (uint64_t tableSeq =
pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
for (uint64_t tableSeq = pThreadInfo->start_table_from;
tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) {
int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (uint64_t i = 0; i < insertRows;) {
/*
if (insert_interval) {
st = taosGetTimestampMs();
}
*/
char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq);
verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n",
......@@ -5464,19 +5970,57 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->threadID, tableSeq, tableName);
int64_t remainderBufLen = maxSqlLen;
char *pstr = buffer;
int nInsertBufLen = strlen("insert into ");
char *pstr = pThreadInfo->buffer;
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into ");
int len = snprintf(pstr,
strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO);
pstr += len;
remainderBufLen -= len;
int64_t generated = generateProgressiveDataBuffer(
tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time,
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt(
superTblInfo,
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i, start_time, pstr);
#else
generated = -1;
#endif
} else {
generated = generateStbProgressiveData(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr,
insertRows, i, start_time,
&(pThreadInfo->samplePos),
&remainderBufLen);
}
} else {
if (g_args.iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt,
tableName,
g_args.num_of_RPR,
insertRows, i,
start_time);
#else
generated = -1;
#endif
} else {
generated = generateProgressiveDataWithoutStb(
tableName,
/* tableSeq, */
pThreadInfo, pstr, insertRows,
i, start_time,
/* &(pThreadInfo->samplePos), */
&remainderBufLen);
}
}
if (generated > 0)
i += generated;
else
......@@ -5487,13 +6031,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
startTs = taosGetTimestampMs();
int64_t affectedRows = execInsert(pThreadInfo, buffer, generated);
int32_t affectedRows = execInsert(pThreadInfo, generated);
endTs = taosGetTimestampMs();
uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
verbosePrint("[%d] %s() LN%d affectedRows=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
......@@ -5503,7 +6047,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalDelay += delay;
if (affectedRows < 0) {
errorPrint("%s() LN%d, affected rows: %"PRId64"\n",
errorPrint("%s() LN%d, affected rows: %d\n",
__func__, __LINE__, affectedRows);
goto free_of_progressive;
}
......@@ -5521,32 +6065,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (i >= insertRows)
break;
/*
if (insert_interval) {
et = taosGetTimestampMs();
if (insert_interval > ((et - st)) ) {
int sleep_time = insert_interval - (et -st);
performancePrint("%s() LN%d sleep: %d ms for insert interval\n",
__func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
}
}
*/
} // num_of_DPT
if (g_args.verbose_print) {
if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo &&
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
}
}
} // tableSeq
free_of_progressive:
tmfree(buffer);
tmfree(pThreadInfo->buffer);
printStatPerThread(pThreadInfo);
return NULL;
}
......@@ -5556,7 +6087,7 @@ static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int interlaceRows;
uint32_t interlaceRows;
if (superTblInfo) {
if ((superTblInfo->interlaceRows == 0)
......@@ -5576,7 +6107,6 @@ static void* syncWrite(void *sarg) {
// progressive mode
return syncWriteProgressive(pThreadInfo);
}
}
static void callBack(void *param, TAOS_RES *res, int code) {
......@@ -5616,9 +6146,10 @@ static void callBack(void *param, TAOS_RES *res, int code) {
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, pThreadInfo->superTblInfo);
generateStbRowData(pThreadInfo->superTblInfo, data, d);
} else {
generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo);
generateStbRowData(pThreadInfo->superTblInfo,
data, pThreadInfo->lastTs += 1000);
}
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;
......@@ -5686,24 +6217,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = malloc(threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
//TAOS* taos;
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
// if (NULL == taos) {
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
// exit(-1);
// }
//}
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) {
if (0 == strncasecmp(precision, "ms", 2)) {
......@@ -5755,17 +6268,17 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
}
TAOS* taos = taos_connect(
TAOS* taos0 = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
if (NULL == taos0) {
errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL));
exit(-1);
}
int64_t ntables = 0;
uint64_t startFrom;
uint64_t tableFrom;
if (superTblInfo) {
int64_t limit;
......@@ -5792,7 +6305,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
ntables = limit;
startFrom = offset;
tableFrom = offset;
if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset + superTblInfo->childTblLimit )
......@@ -5811,23 +6324,23 @@ static void startMultiThreadInsertData(int threads, char* db_name,
limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
taos_close(taos);
taos_close(taos0);
exit(-1);
}
int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos,
taos0,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
limit,
offset);
} else {
ntables = g_args.num_of_tables;
startFrom = 0;
tableFrom = 0;
}
taos_close(taos);
taos_close(taos0);
int64_t a = ntables / threads;
if (a < 1) {
......@@ -5841,10 +6354,21 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
if ((superTblInfo)
&& (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) {
if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0)
&& (superTblInfo->iface == REST_IFACE)) {
if (convertHostToServAddr(
g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
exit(-1);
}
}
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
......@@ -5857,17 +6381,59 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) ||
(0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
//pThreadInfo->taos = taos;
(superTblInfo->iface != REST_IFACE)) {
//t_info->taos = taos;
pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == pThreadInfo->taos) {
errorPrint(
"connect to server fail from insert sub thread, reason: %s\n",
"%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(infos);
exit(-1);
}
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) {
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) {
errorPrint(
"%s() LN%d, failed init stmt, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(pids);
free(infos);
exit(-1);
}
char buffer[3000];
char *pstr = buffer;
pstr += sprintf(pstr, "INSERT INTO ? values(?");
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_errstr(NULL));
free(pids);
free(infos);
exit(-1);
}
}
} else {
pThreadInfo->taos = NULL;
}
......@@ -5875,10 +6441,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
/* } else {
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
......@@ -5906,6 +6472,11 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(pThreadInfo->lock_sem));
if (pThreadInfo->stmt) {
taos_stmt_close(pThreadInfo->stmt);
}
tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos);
......@@ -6182,13 +6753,13 @@ static int insertTestProcess() {
}
}
taosMsleep(1000);
// taosMsleep(1000);
// create sub threads for inserting data
//start = taosGetTimestampMs();
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
......@@ -6512,15 +7083,15 @@ static int queryTestProcess() {
b = ntables % threads;
}
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
......@@ -6901,12 +7472,12 @@ static int subscribeTestProcess() {
//==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, sepcified query sqlCount %d.\n",
debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
} else {
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, sepcified query sqlCount %d.\n",
errorPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
exit(-1);
......@@ -6939,7 +7510,7 @@ static int subscribeTestProcess() {
//==== create threads for super table query
if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
debugPrint("%s() LN%d, super table query sqlCount %d.\n",
debugPrint("%s() LN%d, super table query sqlCount %"PRIu64".\n",
__func__, __LINE__,
g_queryInfo.superQueryInfo.sqlCount);
} else {
......@@ -6975,17 +7546,17 @@ static int subscribeTestProcess() {
}
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *pThreadInfo = infosOfStable + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = j<b?tableFrom+a:tableFrom+a-1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, pThreadInfo);
......@@ -7104,7 +7675,7 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
g_args.tb_prefix, MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].iface = g_args.iface;
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册