From 72e2c66d86a6f37904d552422b310f23c456d115 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 28 May 2021 17:59:32 +0800 Subject: [PATCH] [TD-4068]: taosdemo support stmt. (#6270) * [TD-4068]: taosdemo support stmt. for easy merge purpose. disabled in master. * fix clang compile error. --- src/kit/taosdemo/taosdemo.c | 1425 ++++++++++++++++++++++++----------- 1 file changed, 998 insertions(+), 427 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 3c528ab26f..70ddbacf44 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -19,6 +19,7 @@ */ #include +#include #define _GNU_SOURCE #define CURL_STATICLIB @@ -52,6 +53,8 @@ #include "taoserror.h" #include "tutil.h" +#define STMT_IFACE_ENABLED 0 + #define REQ_EXTRA_BUF_LEN 1024 #define RESP_BUF_LEN 4096 @@ -61,6 +64,8 @@ extern char configDir[]; #define QUERY_JSON_NAME "query.json" #define SUBSCRIBE_JSON_NAME "subscribe.json" +#define STR_INSERT_INTO "INSERT INTO " + enum TEST_MODE { INSERT_TEST, // 0 QUERY_TEST, // 1 @@ -70,6 +75,8 @@ enum TEST_MODE { #define MAX_RECORDS_PER_REQ 32766 +#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. + #define MAX_SQL_SIZE 65536 #define BUFFER_SIZE (65536*2) #define COND_BUF_LEN BUFFER_SIZE - 30 @@ -120,17 +127,24 @@ enum enumSYNC_MODE { MODE_BUT }; +enum enum_TAOS_INTERFACE { + TAOSC_IFACE, + REST_IFACE, + STMT_IFACE, + INTERFACE_BUT +}; + typedef enum enumQUERY_CLASS { SPECIFIED_CLASS, STABLE_CLASS, CLASS_BUT } QUERY_CLASS; -typedef enum enum_INSERT_MODE { +typedef enum enum_PROGRESSIVE_OR_INTERLACE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, INVALID_INSERT_MODE -} INSERT_MODE; +} PROG_OR_INTERLACE_MODE; typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, @@ -196,6 +210,7 @@ typedef struct SArguments_S { uint32_t test_mode; char * host; uint16_t port; + uint16_t iface; char * user; char * password; char * database; @@ -217,13 +232,13 @@ typedef struct SArguments_S { uint32_t num_of_threads; uint64_t insert_interval; int64_t query_times; - uint64_t interlace_rows; - uint64_t num_of_RPR; // num_of_records_per_req + uint32_t interlace_rows; + uint32_t num_of_RPR; // num_of_records_per_req uint64_t max_sql_len; int64_t num_of_tables; int64_t num_of_DPT; int abort; - int disorderRatio; // 0: no disorder, >0: x% + uint32_t disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint32_t method_of_delete; char ** arg_list; @@ -246,12 +261,12 @@ typedef struct SSuperTable_S { uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table char childTblPrefix[MAX_TB_NAME_SIZE]; char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample - char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest + uint16_t iface; // 0: taosc, 1: rest, 2: stmt int64_t childTblLimit; uint64_t childTblOffset; // int multiThreadWriteOneTbl; // 0: no, 1: yes - uint64_t interlaceRows; // + uint32_t interlaceRows; // int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint64_t maxSqlLen; // @@ -364,7 +379,7 @@ typedef struct SDbs_S { typedef struct SpecifiedQueryInfo_S { uint64_t queryInterval; // 0: unlimit > 0 loop/s uint32_t concurrent; - int sqlCount; + uint64_t sqlCount; uint32_t asyncMode; // 0: sync, 1: async uint64_t subscribeInterval; // ms uint64_t queryTimes; @@ -392,7 +407,7 @@ typedef struct SuperQueryInfo_S { uint64_t queryTimes; int64_t childTblCount; char childTblPrefix[MAX_TB_NAME_SIZE]; - int sqlCount; + uint64_t sqlCount; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; int resubAfterConsume; @@ -420,6 +435,7 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; + TAOS_STMT *stmt; int threadID; char db_name[MAX_DB_NAME_SIZE+1]; uint32_t time_precision; @@ -434,6 +450,7 @@ typedef struct SThreadInfo_S { char* cols; bool use_metric; SSuperTable* superTblInfo; + char *buffer; // sql cmd buffer // for async insert tsem_t lock_sem; @@ -541,7 +558,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, char* sqlstr, threadInfo *pThreadInfo); static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, - int disorderRatio, int disorderRange); + int disorderRatio, int disorderRange); /* ************ Global variables ************ */ @@ -557,6 +574,7 @@ SArguments g_args = { 0, // test_mode "127.0.0.1", // host 6030, // port + TAOSC_IFACE, // iface "root", // user #ifdef _TD_POWER_ "powerdb", // password @@ -673,6 +691,8 @@ static void printHelp() { "The host to connect to TDengine. Default is localhost."); printf("%s%s%s%s\n", indent, "-p", indent, "The TCP/IP port number to use for the connection. Default is 0."); + printf("%s%s%s%s\n", indent, "-I", indent, + "The interface (taosc, rest, and stmt) taosdemo uses. Default is 'taosc'."); printf("%s%s%s%s\n", indent, "-d", indent, "Destination database. Default is 'test'."); printf("%s%s%s%s\n", indent, "-a", indent, @@ -761,6 +781,23 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { exit(EXIT_FAILURE); } arguments->port = atoi(argv[++i]); + } else if (strcmp(argv[i], "-I") == 0) { + if (argc == i+1) { + printHelp(); + errorPrint("%s", "\n\t-I need a valid string following!\n"); + exit(EXIT_FAILURE); + } + ++i; + if (0 == strcasecmp(argv[i], "taosc")) { + arguments->iface = TAOSC_IFACE; + } else if (0 == strcasecmp(argv[i], "rest")) { + arguments->iface = REST_IFACE; + } else if (0 == strcasecmp(argv[i], "stmt")) { + arguments->iface = STMT_IFACE; + } else { + errorPrint("%s", "\n\t-I need a valid string following!\n"); + exit(EXIT_FAILURE); + } } else if (strcmp(argv[i], "-u") == 0) { if (argc == i+1) { printHelp(); @@ -897,6 +934,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(argv[i], "BIGINT") && strcasecmp(argv[i], "DOUBLE") && strcasecmp(argv[i], "BINARY") + && strcasecmp(argv[i], "TIMESTAMP") && strcasecmp(argv[i], "NCHAR")) { printHelp(); errorPrint("%s", "-b: Invalid data_type!\n"); @@ -918,6 +956,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(token, "BIGINT") && strcasecmp(token, "DOUBLE") && strcasecmp(token, "BINARY") + && strcasecmp(token, "TIMESTAMP") && strcasecmp(token, "NCHAR")) { printHelp(); free(g_dupstr); @@ -1019,6 +1058,19 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } } + int columnCount; + for (columnCount = 0; columnCount < MAX_NUM_DATATYPE; columnCount ++) { + if (g_args.datatype[columnCount] == NULL) { + break; + } + } + + if (0 == columnCount) { + perror("data type error!"); + exit(-1); + } + g_args.num_of_CPR = columnCount; + if (((arguments->debug_print) && (arguments->metaFile == NULL)) || arguments->verbose_print) { printf("###################################################################\n"); @@ -1028,7 +1080,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->port ); printf("# User: %s\n", arguments->user); printf("# Password: %s\n", arguments->password); - printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); + printf("# Use metric: %s\n", + arguments->use_metric ? "true" : "false"); if (*(arguments->datatype)) { printf("# Specified data type: "); for (int i = 0; i < MAX_NUM_DATATYPE; i++) @@ -1040,7 +1093,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } printf("# Insertion interval: %"PRIu64"\n", arguments->insert_interval); - printf("# Number of records per req: %"PRIu64"\n", + printf("# Number of records per req: %u\n", arguments->num_of_RPR); printf("# Max SQL length: %"PRIu64"\n", arguments->max_sql_len); @@ -1068,8 +1121,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } static bool getInfoFromJsonFile(char* file); -//static int generateOneRowDataForStb(SSuperTable* stbInfo); -//static int getDataIntoMemForStb(SSuperTable* stbInfo); static void init_rand_data(); static void tmfclose(FILE *fp) { if (NULL != fp) { @@ -1088,7 +1139,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { TAOS_RES *res = NULL; int32_t code = -1; - for (i = 0; i < 5; i++) { + for (i = 0; i < 5 /* retry */; i++) { if (NULL != res) { taos_free_result(res); res = NULL; @@ -1104,7 +1155,8 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); if (code != 0) { if (!quiet) { - errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res)); + errorPrint("Failed to execute %s, reason: %s\n", + command, taos_errstr(res)); } taos_free_result(res); //taos_close(taos); @@ -1320,6 +1372,8 @@ static void init_rand_data() { static int printfInsertMeta() { SHOW_PARSE_RESULT_START(); + printf("interface: \033[33m%s\033[0m\n", + (g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("user: \033[33m%s\033[0m\n", g_Dbs.user); @@ -1331,7 +1385,7 @@ static int printfInsertMeta() { g_Dbs.threadCountByCreateTbl); printf("top insert interval: \033[33m%"PRIu64"\033[0m\n", g_args.insert_interval); - printf("number of records per req: \033[33m%"PRIu64"\033[0m\n", + printf("number of records per req: \033[33m%u\033[0m\n", g_args.num_of_RPR); printf("max sql length: \033[33m%"PRIu64"\033[0m\n", g_args.max_sql_len); @@ -1437,8 +1491,9 @@ static int printfInsertMeta() { g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); - printf(" insertMode: \033[33m%s\033[0m\n", - g_Dbs.db[i].superTbls[j].insertMode); + printf(" iface: \033[33m%s\033[0m\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit); @@ -1456,7 +1511,7 @@ static int printfInsertMeta() { printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } */ - printf(" interlaceRows: \033[33m%"PRIu64"\033[0m\n", + printf(" interlaceRows: \033[33m%u\033[0m\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { @@ -1534,7 +1589,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount); fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl); - fprintf(fp, "number of records per req: %"PRIu64"\n", g_args.num_of_RPR); + fprintf(fp, "number of records per req: %u\n", g_args.num_of_RPR); fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len); fprintf(fp, "database count: %d\n", g_Dbs.dbCount); @@ -1626,11 +1681,12 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); - fprintf(fp, " insertMode: %s\n", - g_Dbs.db[i].superTbls[j].insertMode); + fprintf(fp, " iface: %s\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); - fprintf(fp, " interlace rows: %"PRIu64"\n", + fprintf(fp, " interlace rows: %u\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { fprintf(fp, " stable insert interval: %"PRIu64"\n", @@ -1643,7 +1699,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " multiThreadWriteOneTbl: yes\n"); } */ - fprintf(fp, " interlaceRows: %"PRIu64"\n", + fprintf(fp, " interlaceRows: %u\n", g_Dbs.db[i].superTbls[j].interlaceRows); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); @@ -1719,7 +1775,7 @@ static void printfQueryMeta() { if ((SUBSCRIBE_TEST == g_args.test_mode) || (QUERY_TEST == g_args.test_mode)) { printf("specified table query info: \n"); - printf("sqlCount: \033[33m%d\033[0m\n", + printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount); if (g_queryInfo.specifiedQueryInfo.sqlCount > 0) { printf("specified tbl query times:\n"); @@ -1739,15 +1795,15 @@ static void printfQueryMeta() { printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - printf(" sql[%d]: \033[33m%s\033[0m\n", + for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]); } printf("\n"); } printf("super table query info:\n"); - printf("sqlCount: \033[33m%d\033[0m\n", + printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); if (g_queryInfo.superQueryInfo.sqlCount > 0) { @@ -2803,7 +2859,7 @@ static int createDatabasesAndStables() { int validStbCount = 0; - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); ret = queryDbExec(taos, command, NO_INSERT_TYPE, true); @@ -2813,7 +2869,7 @@ static int createDatabasesAndStables() { &g_Dbs.db[i].superTbls[j]); if (0 != ret) { - errorPrint("create super table %d failed!\n\n", j); + errorPrint("create super table %"PRIu64" failed!\n\n", j); continue; } } @@ -2841,7 +2897,7 @@ static void* createTable(void *sarg) threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int64_t lastPrintTime = taosGetTimestampMs(); + uint64_t lastPrintTime = taosGetTimestampMs(); int buff_len; buff_len = BUFFER_SIZE / 8; @@ -2915,7 +2971,7 @@ static void* createTable(void *sarg) return NULL; } - int64_t currentPrintTime = taosGetTimestampMs(); + uint64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n", pThreadInfo->threadID, pThreadInfo->start_table_from, i); @@ -2934,11 +2990,11 @@ static void* createTable(void *sarg) } static int startMultiThreadCreateChildTable( - char* cols, int threads, uint64_t startFrom, int64_t ntables, + char* cols, int threads, uint64_t tableFrom, int64_t ntables, char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); - threadInfo *infos = malloc(threads * sizeof(threadInfo)); + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { printf("malloc failed\n"); @@ -2978,10 +3034,10 @@ static int startMultiThreadCreateChildTable( return -1; } - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->use_metric = true; pThreadInfo->cols = cols; pThreadInfo->minDelay = UINT64_MAX; @@ -3011,7 +3067,7 @@ static void createChildTables() { if (g_Dbs.use_metric) { if (g_Dbs.db[i].superTblCount > 0) { // with super table - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; @@ -3019,15 +3075,15 @@ static void createChildTables() { verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); - uint64_t startFrom = 0; + uint64_t tableFrom = 0; g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", - __func__, __LINE__, g_totalChildTables, startFrom); + __func__, __LINE__, g_totalChildTables, tableFrom); startMultiThreadCreateChildTable( g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.threadCountByCreateTbl, - startFrom, + tableFrom, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); } @@ -3036,11 +3092,14 @@ static void createChildTables() { // normal table len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); for (int j = 0; j < g_args.num_of_CPR; j++) { - if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) + if (g_args.datatype[j] + && ((strncasecmp(g_args.datatype[j], + "BINARY", strlen("BINARY")) == 0) || (strncasecmp(g_args.datatype[j], - "NCHAR", strlen("NCHAR")) == 0)) { + "NCHAR", strlen("NCHAR")) == 0))) { snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, - ", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary); + ", COL%d %s(%d)", j, g_args.datatype[j], + g_args.len_of_binary); } else { snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ", COL%d %s", j, g_args.datatype[j]); @@ -3132,10 +3191,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { return 0; } +#if 0 int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { // TODO return 0; } +#endif /* Read 10000 lines at most. If more than 10000 lines, continue to read after using @@ -3520,9 +3581,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // rows per table need be less than insert batch if (g_args.interlace_rows > g_args.num_of_RPR) { - printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n", g_args.interlace_rows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", g_args.num_of_RPR); prompt(); g_args.interlace_rows = g_args.num_of_RPR; @@ -3837,15 +3898,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest - if (insertMode && insertMode->type == cJSON_String - && insertMode->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, - insertMode->valuestring, MAX_DB_NAME_SIZE); - } else if (!insertMode) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE); + cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt + if (stbIface && stbIface->type == cJSON_String + && stbIface->valuestring != NULL) { + if (0 == strcasecmp(stbIface->valuestring, "taosc")) { + g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "rest")) { + g_Dbs.db[i].superTbls[j].iface= REST_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "stmt")) { + g_Dbs.db[i].superTbls[j].iface= STMT_IFACE; + } else { + errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n", + __func__, __LINE__, stbIface->valuestring); + goto PARSE_OVER; + } + } else if (!stbIface) { + g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE; } else { - printf("ERROR: failed to read json, insert_mode not found\n"); + errorPrint("%s", "failed to read json, insert_mode not found\n"); goto PARSE_OVER; } @@ -3936,9 +4006,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON* maxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len"); - if (maxSqlLen && maxSqlLen->type == cJSON_Number) { - int32_t len = maxSqlLen->valueint; + cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len"); + if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) { + int32_t len = stbMaxSqlLen->valueint; if (len > TSDB_MAX_ALLOWED_SQL_LEN) { len = TSDB_MAX_ALLOWED_SQL_LEN; } else if (len < 5) { @@ -3948,7 +4018,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!maxSqlLen) { g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len; } else { - errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n", + errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n", __func__, __LINE__); goto PARSE_OVER; } @@ -3970,24 +4040,25 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } */ - cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); - if (interlaceRows && interlaceRows->type == cJSON_Number) { - if (interlaceRows->valueint < 0) { + cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); + if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) { + if (stbInterlaceRows->valueint < 0) { errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n", __func__, __LINE__); goto PARSE_OVER; } - g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint; + g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint; // rows per table need be less than insert batch if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { - printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", - i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n", + i, j, g_Dbs.db[i].superTbls[j].interlaceRows, + g_args.num_of_RPR); + printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", g_args.num_of_RPR); prompt(); g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR; } - } else if (!interlaceRows) { + } else if (!stbInterlaceRows) { g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { errorPrint( @@ -4199,7 +4270,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { if (concurrent && concurrent->type == cJSON_Number) { if (concurrent->valueint <= 0) { errorPrint( - "%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n", + "%s() LN%d, query sqlCount %"PRIu64" or concurrent %d is not correct.\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount, g_queryInfo.specifiedQueryInfo.concurrent); @@ -4613,7 +4684,7 @@ static void prepareSampleData() { static void postFreeResource() { tmfclose(g_fpOfInsertResult); for (int i = 0; i < g_Dbs.dbCount; i++) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; @@ -4661,16 +4732,22 @@ static int getRowDataFromSample( return dataLen; } -static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) { +static int64_t generateStbRowData( + SSuperTable* stbInfo, + char* recBuf, int64_t timestamp) +{ int64_t dataLen = 0; char *pstr = recBuf; int64_t maxLen = MAX_DATA_SIZE; - dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ",", timestamp); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, + "(%" PRId64 ",", timestamp); for (int i = 0; i < stbInfo->columnCount; i++) { - if ((0 == strncasecmp(stbInfo->columns[i].dataType, "BINARY", strlen("BINARY"))) - || (0 == strncasecmp(stbInfo->columns[i].dataType, "NCHAR", strlen("NCHAR")))) { + if ((0 == strncasecmp(stbInfo->columns[i].dataType, + "BINARY", strlen("BINARY"))) + || (0 == strncasecmp(stbInfo->columns[i].dataType, + "NCHAR", strlen("NCHAR")))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { errorPrint( "binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); @@ -4686,23 +4763,23 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf); tmfree(buf); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "INT", 3)) { + "INT", strlen("INT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d,", rand_int()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "BIGINT", 6)) { + "BIGINT", strlen("BIGINT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%"PRId64",", rand_bigint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "FLOAT", 5)) { + "FLOAT", strlen("FLOAT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f,", rand_float()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "DOUBLE", 6)) { + "DOUBLE", strlen("DOUBLE"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f,", rand_double()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "SMALLINT", 8)) { + "SMALLINT", strlen("SMALLINT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d,", rand_smallint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, @@ -4732,46 +4809,38 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb } static int64_t generateData(char *recBuf, char **data_type, - int num_of_cols, int64_t timestamp, int lenOfBinary) { + int64_t timestamp, int lenOfBinary) { memset(recBuf, 0, MAX_DATA_SIZE); char *pstr = recBuf; pstr += sprintf(pstr, "(%" PRId64, timestamp); - int c = 0; - for (; c < MAX_NUM_DATATYPE; c++) { - if (data_type[c] == NULL) { - break; - } - } - - if (0 == c) { - perror("data type error!"); - exit(-1); - } + int columnCount = g_args.num_of_CPR; - for (int i = 0; i < c; i++) { - if (strcasecmp(data_type[i % c], "TINYINT") == 0) { + for (int i = 0; i < columnCount; i++) { + if (strcasecmp(data_type[i % columnCount], "TINYINT") == 0) { pstr += sprintf(pstr, ",%d", rand_tinyint() ); - } else if (strcasecmp(data_type[i % c], "SMALLINT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "SMALLINT") == 0) { pstr += sprintf(pstr, ",%d", rand_smallint()); - } else if (strcasecmp(data_type[i % c], "INT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "INT") == 0) { pstr += sprintf(pstr, ",%d", rand_int()); - } else if (strcasecmp(data_type[i % c], "BIGINT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BIGINT") == 0) { pstr += sprintf(pstr, ",%" PRId64, rand_bigint()); - } else if (strcasecmp(data_type[i % c], "FLOAT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "TIMESTAMP") == 0) { + pstr += sprintf(pstr, ",%" PRId64, rand_bigint()); + } else if (strcasecmp(data_type[i % columnCount], "FLOAT") == 0) { pstr += sprintf(pstr, ",%10.4f", rand_float()); - } else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "DOUBLE") == 0) { double t = rand_double(); pstr += sprintf(pstr, ",%20.8f", t); - } else if (strcasecmp(data_type[i % c], "BOOL") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BOOL") == 0) { bool b = rand_bool() & 1; pstr += sprintf(pstr, ",%s", b ? "true" : "false"); - } else if (strcasecmp(data_type[i % c], "BINARY") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) { char *s = malloc(lenOfBinary); rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); free(s); - } else if (strcasecmp(data_type[i % c], "NCHAR") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) { char *s = malloc(lenOfBinary); rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); @@ -4818,35 +4887,60 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { return 0; } -static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) +static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { - int affectedRows; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + int32_t affectedRows; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, - __func__, __LINE__, buffer); - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); - } else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) { - if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, - buffer, NULL /* not set result file */)) { - affectedRows = -1; - printf("========restful return fail, threadID[%d]\n", - pThreadInfo->threadID); - } else { - affectedRows = k; - } - } else { - errorPrint("%s() LN%d: unknown insert mode: %s\n", - __func__, __LINE__, superTblInfo->insertMode); - affectedRows = 0; + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, pThreadInfo->buffer); + + uint16_t iface; + if (superTblInfo) + iface = superTblInfo->iface; + else + iface = g_args.iface; + + debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, + (g_args.iface==TAOSC_IFACE)? + "taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); + + switch(iface) { + case TAOSC_IFACE: + affectedRows = queryDbExec( + pThreadInfo->taos, + pThreadInfo->buffer, INSERT_TYPE, false); + break; + + case REST_IFACE: + if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, + pThreadInfo->buffer, NULL /* not set result file */)) { + affectedRows = -1; + printf("========restful return fail, threadID[%d]\n", + pThreadInfo->threadID); + } else { + affectedRows = k; + } + break; + + case STMT_IFACE: + debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt); + if (0 != taos_stmt_execute(pThreadInfo->stmt)) { + errorPrint("%s() LN%d, failied to execute insert statement\n", + __func__, __LINE__); + exit(-1); + } + affectedRows = k; + break; + + default: + errorPrint("%s() LN%d: unknown insert mode: %d\n", + __func__, __LINE__, superTblInfo->iface); + affectedRows = 0; } - } else { - affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); - } - return affectedRows; + return affectedRows; } static void getTableName(char *pTblName, @@ -4874,90 +4968,128 @@ static void getTableName(char *pTblName, } } -static int64_t generateDataTail( - SSuperTable* superTblInfo, - uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) { - uint64_t len = 0; - uint32_t ncols_per_record = 1; // count first col ts +static int32_t generateDataTailWithoutStb( + uint32_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t recordFrom, int64_t startTime, + /* int64_t *pSamplePos, */int64_t *dataLen) { + uint64_t len = 0; char *pstr = buffer; - if (superTblInfo == NULL) { - uint32_t datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; + verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); + + int32_t k = 0; + for (k = 0; k < batch;) { + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + int64_t retLen = 0; + + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; + + retLen = generateData(data, data_type, + startTime + getTSRandTail( + (int64_t) DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange), + lenOfBinary); + + if (len > remainderBufLen) + break; + + pstr += sprintf(pstr, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; + + verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n", + __func__, __LINE__, len, k, buffer); + + recordFrom ++; + + if (recordFrom >= insertRows) { + break; } } - verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); + *dataLen = len; + return k; +} + +static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, + int disorderRatio, int disorderRange) +{ + int64_t randTail = timeStampStep * seq; + if (disorderRatio > 0) { + int rand_num = taosRandom() % 100; + if(rand_num < disorderRatio) { + randTail = (randTail + + (taosRandom() % disorderRange + 1)) * (-1); + debugPrint("rand data generated, back %"PRId64"\n", randTail); + } + } + + return randTail; +} + +static int32_t generateStbDataTail( + SSuperTable* superTblInfo, + uint32_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t recordFrom, int64_t startTime, + int64_t *pSamplePos, int64_t *dataLen) { + uint64_t len = 0; + + char *pstr = buffer; bool tsRand; - if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, - "rand", strlen("rand")))) { - tsRand = true; - } else { - tsRand = false; + if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; } + verbosePrint("%s() LN%d batch=%u\n", __func__, __LINE__, batch); - uint64_t k = 0; + int32_t k = 0; for (k = 0; k < batch;) { char data[MAX_DATA_SIZE]; memset(data, 0, MAX_DATA_SIZE); int64_t retLen = 0; - if (superTblInfo) { - if (tsRand) { - retLen = generateRowData( - data, - startTime + getTSRandTail( - superTblInfo->timeStampStep, k, - superTblInfo->disorderRatio, - superTblInfo->disorderRange), - superTblInfo); - } else { - retLen = getRowDataFromSample( - data, - remainderBufLen, - startTime + superTblInfo->timeStampStep * k, - superTblInfo, - pSamplePos); - } - if (retLen > remainderBufLen) { - break; - } - - pstr += snprintf(pstr , retLen + 1, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; - } else { - char **data_type = g_args.datatype; - int lenOfBinary = g_args.len_of_binary; - retLen = generateData(data, data_type, - ncols_per_record, + if (tsRand) { + retLen = generateStbRowData(superTblInfo, data, startTime + getTSRandTail( - DEFAULT_TIMESTAMP_STEP, k, - g_args.disorderRatio, - g_args.disorderRange), - lenOfBinary); - if (len > remainderBufLen) - break; + superTblInfo->timeStampStep, k, + superTblInfo->disorderRatio, + superTblInfo->disorderRange) + ); + } else { + retLen = getRowDataFromSample( + data, + remainderBufLen, + startTime + superTblInfo->timeStampStep * k, + superTblInfo, + pSamplePos); + } - pstr += sprintf(pstr, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; + if (retLen > remainderBufLen) { + break; } - verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n", + pstr += snprintf(pstr , retLen + 1, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; + + verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); - startFrom ++; + recordFrom ++; - if (startFrom >= insertRows) { + if (recordFrom >= insertRows) { break; } } @@ -4966,17 +5098,41 @@ static int64_t generateDataTail( return k; } -static int generateSQLHead(char *tableName, int32_t tableSeq, - threadInfo* pThreadInfo, SSuperTable* superTblInfo, + +static int generateSQLHeadWithoutStb(char *tableName, + char *dbName, char *buffer, int remainderBufLen) { int len; -#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. char headBuf[HEAD_BUFF_LEN]; - if (superTblInfo) { - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s values", + dbName, + tableName); + + if (len > remainderBufLen) + return -1; + + tstrncpy(buffer, headBuf, len + 1); + + return len; +} + +static int generateStbSQLHead( + SSuperTable* superTblInfo, + char *tableName, int32_t tableSeq, + char *dbName, + char *buffer, int remainderBufLen) +{ + int len; + + char headBuf[HEAD_BUFF_LEN]; + + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo, tableSeq); @@ -4995,9 +5151,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s using %s.%s tags %s values", - pThreadInfo->db_name, + dbName, tableName, - pThreadInfo->db_name, + dbName, superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); @@ -5006,22 +5162,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } else { len = snprintf( headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, - tableName); - } - } else { - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } @@ -5033,8 +5181,11 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, return len; } -static int64_t generateInterlaceDataBuffer( - char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes, +static int32_t generateStbInterlaceData( + SSuperTable *superTblInfo, + char *tableName, uint32_t batchPerTbl, + uint64_t i, + uint32_t batchPerTblTimes, uint64_t tableSeq, threadInfo *pThreadInfo, char *buffer, int64_t insertRows, @@ -5043,10 +5194,11 @@ static int64_t generateInterlaceDataBuffer( { assert(buffer); char *pstr = buffer; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, - superTblInfo, pstr, *pRemainderBufLen); + int headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, + pstr, *pRemainderBufLen); if (headLen <= 0) { return 0; @@ -5060,29 +5212,25 @@ static int64_t generateInterlaceDataBuffer( int64_t dataLen = 0; - verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%"PRIu64" batchPerTbl = %"PRIu64"\n", + verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n", pThreadInfo->threadID, __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { startTime = taosGetTimestamp(pThreadInfo->time_precision); - } - } else { - startTime = 1500000000000; } - int64_t k = generateDataTail( - superTblInfo, - batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, - startTime, - &(pThreadInfo->samplePos), &dataLen); + int32_t k = generateStbDataTail( + superTblInfo, + batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + &(pThreadInfo->samplePos), &dataLen); if (k == batchPerTbl) { pstr += dataLen; *pRemainderBufLen -= dataLen; } else { - debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n", + debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n", __func__, __LINE__, k, batchPerTbl); pstr -= headLen; pstr[0] = '\0'; @@ -5092,50 +5240,361 @@ static int64_t generateInterlaceDataBuffer( return k; } -static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, - int disorderRatio, int disorderRange) +static int64_t generateInterlaceDataWithoutStb( + char *tableName, uint32_t batch, + uint64_t tableSeq, + char *dbName, char *buffer, + int64_t insertRows, + int64_t startTime, + uint64_t *pRemainderBufLen) { - int64_t randTail = timeStampStep * seq; - if (disorderRatio > 0) { - int rand_num = taosRandom() % 100; - if(rand_num < disorderRatio) { - randTail = (randTail + - (taosRandom() % disorderRange + 1)) * (-1); - debugPrint("rand data generated, back %"PRId64"\n", randTail); + assert(buffer); + char *pstr = buffer; + + int headLen = generateSQLHeadWithoutStb( + tableName, dbName, + pstr, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen = 0; + + int32_t k = generateDataTailWithoutStb( + batch, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + &dataLen); + + if (k == batch) { + pstr += dataLen; + *pRemainderBufLen -= dataLen; + } else { + debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n", + __func__, __LINE__, k, batch); + pstr -= headLen; + pstr[0] = '\0'; + k = 0; + } + + return k; +} + +#if STMT_IFACE_ENABLED == 1 +static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, + char *dataType, int32_t dataLen, char **ptr) +{ + if (0 == strncasecmp(dataType, + "BINARY", strlen("BINARY"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "binary length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; + } + char *bind_binary = (char *)*ptr; + rand_string(bind_binary, dataLen); + + bind->buffer_type = TSDB_DATA_TYPE_BINARY; + bind->buffer_length = dataLen; + bind->buffer = bind_binary; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "NCHAR", strlen("NCHAR"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; } + char *bind_nchar = (char *)*ptr; + rand_string(bind_nchar, dataLen); + + bind->buffer_type = TSDB_DATA_TYPE_NCHAR; + bind->buffer_length = strlen(bind_nchar); + bind->buffer = bind_nchar; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "INT", strlen("INT"))) { + int32_t *bind_int = (int32_t *)*ptr; + + *bind_int = rand_int(); + bind->buffer_type = TSDB_DATA_TYPE_INT; + bind->buffer_length = sizeof(int32_t); + bind->buffer = bind_int; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "BIGINT", strlen("BIGINT"))) { + int64_t *bind_bigint = (int64_t *)*ptr; + + *bind_bigint = rand_bigint(); + bind->buffer_type = TSDB_DATA_TYPE_BIGINT; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_bigint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "FLOAT", strlen("FLOAT"))) { + float *bind_float = (float *) *ptr; + + *bind_float = rand_float(); + bind->buffer_type = TSDB_DATA_TYPE_FLOAT; + bind->buffer_length = sizeof(float); + bind->buffer = bind_float; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "DOUBLE", strlen("DOUBLE"))) { + double *bind_double = (double *)*ptr; + + *bind_double = rand_double(); + bind->buffer_type = TSDB_DATA_TYPE_DOUBLE; + bind->buffer_length = sizeof(double); + bind->buffer = bind_double; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "SMALLINT", strlen("SMALLINT"))) { + int16_t *bind_smallint = (int16_t *)*ptr; + + *bind_smallint = rand_smallint(); + bind->buffer_type = TSDB_DATA_TYPE_SMALLINT; + bind->buffer_length = sizeof(int16_t); + bind->buffer = bind_smallint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "TINYINT", strlen("TINYINT"))) { + int8_t *bind_tinyint = (int8_t *)*ptr; + + *bind_tinyint = rand_tinyint(); + bind->buffer_type = TSDB_DATA_TYPE_TINYINT; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_tinyint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "BOOL", strlen("BOOL"))) { + int8_t *bind_bool = (int8_t *)*ptr; + + *bind_bool = rand_bool(); + bind->buffer_type = TSDB_DATA_TYPE_BOOL; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_bool; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "TIMESTAMP", strlen("TIMESTAMP"))) { + int64_t *bind_ts2 = (int64_t *) *ptr; + + *bind_ts2 = rand_bigint(); + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts2; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else { + errorPrint( "No support data type: %s\n", + dataType); + return -1; } - return randTail; + return 0; } -static int64_t generateProgressiveDataBuffer( +static int32_t prepareStmtWithoutStb( + TAOS_STMT *stmt, char *tableName, - int64_t tableSeq, - threadInfo *pThreadInfo, char *buffer, + uint32_t batch, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, - int64_t *pRemainderBufLen) + int64_t recordFrom, + int64_t startTime) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + int ret = taos_stmt_set_tbname(stmt, tableName); + if (ret != 0) { + errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", + tableName, ret, taos_errstr(NULL)); + return ret; + } - int ncols_per_record = 1; // count first col ts + char **data_type = g_args.datatype; - if (superTblInfo == NULL) { - int datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; + char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.num_of_CPR + 1)); + if (bindArray == NULL) { + errorPrint("Failed to allocate %d bind params\n", + (g_args.num_of_CPR + 1)); + return -1; + } + + int32_t k = 0; + for (k = 0; k < batch;) { + /* columnCount + 1 (ts) */ + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + char *ptr = data; + TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); + + int64_t *bind_ts; + + bind_ts = (int64_t *)ptr; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + *bind_ts = startTime + getTSRandTail( + (int64_t)DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange); + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + ptr += bind->buffer_length; + + for (int i = 0; i < g_args.num_of_CPR; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); + if ( -1 == prepareStmtBindArrayByType( + bind, + data_type[i], + g_args.len_of_binary, + &ptr)) { + return -1; + } + } + taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); + // if msg > 3MB, break + taos_stmt_add_batch(stmt); + + k++; + recordFrom ++; + if (recordFrom >= insertRows) { + break; + } + } + + return k; +} + +static int32_t prepareStbStmt(SSuperTable *stbInfo, + TAOS_STMT *stmt, + char *tableName, uint32_t batch, + uint64_t insertRows, + uint64_t recordFrom, + int64_t startTime, char *buffer) +{ + int ret = taos_stmt_set_tbname(stmt, tableName); + if (ret != 0) { + errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", + tableName, ret, taos_errstr(NULL)); + return ret; + } + + char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + if (bindArray == NULL) { + errorPrint("Failed to allocate %d bind params\n", + (stbInfo->columnCount + 1)); + return -1; } - } + bool tsRand; + if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; + } + + uint32_t k; + for (k = 0; k < batch;) { + /* columnCount + 1 (ts) */ + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + char *ptr = data; + TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); + + int64_t *bind_ts; + + bind_ts = (int64_t *)ptr; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + if (tsRand) { + *bind_ts = startTime + getTSRandTail( + stbInfo->timeStampStep, k, + stbInfo->disorderRatio, + stbInfo->disorderRange); + } else { + *bind_ts = startTime + stbInfo->timeStampStep * k; + } + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + ptr += bind->buffer_length; + + for (int i = 0; i < stbInfo->columnCount; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); + if ( -1 == prepareStmtBindArrayByType( + bind, + stbInfo->columns[i].dataType, + stbInfo->columns[i].dataLen, + &ptr)) { + return -1; + } + } + taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); + // if msg > 3MB, break + taos_stmt_add_batch(stmt); + + k++; + recordFrom ++; + if (recordFrom >= insertRows) { + break; + } + } + + return k; +} +#endif + +static int32_t generateStbProgressiveData( + SSuperTable *superTblInfo, + char *tableName, + int64_t tableSeq, + char *dbName, char *buffer, + int64_t insertRows, + uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos, + int64_t *pRemainderBufLen) +{ assert(buffer != NULL); char *pstr = buffer; - int64_t k = 0; - memset(buffer, 0, *pRemainderBufLen); - int64_t headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, + int64_t headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, dbName, buffer, *pRemainderBufLen); if (headLen <= 0) { @@ -5145,29 +5604,64 @@ static int64_t generateProgressiveDataBuffer( *pRemainderBufLen -= headLen; int64_t dataLen; - k = generateDataTail(superTblInfo, - g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, + + return generateStbDataTail(superTblInfo, + g_args.num_of_RPR, pstr, *pRemainderBufLen, + insertRows, recordFrom, startTime, pSamplePos, &dataLen); +} - return k; +static int32_t generateProgressiveDataWithoutStb( + char *tableName, + /* int64_t tableSeq, */ + threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */ + int64_t *pRemainderBufLen) +{ + assert(buffer != NULL); + char *pstr = buffer; + + memset(buffer, 0, *pRemainderBufLen); + + int64_t headLen = generateSQLHeadWithoutStb( + tableName, pThreadInfo->db_name, + buffer, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen; + + return generateDataTailWithoutStb( + g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom, + startTime, + /*pSamplePos, */&dataLen); } static void printStatPerThread(threadInfo *pThreadInfo) { - fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n", + fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n", pThreadInfo->threadID, pThreadInfo->totalInsertRows, pThreadInfo->totalAffectedRows, (double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0))); } +// sync write interlace data static void* syncWriteInterlace(threadInfo *pThreadInfo) { debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID, __func__, __LINE__); int64_t insertRows; - uint64_t interlaceRows; + uint32_t interlaceRows; + uint64_t maxSqlLen; + int64_t nTimeStampStep; + uint64_t insert_interval; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; @@ -5180,45 +5674,48 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } else { interlaceRows = superTblInfo->interlaceRows; } + maxSqlLen = superTblInfo->maxSqlLen; + nTimeStampStep = superTblInfo->timeStampStep; + insert_interval = superTblInfo->insertInterval; } else { insertRows = g_args.num_of_DPT; interlaceRows = g_args.interlace_rows; + maxSqlLen = g_args.max_sql_len; + nTimeStampStep = DEFAULT_TIMESTAMP_STEP; + insert_interval = g_args.insert_interval; } + debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); + if (interlaceRows > insertRows) interlaceRows = insertRows; if (interlaceRows > g_args.num_of_RPR) interlaceRows = g_args.num_of_RPR; - int insertMode; + uint32_t batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; - if (interlaceRows > 0) { - insertMode = INTERLACE_INSERT_MODE; + if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { + batchPerTblTimes = + g_args.num_of_RPR / interlaceRows; } else { - insertMode = PROGRESSIVE_INSERT_MODE; + batchPerTblTimes = 1; } - // TODO: prompt tbl count multple interlace rows and batch - // - - uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; - char* buffer = calloc(maxSqlLen, 1); - if (NULL == buffer) { + pThreadInfo->buffer = calloc(maxSqlLen, 1); + if (NULL == pThreadInfo->buffer) { errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", __func__, __LINE__, maxSqlLen, strerror(errno)); return NULL; } - char tableName[TSDB_TABLE_NAME_LEN]; - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; - int64_t nTimeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; - - uint64_t insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; uint64_t st = 0; uint64_t et = UINT64_MAX; @@ -5227,69 +5724,98 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { uint64_t endTs; uint64_t tableSeq = pThreadInfo->start_table_from; - - debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from, - pThreadInfo->ntables, insertRows); - int64_t startTime = pThreadInfo->start_time; - uint64_t batchPerTbl = interlaceRows; - uint64_t batchPerTblTimes; - - if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { - batchPerTblTimes = - g_args.num_of_RPR / interlaceRows; - } else { - batchPerTblTimes = 1; - } - uint64_t generatedRecPerTbl = 0; bool flagSleep = true; uint64_t sleepTimeTotal = 0; - char *strInsertInto = "insert into "; - int nInsertBufLen = strlen(strInsertInto); - while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampMs(); flagSleep = false; } // generate data - memset(buffer, 0, maxSqlLen); + memset(pThreadInfo->buffer, 0, maxSqlLen); uint64_t remainderBufLen = maxSqlLen; - char *pstr = buffer; + char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; - uint64_t recOfBatch = 0; + uint32_t recOfBatch = 0; + + for (uint32_t i = 0; i < batchPerTblTimes; i ++) { + char tableName[TSDB_TABLE_NAME_LEN]; - for (uint64_t i = 0; i < batchPerTblTimes; i ++) { getTableName(tableName, pThreadInfo, tableSeq); if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", pThreadInfo->threadID, __func__, __LINE__); - free(buffer); + free(pThreadInfo->buffer); return NULL; } uint64_t oldRemainderLen = remainderBufLen; - int64_t generated = generateInterlaceDataBuffer( - tableName, batchPerTbl, i, batchPerTblTimes, - tableSeq, - pThreadInfo, pstr, - insertRows, - startTime, - &remainderBufLen); - - debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + + int32_t generated; + if (superTblInfo) { + if (superTblInfo->iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStbStmt(superTblInfo, + pThreadInfo->stmt, + tableName, + batchPerTbl, + insertRows, i, + startTime, + pThreadInfo->buffer); +#else + generated = -1; +#endif + } else { + generated = generateStbInterlaceData( + superTblInfo, + tableName, batchPerTbl, i, + batchPerTblTimes, + tableSeq, + pThreadInfo, pstr, + insertRows, + startTime, + &remainderBufLen); + } + } else { + if (g_args.iface == STMT_IFACE) { + debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, + tableName, batchPerTbl, startTime); +#if STMT_IFACE_ENABLED == 1 + generated = prepareStmtWithoutStb( + pThreadInfo->stmt, tableName, + batchPerTbl, + insertRows, i, + startTime); +#else + generated = -1; +#endif + } else { + generated = generateInterlaceDataWithoutStb( + tableName, batchPerTbl, + tableSeq, + pThreadInfo->db_name, pstr, + insertRows, + startTime, + &remainderBufLen); + } + } + + debugPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); if (generated < 0) { - errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + errorPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); goto free_of_interlace; } else if (generated == 0) { @@ -5298,15 +5824,15 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { tableSeq ++; recOfBatch += batchPerTbl; + pstr += (oldRemainderLen - remainderBufLen); -// startTime += batchPerTbl * superTblInfo->timeStampStep; pThreadInfo->totalInsertRows += batchPerTbl; - verbosePrint("[%d] %s() LN%d batchPerTbl=%"PRId64" recOfBatch=%"PRId64"\n", + + verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", pThreadInfo->threadID, __func__, __LINE__, batchPerTbl, recOfBatch); - if (insertMode == INTERLACE_INSERT_MODE) { - if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { // turn to first table tableSeq = pThreadInfo->start_table_from; generatedRecPerTbl += batchPerTbl; @@ -5318,13 +5844,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (generatedRecPerTbl >= insertRows) break; - int remainRows = insertRows - generatedRecPerTbl; + int64_t remainRows = insertRows - generatedRecPerTbl; if ((remainRows > 0) && (batchPerTbl > remainRows)) batchPerTbl = remainRows; if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR) break; - } } verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", @@ -5335,22 +5860,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { break; } - verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n", + verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->totalInsertRows); verbosePrint("[%d] %s() LN%d, buffer=%s\n", - pThreadInfo->threadID, __func__, __LINE__, buffer); + pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); startTs = taosGetTimestampMs(); if (recOfBatch == 0) { - errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n", + errorPrint("[%d] %s() LN%d try inserting records of batch is %d\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch); errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n"); goto free_of_interlace; } - int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); + int64_t affectedRows = execInsert(pThreadInfo, recOfBatch); endTs = taosGetTimestampMs(); uint64_t delay = endTs - startTs; @@ -5366,9 +5891,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->totalDelay += delay; if (recOfBatch != affectedRows) { - errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n", + errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n", pThreadInfo->threadID, __func__, __LINE__, - recOfBatch, affectedRows, buffer); + recOfBatch, affectedRows, pThreadInfo->buffer); goto free_of_interlace; } @@ -5387,8 +5912,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { et = taosGetTimestampMs(); if (insert_interval > (et - st) ) { - int sleepTime = insert_interval - (et -st); - performancePrint("%s() LN%d sleep: %d ms for insert interval\n", + uint64_t sleepTime = insert_interval - (et -st); + performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n", __func__, __LINE__, sleepTime); taosMsleep(sleepTime); // ms sleepTimeTotal += insert_interval; @@ -5397,27 +5922,26 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } free_of_interlace: - tmfree(buffer); + tmfree(pThreadInfo->buffer); printStatPerThread(pThreadInfo); return NULL; } -// sync insertion -/* - 1 thread: 100 tables * 2000 rows/s - 1 thread: 10 tables * 20000 rows/s - 6 thread: 300 tables * 2000 rows/s - - 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s -*/ +// sync insertion progressive data static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + int64_t timeStampStep = + superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; + int64_t insertRows = + (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", + __func__, __LINE__, insertRows); - char* buffer = calloc(maxSqlLen, 1); - if (NULL == buffer) { + pThreadInfo->buffer = calloc(maxSqlLen, 1); + if (NULL == pThreadInfo->buffer) { errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n", maxSqlLen, strerror(errno)); @@ -5428,35 +5952,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { uint64_t startTs = taosGetTimestampMs(); uint64_t endTs; - int64_t timeStampStep = - superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; -/* int insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - uint64_t st = 0; - uint64_t et = 0xffffffff; - */ - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; pThreadInfo->samplePos = 0; - for (uint64_t tableSeq = - pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to; - tableSeq ++) { + for (uint64_t tableSeq = pThreadInfo->start_table_from; + tableSeq <= pThreadInfo->end_table_to; + tableSeq ++) { int64_t start_time = pThreadInfo->start_time; - int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - for (uint64_t i = 0; i < insertRows;) { - /* - if (insert_interval) { - st = taosGetTimestampMs(); - } - */ - char tableName[TSDB_TABLE_NAME_LEN]; getTableName(tableName, pThreadInfo, tableSeq); verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", @@ -5464,19 +5970,57 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->threadID, tableSeq, tableName); int64_t remainderBufLen = maxSqlLen; - char *pstr = buffer; - int nInsertBufLen = strlen("insert into "); + char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into "); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; - int64_t generated = generateProgressiveDataBuffer( - tableName, tableSeq, pThreadInfo, pstr, insertRows, - i, start_time, - &(pThreadInfo->samplePos), - &remainderBufLen); + int32_t generated; + if (superTblInfo) { + if (superTblInfo->iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStbStmt( + superTblInfo, + pThreadInfo->stmt, + tableName, + g_args.num_of_RPR, + insertRows, i, start_time, pstr); +#else + generated = -1; +#endif + } else { + generated = generateStbProgressiveData( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, pstr, + insertRows, i, start_time, + &(pThreadInfo->samplePos), + &remainderBufLen); + } + } else { + if (g_args.iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStmtWithoutStb( + pThreadInfo->stmt, + tableName, + g_args.num_of_RPR, + insertRows, i, + start_time); +#else + generated = -1; +#endif + } else { + generated = generateProgressiveDataWithoutStb( + tableName, + /* tableSeq, */ + pThreadInfo, pstr, insertRows, + i, start_time, + /* &(pThreadInfo->samplePos), */ + &remainderBufLen); + } + } if (generated > 0) i += generated; else @@ -5487,13 +6031,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { startTs = taosGetTimestampMs(); - int64_t affectedRows = execInsert(pThreadInfo, buffer, generated); + int32_t affectedRows = execInsert(pThreadInfo, generated); endTs = taosGetTimestampMs(); uint64_t delay = endTs - startTs; performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", __func__, __LINE__, delay); - verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", + verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID, __func__, __LINE__, affectedRows); @@ -5503,7 +6047,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->totalDelay += delay; if (affectedRows < 0) { - errorPrint("%s() LN%d, affected rows: %"PRId64"\n", + errorPrint("%s() LN%d, affected rows: %d\n", __func__, __LINE__, affectedRows); goto free_of_progressive; } @@ -5521,32 +6065,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { if (i >= insertRows) break; -/* - if (insert_interval) { - et = taosGetTimestampMs(); - - if (insert_interval > ((et - st)) ) { - int sleep_time = insert_interval - (et -st); - performancePrint("%s() LN%d sleep: %d ms for insert interval\n", - __func__, __LINE__, sleep_time); - taosMsleep(sleep_time); // ms - } - } - */ } // num_of_DPT - if (g_args.verbose_print) { - if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && + if ((g_args.verbose_print) && + (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { verbosePrint("%s() LN%d samplePos=%"PRId64"\n", __func__, __LINE__, pThreadInfo->samplePos); - } } } // tableSeq free_of_progressive: - tmfree(buffer); + tmfree(pThreadInfo->buffer); printStatPerThread(pThreadInfo); return NULL; } @@ -5556,7 +6087,7 @@ static void* syncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int interlaceRows; + uint32_t interlaceRows; if (superTblInfo) { if ((superTblInfo->interlaceRows == 0) @@ -5576,7 +6107,6 @@ static void* syncWrite(void *sarg) { // progressive mode return syncWriteProgressive(pThreadInfo); } - } static void callBack(void *param, TAOS_RES *res, int code) { @@ -5616,9 +6146,10 @@ static void callBack(void *param, TAOS_RES *res, int code) { && rand_num < pThreadInfo->superTblInfo->disorderRatio) { int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); - generateRowData(data, d, pThreadInfo->superTblInfo); + generateStbRowData(pThreadInfo->superTblInfo, data, d); } else { - generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo); + generateStbRowData(pThreadInfo->superTblInfo, + data, pThreadInfo->lastTs += 1000); } pstr += sprintf(pstr, "%s", data); pThreadInfo->counter++; @@ -5686,24 +6217,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * static void startMultiThreadInsertData(int threads, char* db_name, char* precision,SSuperTable* superTblInfo) { - pthread_t *pids = malloc(threads * sizeof(pthread_t)); - assert(pids != NULL); - - threadInfo *infos = malloc(threads * sizeof(threadInfo)); - assert(infos != NULL); - - memset(pids, 0, threads * sizeof(pthread_t)); - memset(infos, 0, threads * sizeof(threadInfo)); - - //TAOS* taos; - //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { - // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - // if (NULL == taos) { - // printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); - // exit(-1); - // } - //} - int32_t timePrec = TSDB_TIME_PRECISION_MILLI; if (0 != precision[0]) { if (0 == strncasecmp(precision, "ms", 2)) { @@ -5755,17 +6268,17 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } - TAOS* taos = taos_connect( + TAOS* taos0 = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (NULL == taos) { + if (NULL == taos0) { errorPrint("%s() LN%d, connect to server fail , reason: %s\n", __func__, __LINE__, taos_errstr(NULL)); exit(-1); } int64_t ntables = 0; - uint64_t startFrom; + uint64_t tableFrom; if (superTblInfo) { int64_t limit; @@ -5792,7 +6305,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, } ntables = limit; - startFrom = offset; + tableFrom = offset; if ((superTblInfo->childTblExists != TBL_NO_EXISTS) && ((superTblInfo->childTblOffset + superTblInfo->childTblLimit ) @@ -5811,23 +6324,23 @@ static void startMultiThreadInsertData(int threads, char* db_name, limit * TSDB_TABLE_NAME_LEN); if (superTblInfo->childTblName == NULL) { errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); - taos_close(taos); + taos_close(taos0); exit(-1); } int64_t childTblCount; getChildNameOfSuperTableWithLimitAndOffset( - taos, + taos0, db_name, superTblInfo->sTblName, &superTblInfo->childTblName, &childTblCount, limit, offset); } else { ntables = g_args.num_of_tables; - startFrom = 0; + tableFrom = 0; } - taos_close(taos); + taos_close(taos0); int64_t a = ntables / threads; if (a < 1) { @@ -5841,11 +6354,22 @@ static void startMultiThreadInsertData(int threads, char* db_name, } if ((superTblInfo) - && (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) { - if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) - exit(-1); + && (superTblInfo->iface == REST_IFACE)) { + if (convertHostToServAddr( + g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { + exit(-1); + } } + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + assert(pids != NULL); + + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + assert(infos != NULL); + + memset(pids, 0, threads * sizeof(pthread_t)); + memset(infos, 0, threads * sizeof(threadInfo)); + for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; pThreadInfo->threadID = i; @@ -5857,17 +6381,59 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->minDelay = UINT64_MAX; if ((NULL == superTblInfo) || - (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { - //pThreadInfo->taos = taos; + (superTblInfo->iface != REST_IFACE)) { + //t_info->taos = taos; pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == pThreadInfo->taos) { errorPrint( - "connect to server fail from insert sub thread, reason: %s\n", + "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n", + __func__, __LINE__, taos_errstr(NULL)); + free(infos); exit(-1); } + + if ((g_args.iface == STMT_IFACE) + || ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) { + + int columnCount; + if (superTblInfo) { + columnCount = superTblInfo->columnCount; + } else { + columnCount = g_args.num_of_CPR; + } + + pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); + if (NULL == pThreadInfo->stmt) { + errorPrint( + "%s() LN%d, failed init stmt, reason: %s\n", + __func__, __LINE__, + taos_errstr(NULL)); + free(pids); + free(infos); + exit(-1); + } + + char buffer[3000]; + char *pstr = buffer; + pstr += sprintf(pstr, "INSERT INTO ? values(?"); + + for (int col = 0; col < columnCount; col ++) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ")"); + + int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); + if (ret != 0){ + errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", + ret, taos_errstr(NULL)); + free(pids); + free(infos); + exit(-1); + } + } } else { pThreadInfo->taos = NULL; } @@ -5875,10 +6441,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, /* if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) { */ - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; /* } else { pThreadInfo->start_table_from = 0; pThreadInfo->ntables = superTblInfo->childTblCount; @@ -5906,6 +6472,11 @@ static void startMultiThreadInsertData(int threads, char* db_name, for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; + tsem_destroy(&(pThreadInfo->lock_sem)); + + if (pThreadInfo->stmt) { + taos_stmt_close(pThreadInfo->stmt); + } tsem_destroy(&(pThreadInfo->lock_sem)); taos_close(pThreadInfo->taos); @@ -6182,13 +6753,13 @@ static int insertTestProcess() { } } - taosMsleep(1000); + // taosMsleep(1000); // create sub threads for inserting data //start = taosGetTimestampMs(); for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.use_metric) { if (g_Dbs.db[i].superTblCount > 0) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; @@ -6512,15 +7083,15 @@ static int queryTestProcess() { b = ntables % threads; } - uint64_t startFrom = 0; + uint64_t tableFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infosOfSub + i; pThreadInfo->threadID = i; - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); } @@ -6901,12 +7472,12 @@ static int subscribeTestProcess() { //==== create threads for query for specified table if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { - debugPrint("%s() LN%d, sepcified query sqlCount %d.\n", + debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount); } else { if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) { - errorPrint("%s() LN%d, sepcified query sqlCount %d.\n", + errorPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount); exit(-1); @@ -6939,7 +7510,7 @@ static int subscribeTestProcess() { //==== create threads for super table query if (g_queryInfo.superQueryInfo.sqlCount <= 0) { - debugPrint("%s() LN%d, super table query sqlCount %d.\n", + debugPrint("%s() LN%d, super table query sqlCount %"PRIu64".\n", __func__, __LINE__, g_queryInfo.superQueryInfo.sqlCount); } else { @@ -6975,17 +7546,17 @@ static int subscribeTestProcess() { } for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - uint64_t startFrom = 0; + uint64_t tableFrom = 0; for (int j = 0; j < threads; j++) { uint64_t seq = i * threads + j; threadInfo *pThreadInfo = infosOfStable + seq; pThreadInfo->threadID = seq; pThreadInfo->querySeq = i; - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = jend_table_to = jend_table_to + 1; + pThreadInfo->end_table_to = jend_table_to + 1; pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfStable + seq, NULL, superSubscribe, pThreadInfo); @@ -7104,7 +7675,7 @@ static void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); - tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].iface = g_args.iface; tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; -- GitLab