diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f54237306cbf05c32b50d6f8106c61d559ba8464..f1dd8975dce1ca7b3a57b2885763a82ec9a09675 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -769,6 +769,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC index = 0; sToken = tStrGetToken(sql, &index, false); + if (sToken.type == TK_ILLEGAL) { + return tscSQLSyntaxErrMsg(pCmd->payload, "unrecognized token", sToken.z); + } + if (sToken.type == TK_RP) { break; } diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 3c528ab26f5372d362c6cacdf195eefe5aebec96..e7b2d4f16b830c05be9ebef8f14768f7d0974444 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -19,6 +19,7 @@ */ #include +#include #define _GNU_SOURCE #define CURL_STATICLIB @@ -52,6 +53,8 @@ #include "taoserror.h" #include "tutil.h" +#define STMT_IFACE_ENABLED 0 + #define REQ_EXTRA_BUF_LEN 1024 #define RESP_BUF_LEN 4096 @@ -61,6 +64,8 @@ extern char configDir[]; #define QUERY_JSON_NAME "query.json" #define SUBSCRIBE_JSON_NAME "subscribe.json" +#define STR_INSERT_INTO "INSERT INTO " + enum TEST_MODE { INSERT_TEST, // 0 QUERY_TEST, // 1 @@ -70,6 +75,8 @@ enum TEST_MODE { #define MAX_RECORDS_PER_REQ 32766 +#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. + #define MAX_SQL_SIZE 65536 #define BUFFER_SIZE (65536*2) #define COND_BUF_LEN BUFFER_SIZE - 30 @@ -120,17 +127,24 @@ enum enumSYNC_MODE { MODE_BUT }; +enum enum_TAOS_INTERFACE { + TAOSC_IFACE, + REST_IFACE, + STMT_IFACE, + INTERFACE_BUT +}; + typedef enum enumQUERY_CLASS { SPECIFIED_CLASS, STABLE_CLASS, CLASS_BUT } QUERY_CLASS; -typedef enum enum_INSERT_MODE { +typedef enum enum_PROGRESSIVE_OR_INTERLACE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, INVALID_INSERT_MODE -} INSERT_MODE; +} PROG_OR_INTERLACE_MODE; typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, @@ -196,6 +210,7 @@ typedef struct SArguments_S { uint32_t test_mode; char * host; uint16_t port; + uint16_t iface; char * user; char * password; char * database; @@ -217,13 +232,13 @@ typedef struct SArguments_S { uint32_t num_of_threads; uint64_t insert_interval; int64_t query_times; - uint64_t interlace_rows; - uint64_t num_of_RPR; // num_of_records_per_req + uint32_t interlace_rows; + uint32_t num_of_RPR; // num_of_records_per_req uint64_t max_sql_len; int64_t num_of_tables; int64_t num_of_DPT; int abort; - int disorderRatio; // 0: no disorder, >0: x% + uint32_t disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint32_t method_of_delete; char ** arg_list; @@ -240,18 +255,19 @@ typedef struct SColumn_S { typedef struct SSuperTable_S { char sTblName[MAX_TB_NAME_SIZE+1]; + char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample + char childTblPrefix[MAX_TB_NAME_SIZE]; + char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest + uint16_t childTblExists; int64_t childTblCount; - bool childTblExists; // 0: no, 1: yes uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table - char childTblPrefix[MAX_TB_NAME_SIZE]; - char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample - char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest + uint16_t iface; // 0: taosc, 1: rest, 2: stmt int64_t childTblLimit; uint64_t childTblOffset; // int multiThreadWriteOneTbl; // 0: no, 1: yes - uint64_t interlaceRows; // + uint32_t interlaceRows; // int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision uint64_t maxSqlLen; // @@ -420,6 +436,7 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; + TAOS_STMT *stmt; int threadID; char db_name[MAX_DB_NAME_SIZE+1]; uint32_t time_precision; @@ -434,6 +451,7 @@ typedef struct SThreadInfo_S { char* cols; bool use_metric; SSuperTable* superTblInfo; + char *buffer; // sql cmd buffer // for async insert tsem_t lock_sem; @@ -541,7 +559,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, char* sqlstr, threadInfo *pThreadInfo); static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, - int disorderRatio, int disorderRange); + int disorderRatio, int disorderRange); /* ************ Global variables ************ */ @@ -557,6 +575,7 @@ SArguments g_args = { 0, // test_mode "127.0.0.1", // host 6030, // port + TAOSC_IFACE, // iface "root", // user #ifdef _TD_POWER_ "powerdb", // password @@ -673,6 +692,8 @@ static void printHelp() { "The host to connect to TDengine. Default is localhost."); printf("%s%s%s%s\n", indent, "-p", indent, "The TCP/IP port number to use for the connection. Default is 0."); + printf("%s%s%s%s\n", indent, "-I", indent, + "The interface (taosc, rest, and stmt) taosdemo uses. Default is 'taosc'."); printf("%s%s%s%s\n", indent, "-d", indent, "Destination database. Default is 'test'."); printf("%s%s%s%s\n", indent, "-a", indent, @@ -761,6 +782,23 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { exit(EXIT_FAILURE); } arguments->port = atoi(argv[++i]); + } else if (strcmp(argv[i], "-I") == 0) { + if (argc == i+1) { + printHelp(); + errorPrint("%s", "\n\t-I need a valid string following!\n"); + exit(EXIT_FAILURE); + } + ++i; + if (0 == strcasecmp(argv[i], "taosc")) { + arguments->iface = TAOSC_IFACE; + } else if (0 == strcasecmp(argv[i], "rest")) { + arguments->iface = REST_IFACE; + } else if (0 == strcasecmp(argv[i], "stmt")) { + arguments->iface = STMT_IFACE; + } else { + errorPrint("%s", "\n\t-I need a valid string following!\n"); + exit(EXIT_FAILURE); + } } else if (strcmp(argv[i], "-u") == 0) { if (argc == i+1) { printHelp(); @@ -793,7 +831,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { if ((argc == i+1) || (!isStringNumber(argv[i+1]))) { printHelp(); - errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n"); + errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, not-0: ASYNC. Default is SYNC.\n"); exit(EXIT_FAILURE); } arguments->async_mode = atoi(argv[++i]); @@ -897,6 +935,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(argv[i], "BIGINT") && strcasecmp(argv[i], "DOUBLE") && strcasecmp(argv[i], "BINARY") + && strcasecmp(argv[i], "TIMESTAMP") && strcasecmp(argv[i], "NCHAR")) { printHelp(); errorPrint("%s", "-b: Invalid data_type!\n"); @@ -918,6 +957,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(token, "BIGINT") && strcasecmp(token, "DOUBLE") && strcasecmp(token, "BINARY") + && strcasecmp(token, "TIMESTAMP") && strcasecmp(token, "NCHAR")) { printHelp(); free(g_dupstr); @@ -1019,6 +1059,19 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } } + int columnCount; + for (columnCount = 0; columnCount < MAX_NUM_DATATYPE; columnCount ++) { + if (g_args.datatype[columnCount] == NULL) { + break; + } + } + + if (0 == columnCount) { + perror("data type error!"); + exit(-1); + } + g_args.num_of_CPR = columnCount; + if (((arguments->debug_print) && (arguments->metaFile == NULL)) || arguments->verbose_print) { printf("###################################################################\n"); @@ -1028,7 +1081,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->port ); printf("# User: %s\n", arguments->user); printf("# Password: %s\n", arguments->password); - printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); + printf("# Use metric: %s\n", + arguments->use_metric ? "true" : "false"); if (*(arguments->datatype)) { printf("# Specified data type: "); for (int i = 0; i < MAX_NUM_DATATYPE; i++) @@ -1040,7 +1094,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } printf("# Insertion interval: %"PRIu64"\n", arguments->insert_interval); - printf("# Number of records per req: %"PRIu64"\n", + printf("# Number of records per req: %u\n", arguments->num_of_RPR); printf("# Max SQL length: %"PRIu64"\n", arguments->max_sql_len); @@ -1068,8 +1122,6 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { } static bool getInfoFromJsonFile(char* file); -//static int generateOneRowDataForStb(SSuperTable* stbInfo); -//static int getDataIntoMemForStb(SSuperTable* stbInfo); static void init_rand_data(); static void tmfclose(FILE *fp) { if (NULL != fp) { @@ -1088,7 +1140,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { TAOS_RES *res = NULL; int32_t code = -1; - for (i = 0; i < 5; i++) { + for (i = 0; i < 5 /* retry */; i++) { if (NULL != res) { taos_free_result(res); res = NULL; @@ -1104,7 +1156,8 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { verbosePrint("%s() LN%d - command: %s\n", __func__, __LINE__, command); if (code != 0) { if (!quiet) { - errorPrint("Failed to execute %s, reason: %s\n", command, taos_errstr(res)); + errorPrint("Failed to execute %s, reason: %s\n", + command, taos_errstr(res)); } taos_free_result(res); //taos_close(taos); @@ -1320,6 +1373,8 @@ static void init_rand_data() { static int printfInsertMeta() { SHOW_PARSE_RESULT_START(); + printf("interface: \033[33m%s\033[0m\n", + (g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("user: \033[33m%s\033[0m\n", g_Dbs.user); @@ -1331,7 +1386,7 @@ static int printfInsertMeta() { g_Dbs.threadCountByCreateTbl); printf("top insert interval: \033[33m%"PRIu64"\033[0m\n", g_args.insert_interval); - printf("number of records per req: \033[33m%"PRIu64"\033[0m\n", + printf("number of records per req: \033[33m%u\033[0m\n", g_args.num_of_RPR); printf("max sql length: \033[33m%"PRIu64"\033[0m\n", g_args.max_sql_len); @@ -1417,7 +1472,8 @@ static int printfInsertMeta() { if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); - } else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { + } else if (AUTO_CREATE_SUBTBL == + g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "yes"); } else { printf(" autoCreateTable: \033[33m%s\033[0m\n", "error"); @@ -1437,8 +1493,9 @@ static int printfInsertMeta() { g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); - printf(" insertMode: \033[33m%s\033[0m\n", - g_Dbs.db[i].superTbls[j].insertMode); + printf(" iface: \033[33m%s\033[0m\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit); @@ -1456,7 +1513,7 @@ static int printfInsertMeta() { printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } */ - printf(" interlaceRows: \033[33m%"PRIu64"\033[0m\n", + printf(" interlaceRows: \033[33m%u\033[0m\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { @@ -1534,7 +1591,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount); fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl); - fprintf(fp, "number of records per req: %"PRIu64"\n", g_args.num_of_RPR); + fprintf(fp, "number of records per req: %u\n", g_args.num_of_RPR); fprintf(fp, "max sql length: %"PRIu64"\n", g_args.max_sql_len); fprintf(fp, "database count: %d\n", g_Dbs.dbCount); @@ -1626,11 +1683,12 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); - fprintf(fp, " insertMode: %s\n", - g_Dbs.db[i].superTbls[j].insertMode); + fprintf(fp, " iface: %s\n", + (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); - fprintf(fp, " interlace rows: %"PRIu64"\n", + fprintf(fp, " interlace rows: %u\n", g_Dbs.db[i].superTbls[j].interlaceRows); if (g_Dbs.db[i].superTbls[j].interlaceRows > 0) { fprintf(fp, " stable insert interval: %"PRIu64"\n", @@ -1643,7 +1701,7 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " multiThreadWriteOneTbl: yes\n"); } */ - fprintf(fp, " interlaceRows: %"PRIu64"\n", + fprintf(fp, " interlaceRows: %u\n", g_Dbs.db[i].superTbls[j].interlaceRows); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); @@ -2803,7 +2861,7 @@ static int createDatabasesAndStables() { int validStbCount = 0; - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); ret = queryDbExec(taos, command, NO_INSERT_TYPE, true); @@ -2813,7 +2871,7 @@ static int createDatabasesAndStables() { &g_Dbs.db[i].superTbls[j]); if (0 != ret) { - errorPrint("create super table %d failed!\n\n", j); + errorPrint("create super table %"PRIu64" failed!\n\n", j); continue; } } @@ -2841,7 +2899,7 @@ static void* createTable(void *sarg) threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int64_t lastPrintTime = taosGetTimestampMs(); + uint64_t lastPrintTime = taosGetTimestampMs(); int buff_len; buff_len = BUFFER_SIZE / 8; @@ -2915,7 +2973,7 @@ static void* createTable(void *sarg) return NULL; } - int64_t currentPrintTime = taosGetTimestampMs(); + uint64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n", pThreadInfo->threadID, pThreadInfo->start_table_from, i); @@ -2934,11 +2992,11 @@ static void* createTable(void *sarg) } static int startMultiThreadCreateChildTable( - char* cols, int threads, uint64_t startFrom, int64_t ntables, + char* cols, int threads, uint64_t tableFrom, int64_t ntables, char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); - threadInfo *infos = malloc(threads * sizeof(threadInfo)); + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { printf("malloc failed\n"); @@ -2978,10 +3036,10 @@ static int startMultiThreadCreateChildTable( return -1; } - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->use_metric = true; pThreadInfo->cols = cols; pThreadInfo->minDelay = UINT64_MAX; @@ -3007,61 +3065,61 @@ static void createChildTables() { char tblColsBuf[MAX_SQL_SIZE]; int len; - for (int i = 0; i < g_Dbs.dbCount; i++) { - if (g_Dbs.use_metric) { - if (g_Dbs.db[i].superTblCount > 0) { - // with super table - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) - || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { - continue; - } + for (int i = 0; i < g_Dbs.dbCount; i++) { + if (g_Dbs.use_metric) { + if (g_Dbs.db[i].superTblCount > 0) { + // with super table + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) + || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { + continue; + } + verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + uint64_t startFrom = 0; + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; + + verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", + __func__, __LINE__, g_totalChildTables, startFrom); + + startMultiThreadCreateChildTable( + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, + g_Dbs.threadCountByCreateTbl, + startFrom, + g_Dbs.db[i].superTbls[j].childTblCount, + g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + } + } + } else { + // normal table + len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); + for (int j = 0; j < g_args.num_of_CPR; j++) { + if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) + || (strncasecmp(g_args.datatype[j], + "NCHAR", strlen("NCHAR")) == 0)) { + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, + ", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary); + } else { + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, + ", COL%d %s", j, g_args.datatype[j]); + } + len = strlen(tblColsBuf); + } - verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, - g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); - uint64_t startFrom = 0; - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; - - verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", - __func__, __LINE__, g_totalChildTables, startFrom); - startMultiThreadCreateChildTable( - g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, - g_Dbs.threadCountByCreateTbl, - startFrom, - g_Dbs.db[i].superTbls[j].childTblCount, - g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); + snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); + + verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n", + __func__, __LINE__, + g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); + startMultiThreadCreateChildTable( + tblColsBuf, + g_Dbs.threadCountByCreateTbl, + 0, + g_args.num_of_tables, + g_Dbs.db[i].dbName, + NULL); } - } - } else { - // normal table - len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); - for (int j = 0; j < g_args.num_of_CPR; j++) { - if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) - || (strncasecmp(g_args.datatype[j], - "NCHAR", strlen("NCHAR")) == 0)) { - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, - ", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary); - } else { - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, - ", COL%d %s", j, g_args.datatype[j]); - } - len = strlen(tblColsBuf); - } - - snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - - verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n", - __func__, __LINE__, - g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); - startMultiThreadCreateChildTable( - tblColsBuf, - g_Dbs.threadCountByCreateTbl, - 0, - g_args.num_of_tables, - g_Dbs.db[i].dbName, - NULL); } - } } /* @@ -3132,10 +3190,12 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { return 0; } +#if 0 int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { // TODO return 0; } +#endif /* Read 10000 lines at most. If more than 10000 lines, continue to read after using @@ -3520,9 +3580,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // rows per table need be less than insert batch if (g_args.interlace_rows > g_args.num_of_RPR) { - printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: interlace rows value %u > num_of_records_per_req %u\n\n", g_args.interlace_rows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", g_args.num_of_RPR); prompt(); g_args.interlace_rows = g_args.num_of_RPR; @@ -3753,36 +3813,40 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { // dbinfo cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name"); - if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) { + if (!stbName || stbName->type != cJSON_String + || stbName->valuestring == NULL) { errorPrint("%s() LN%d, failed to read json, stb name not found\n", __func__, __LINE__); goto PARSE_OVER; } - tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE); + tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, + MAX_TB_NAME_SIZE); cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix"); if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) { printf("ERROR: failed to read json, childtable_prefix not found\n"); goto PARSE_OVER; } - tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE); + tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, + MAX_DB_NAME_SIZE); - cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); // yes, no, null + cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); if (autoCreateTbl && autoCreateTbl->type == cJSON_String && autoCreateTbl->valuestring != NULL) { - if (0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) { - g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; - } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; - } else { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; - } + if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) + && (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) { + g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; + } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } else { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } } else if (!autoCreateTbl) { - g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; } else { - printf("ERROR: failed to read json, auto_create_table not found\n"); - goto PARSE_OVER; + printf("ERROR: failed to read json, auto_create_table not found\n"); + goto PARSE_OVER; } cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); @@ -3816,6 +3880,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { + g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; + } + cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count"); if (!count || count->type != cJSON_Number || 0 >= count->valueint) { errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n", @@ -3837,15 +3905,24 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON *insertMode = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest - if (insertMode && insertMode->type == cJSON_String - && insertMode->valuestring != NULL) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, - insertMode->valuestring, MAX_DB_NAME_SIZE); - } else if (!insertMode) { - tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE); + cJSON *stbIface = cJSON_GetObjectItem(stbInfo, "insert_mode"); // taosc , rest, stmt + if (stbIface && stbIface->type == cJSON_String + && stbIface->valuestring != NULL) { + if (0 == strcasecmp(stbIface->valuestring, "taosc")) { + g_Dbs.db[i].superTbls[j].iface= TAOSC_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "rest")) { + g_Dbs.db[i].superTbls[j].iface= REST_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "stmt")) { + g_Dbs.db[i].superTbls[j].iface= STMT_IFACE; + } else { + errorPrint("%s() LN%d, failed to read json, insert_mode %s not recognized\n", + __func__, __LINE__, stbIface->valuestring); + goto PARSE_OVER; + } + } else if (!stbIface) { + g_Dbs.db[i].superTbls[j].iface = TAOSC_IFACE; } else { - printf("ERROR: failed to read json, insert_mode not found\n"); + errorPrint("%s", "failed to read json, insert_mode not found\n"); goto PARSE_OVER; } @@ -3864,7 +3941,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset"); if ((childTbl_offset) && (g_Dbs.db[i].drop != true) && (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) { - if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) { + if ((childTbl_offset->type != cJSON_Number) + || (0 > childTbl_offset->valueint)) { printf("ERROR: failed to read json, childtable_offset\n"); goto PARSE_OVER; } @@ -3920,7 +3998,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file"); - if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) { + if ((tagsFile && tagsFile->type == cJSON_String) + && (tagsFile->valuestring != NULL)) { tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile, tagsFile->valuestring, MAX_FILE_NAME_LEN); if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) { @@ -3936,9 +4015,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } - cJSON* maxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len"); - if (maxSqlLen && maxSqlLen->type == cJSON_Number) { - int32_t len = maxSqlLen->valueint; + cJSON* stbMaxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len"); + if (stbMaxSqlLen && stbMaxSqlLen->type == cJSON_Number) { + int32_t len = stbMaxSqlLen->valueint; if (len > TSDB_MAX_ALLOWED_SQL_LEN) { len = TSDB_MAX_ALLOWED_SQL_LEN; } else if (len < 5) { @@ -3948,7 +4027,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } else if (!maxSqlLen) { g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len; } else { - errorPrint("%s() LN%d, failed to read json, maxSqlLen input mistake\n", + errorPrint("%s() LN%d, failed to read json, stbMaxSqlLen input mistake\n", __func__, __LINE__); goto PARSE_OVER; } @@ -3970,24 +4049,25 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } */ - cJSON* interlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); - if (interlaceRows && interlaceRows->type == cJSON_Number) { - if (interlaceRows->valueint < 0) { + cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); + if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) { + if (stbInterlaceRows->valueint < 0) { errorPrint("%s() LN%d, failed to read json, interlace rows input mistake\n", __func__, __LINE__); goto PARSE_OVER; } - g_Dbs.db[i].superTbls[j].interlaceRows = interlaceRows->valueint; + g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint; // rows per table need be less than insert batch if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { - printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n", - i, j, g_Dbs.db[i].superTbls[j].interlaceRows, g_args.num_of_RPR); - printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", + printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n", + i, j, g_Dbs.db[i].superTbls[j].interlaceRows, + g_args.num_of_RPR); + printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", g_args.num_of_RPR); prompt(); g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR; } - } else if (!interlaceRows) { + } else if (!stbInterlaceRows) { g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { errorPrint( @@ -4613,7 +4693,7 @@ static void prepareSampleData() { static void postFreeResource() { tmfclose(g_fpOfInsertResult); for (int i = 0; i < g_Dbs.dbCount; i++) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; @@ -4661,16 +4741,22 @@ static int getRowDataFromSample( return dataLen; } -static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) { +static int64_t generateStbRowData( + SSuperTable* stbInfo, + char* recBuf, int64_t timestamp) +{ int64_t dataLen = 0; char *pstr = recBuf; int64_t maxLen = MAX_DATA_SIZE; - dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ",", timestamp); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, + "(%" PRId64 ",", timestamp); for (int i = 0; i < stbInfo->columnCount; i++) { - if ((0 == strncasecmp(stbInfo->columns[i].dataType, "BINARY", strlen("BINARY"))) - || (0 == strncasecmp(stbInfo->columns[i].dataType, "NCHAR", strlen("NCHAR")))) { + if ((0 == strncasecmp(stbInfo->columns[i].dataType, + "BINARY", strlen("BINARY"))) + || (0 == strncasecmp(stbInfo->columns[i].dataType, + "NCHAR", strlen("NCHAR")))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { errorPrint( "binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); @@ -4686,23 +4772,23 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf); tmfree(buf); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "INT", 3)) { + "INT", strlen("INT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d,", rand_int()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "BIGINT", 6)) { + "BIGINT", strlen("BIGINT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%"PRId64",", rand_bigint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "FLOAT", 5)) { + "FLOAT", strlen("FLOAT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f,", rand_float()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "DOUBLE", 6)) { + "DOUBLE", strlen("DOUBLE"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f,", rand_double()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, - "SMALLINT", 8)) { + "SMALLINT", strlen("SMALLINT"))) { dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d,", rand_smallint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, @@ -4732,46 +4818,38 @@ static int64_t generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stb } static int64_t generateData(char *recBuf, char **data_type, - int num_of_cols, int64_t timestamp, int lenOfBinary) { + int64_t timestamp, int lenOfBinary) { memset(recBuf, 0, MAX_DATA_SIZE); char *pstr = recBuf; pstr += sprintf(pstr, "(%" PRId64, timestamp); - int c = 0; - - for (; c < MAX_NUM_DATATYPE; c++) { - if (data_type[c] == NULL) { - break; - } - } - if (0 == c) { - perror("data type error!"); - exit(-1); - } + int columnCount = g_args.num_of_CPR; - for (int i = 0; i < c; i++) { - if (strcasecmp(data_type[i % c], "TINYINT") == 0) { + for (int i = 0; i < columnCount; i++) { + if (strcasecmp(data_type[i % columnCount], "TINYINT") == 0) { pstr += sprintf(pstr, ",%d", rand_tinyint() ); - } else if (strcasecmp(data_type[i % c], "SMALLINT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "SMALLINT") == 0) { pstr += sprintf(pstr, ",%d", rand_smallint()); - } else if (strcasecmp(data_type[i % c], "INT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "INT") == 0) { pstr += sprintf(pstr, ",%d", rand_int()); - } else if (strcasecmp(data_type[i % c], "BIGINT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BIGINT") == 0) { + pstr += sprintf(pstr, ",%" PRId64, rand_bigint()); + } else if (strcasecmp(data_type[i % columnCount], "TIMESTAMP") == 0) { pstr += sprintf(pstr, ",%" PRId64, rand_bigint()); - } else if (strcasecmp(data_type[i % c], "FLOAT") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "FLOAT") == 0) { pstr += sprintf(pstr, ",%10.4f", rand_float()); - } else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "DOUBLE") == 0) { double t = rand_double(); pstr += sprintf(pstr, ",%20.8f", t); - } else if (strcasecmp(data_type[i % c], "BOOL") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BOOL") == 0) { bool b = rand_bool() & 1; pstr += sprintf(pstr, ",%s", b ? "true" : "false"); - } else if (strcasecmp(data_type[i % c], "BINARY") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) { char *s = malloc(lenOfBinary); rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); free(s); - } else if (strcasecmp(data_type[i % c], "NCHAR") == 0) { + } else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) { char *s = malloc(lenOfBinary); rand_string(s, lenOfBinary); pstr += sprintf(pstr, ",\"%s\"", s); @@ -4818,146 +4896,212 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { return 0; } -static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) +static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { - int affectedRows; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + int32_t affectedRows; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, - __func__, __LINE__, buffer); - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { - affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); - } else if (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest"))) { - if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, - buffer, NULL /* not set result file */)) { - affectedRows = -1; - printf("========restful return fail, threadID[%d]\n", - pThreadInfo->threadID); - } else { - affectedRows = k; - } - } else { - errorPrint("%s() LN%d: unknown insert mode: %s\n", - __func__, __LINE__, superTblInfo->insertMode); - affectedRows = 0; + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, pThreadInfo->buffer); + + uint16_t iface; + if (superTblInfo) + iface = superTblInfo->iface; + else + iface = g_args.iface; + + debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, + (g_args.iface==TAOSC_IFACE)? + "taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); + + switch(iface) { + case TAOSC_IFACE: + affectedRows = queryDbExec( + pThreadInfo->taos, + pThreadInfo->buffer, INSERT_TYPE, false); + break; + + case REST_IFACE: + if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, + pThreadInfo->buffer, NULL /* not set result file */)) { + affectedRows = -1; + printf("========restful return fail, threadID[%d]\n", + pThreadInfo->threadID); + } else { + affectedRows = k; + } + break; + + case STMT_IFACE: + debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt); + if (0 != taos_stmt_execute(pThreadInfo->stmt)) { + errorPrint("%s() LN%d, failied to execute insert statement\n", + __func__, __LINE__); + exit(-1); + } + affectedRows = k; + break; + + default: + errorPrint("%s() LN%d: unknown insert mode: %d\n", + __func__, __LINE__, superTblInfo->iface); + affectedRows = 0; } - } else { - affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false); - } - return affectedRows; + return affectedRows; } static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - if ((superTblInfo) - && (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) { - if (superTblInfo->childTblLimit > 0) { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + - (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + if (superTblInfo) { + if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) { + if (superTblInfo->childTblLimit > 0) { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + } else { + verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, tableSeq); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + } + } else { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", + superTblInfo->childTblPrefix, tableSeq); + } } else { - - verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, - pThreadInfo->start_table_from, - pThreadInfo->ntables, tableSeq); - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", + g_args.tb_prefix, tableSeq); } - } else { - snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"", - g_args.tb_prefix, tableSeq); - } } -static int64_t generateDataTail( - SSuperTable* superTblInfo, - uint64_t batch, char* buffer, int64_t remainderBufLen, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, int64_t *dataLen) { - uint64_t len = 0; - uint32_t ncols_per_record = 1; // count first col ts +static int32_t generateDataTailWithoutStb( + uint32_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t recordFrom, int64_t startTime, + /* int64_t *pSamplePos, */int64_t *dataLen) { + uint64_t len = 0; char *pstr = buffer; - if (superTblInfo == NULL) { - uint32_t datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; + verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); + + int32_t k = 0; + for (k = 0; k < batch;) { + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + int64_t retLen = 0; + + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; + + retLen = generateData(data, data_type, + startTime + getTSRandTail( + (int64_t) DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange), + lenOfBinary); + + if (len > remainderBufLen) + break; + + pstr += sprintf(pstr, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; + + verbosePrint("%s() LN%d len=%"PRIu64" k=%d \nbuffer=%s\n", + __func__, __LINE__, len, k, buffer); + + recordFrom ++; + + if (recordFrom >= insertRows) { + break; } } - verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); + *dataLen = len; + return k; +} + +static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, + int disorderRatio, int disorderRange) +{ + int64_t randTail = timeStampStep * seq; + if (disorderRatio > 0) { + int rand_num = taosRandom() % 100; + if(rand_num < disorderRatio) { + randTail = (randTail + + (taosRandom() % disorderRange + 1)) * (-1); + debugPrint("rand data generated, back %"PRId64"\n", randTail); + } + } + + return randTail; +} + +static int32_t generateStbDataTail( + SSuperTable* superTblInfo, + uint32_t batch, char* buffer, + int64_t remainderBufLen, int64_t insertRows, + uint64_t recordFrom, int64_t startTime, + int64_t *pSamplePos, int64_t *dataLen) { + uint64_t len = 0; + + char *pstr = buffer; bool tsRand; - if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, - "rand", strlen("rand")))) { - tsRand = true; - } else { - tsRand = false; + if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; } + verbosePrint("%s() LN%d batch=%u\n", __func__, __LINE__, batch); - uint64_t k = 0; + int32_t k = 0; for (k = 0; k < batch;) { char data[MAX_DATA_SIZE]; memset(data, 0, MAX_DATA_SIZE); int64_t retLen = 0; - if (superTblInfo) { - if (tsRand) { - retLen = generateRowData( - data, - startTime + getTSRandTail( - superTblInfo->timeStampStep, k, - superTblInfo->disorderRatio, - superTblInfo->disorderRange), - superTblInfo); - } else { - retLen = getRowDataFromSample( - data, - remainderBufLen, - startTime + superTblInfo->timeStampStep * k, - superTblInfo, - pSamplePos); - } - if (retLen > remainderBufLen) { - break; - } - - pstr += snprintf(pstr , retLen + 1, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; - } else { - char **data_type = g_args.datatype; - int lenOfBinary = g_args.len_of_binary; - retLen = generateData(data, data_type, - ncols_per_record, + if (tsRand) { + retLen = generateStbRowData(superTblInfo, data, startTime + getTSRandTail( - DEFAULT_TIMESTAMP_STEP, k, - g_args.disorderRatio, - g_args.disorderRange), - lenOfBinary); - if (len > remainderBufLen) - break; + superTblInfo->timeStampStep, k, + superTblInfo->disorderRatio, + superTblInfo->disorderRange) + ); + } else { + retLen = getRowDataFromSample( + data, + remainderBufLen, + startTime + superTblInfo->timeStampStep * k, + superTblInfo, + pSamplePos); + } - pstr += sprintf(pstr, "%s", data); - k++; - len += retLen; - remainderBufLen -= retLen; + if (retLen > remainderBufLen) { + break; } - verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n", + pstr += snprintf(pstr , retLen + 1, "%s", data); + k++; + len += retLen; + remainderBufLen -= retLen; + + verbosePrint("%s() LN%d len=%"PRIu64" k=%u \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); - startFrom ++; + recordFrom ++; - if (startFrom >= insertRows) { + if (recordFrom >= insertRows) { break; } } @@ -4966,17 +5110,42 @@ static int64_t generateDataTail( return k; } -static int generateSQLHead(char *tableName, int32_t tableSeq, - threadInfo* pThreadInfo, SSuperTable* superTblInfo, + +static int generateSQLHeadWithoutStb(char *tableName, + char *dbName, char *buffer, int remainderBufLen) { int len; -#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. char headBuf[HEAD_BUFF_LEN]; - if (superTblInfo) { - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + len = snprintf( + headBuf, + HEAD_BUFF_LEN, + "%s.%s values", + dbName, + tableName); + + if (len > remainderBufLen) + return -1; + + tstrncpy(buffer, headBuf, len + 1); + + return len; +} + +static int generateStbSQLHead( + SSuperTable* superTblInfo, + char *tableName, int32_t tableSeq, + char *dbName, + char *buffer, int remainderBufLen) +{ + int len; + + char headBuf[HEAD_BUFF_LEN]; + + if ((AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) + && (TBL_ALREADY_EXISTS != superTblInfo->childTblExists)) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo, tableSeq); @@ -4995,9 +5164,9 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s using %s.%s tags %s values", - pThreadInfo->db_name, + dbName, tableName, - pThreadInfo->db_name, + dbName, superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); @@ -5006,22 +5175,14 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } else { len = snprintf( headBuf, HEAD_BUFF_LEN, "%s.%s values", - pThreadInfo->db_name, - tableName); - } - } else { - len = snprintf( - headBuf, - HEAD_BUFF_LEN, - "%s.%s values", - pThreadInfo->db_name, + dbName, tableName); } @@ -5033,8 +5194,11 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, return len; } -static int64_t generateInterlaceDataBuffer( - char *tableName, uint64_t batchPerTbl, uint64_t i, uint64_t batchPerTblTimes, +static int32_t generateStbInterlaceData( + SSuperTable *superTblInfo, + char *tableName, uint32_t batchPerTbl, + uint64_t i, + uint32_t batchPerTblTimes, uint64_t tableSeq, threadInfo *pThreadInfo, char *buffer, int64_t insertRows, @@ -5043,10 +5207,11 @@ static int64_t generateInterlaceDataBuffer( { assert(buffer); char *pstr = buffer; - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, - superTblInfo, pstr, *pRemainderBufLen); + int headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, + pstr, *pRemainderBufLen); if (headLen <= 0) { return 0; @@ -5060,29 +5225,25 @@ static int64_t generateInterlaceDataBuffer( int64_t dataLen = 0; - verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%"PRIu64" batchPerTbl = %"PRIu64"\n", + verbosePrint("[%d] %s() LN%d i=%"PRIu64" batchPerTblTimes=%u batchPerTbl = %u\n", pThreadInfo->threadID, __func__, __LINE__, i, batchPerTblTimes, batchPerTbl); - if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { startTime = taosGetTimestamp(pThreadInfo->time_precision); - } - } else { - startTime = 1500000000000; } - int64_t k = generateDataTail( - superTblInfo, - batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, - startTime, - &(pThreadInfo->samplePos), &dataLen); + int32_t k = generateStbDataTail( + superTblInfo, + batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + &(pThreadInfo->samplePos), &dataLen); if (k == batchPerTbl) { pstr += dataLen; *pRemainderBufLen -= dataLen; } else { - debugPrint("%s() LN%d, generated data tail: %"PRIu64", not equal batch per table: %"PRIu64"\n", + debugPrint("%s() LN%d, generated data tail: %u, not equal batch per table: %u\n", __func__, __LINE__, k, batchPerTbl); pstr -= headLen; pstr[0] = '\0'; @@ -5092,50 +5253,361 @@ static int64_t generateInterlaceDataBuffer( return k; } -static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq, - int disorderRatio, int disorderRange) +static int64_t generateInterlaceDataWithoutStb( + char *tableName, uint32_t batch, + uint64_t tableSeq, + char *dbName, char *buffer, + int64_t insertRows, + int64_t startTime, + uint64_t *pRemainderBufLen) { - int64_t randTail = timeStampStep * seq; - if (disorderRatio > 0) { - int rand_num = taosRandom() % 100; - if(rand_num < disorderRatio) { - randTail = (randTail + - (taosRandom() % disorderRange + 1)) * (-1); - debugPrint("rand data generated, back %"PRId64"\n", randTail); + assert(buffer); + char *pstr = buffer; + + int headLen = generateSQLHeadWithoutStb( + tableName, dbName, + pstr, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen = 0; + + int32_t k = generateDataTailWithoutStb( + batch, pstr, *pRemainderBufLen, insertRows, 0, + startTime, + &dataLen); + + if (k == batch) { + pstr += dataLen; + *pRemainderBufLen -= dataLen; + } else { + debugPrint("%s() LN%d, generated data tail: %d, not equal batch per table: %u\n", + __func__, __LINE__, k, batch); + pstr -= headLen; + pstr[0] = '\0'; + k = 0; + } + + return k; +} + +#if STMT_IFACE_ENABLED == 1 +static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, + char *dataType, int32_t dataLen, char **ptr) +{ + if (0 == strncasecmp(dataType, + "BINARY", strlen("BINARY"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "binary length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; } + char *bind_binary = (char *)*ptr; + rand_string(bind_binary, dataLen); + + bind->buffer_type = TSDB_DATA_TYPE_BINARY; + bind->buffer_length = dataLen; + bind->buffer = bind_binary; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "NCHAR", strlen("NCHAR"))) { + if (dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint( "nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + return -1; + } + char *bind_nchar = (char *)*ptr; + rand_string(bind_nchar, dataLen); + + bind->buffer_type = TSDB_DATA_TYPE_NCHAR; + bind->buffer_length = strlen(bind_nchar); + bind->buffer = bind_nchar; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "INT", strlen("INT"))) { + int32_t *bind_int = (int32_t *)*ptr; + + *bind_int = rand_int(); + bind->buffer_type = TSDB_DATA_TYPE_INT; + bind->buffer_length = sizeof(int32_t); + bind->buffer = bind_int; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "BIGINT", strlen("BIGINT"))) { + int64_t *bind_bigint = (int64_t *)*ptr; + + *bind_bigint = rand_bigint(); + bind->buffer_type = TSDB_DATA_TYPE_BIGINT; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_bigint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "FLOAT", strlen("FLOAT"))) { + float *bind_float = (float *) *ptr; + + *bind_float = rand_float(); + bind->buffer_type = TSDB_DATA_TYPE_FLOAT; + bind->buffer_length = sizeof(float); + bind->buffer = bind_float; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "DOUBLE", strlen("DOUBLE"))) { + double *bind_double = (double *)*ptr; + + *bind_double = rand_double(); + bind->buffer_type = TSDB_DATA_TYPE_DOUBLE; + bind->buffer_length = sizeof(double); + bind->buffer = bind_double; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "SMALLINT", strlen("SMALLINT"))) { + int16_t *bind_smallint = (int16_t *)*ptr; + + *bind_smallint = rand_smallint(); + bind->buffer_type = TSDB_DATA_TYPE_SMALLINT; + bind->buffer_length = sizeof(int16_t); + bind->buffer = bind_smallint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "TINYINT", strlen("TINYINT"))) { + int8_t *bind_tinyint = (int8_t *)*ptr; + + *bind_tinyint = rand_tinyint(); + bind->buffer_type = TSDB_DATA_TYPE_TINYINT; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_tinyint; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "BOOL", strlen("BOOL"))) { + int8_t *bind_bool = (int8_t *)*ptr; + + *bind_bool = rand_bool(); + bind->buffer_type = TSDB_DATA_TYPE_BOOL; + bind->buffer_length = sizeof(int8_t); + bind->buffer = bind_bool; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else if (0 == strncasecmp(dataType, + "TIMESTAMP", strlen("TIMESTAMP"))) { + int64_t *bind_ts2 = (int64_t *) *ptr; + + *bind_ts2 = rand_bigint(); + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts2; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + *ptr += bind->buffer_length; + } else { + errorPrint( "No support data type: %s\n", + dataType); + return -1; } - return randTail; + return 0; } -static int64_t generateProgressiveDataBuffer( +static int32_t prepareStmtWithoutStb( + TAOS_STMT *stmt, char *tableName, - int64_t tableSeq, - threadInfo *pThreadInfo, char *buffer, + uint32_t batch, int64_t insertRows, - uint64_t startFrom, int64_t startTime, int64_t *pSamplePos, - int64_t *pRemainderBufLen) + int64_t recordFrom, + int64_t startTime) { - SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + int ret = taos_stmt_set_tbname(stmt, tableName); + if (ret != 0) { + errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", + tableName, ret, taos_errstr(NULL)); + return ret; + } - int ncols_per_record = 1; // count first col ts + char **data_type = g_args.datatype; - if (superTblInfo == NULL) { - int datatypeSeq = 0; - while(g_args.datatype[datatypeSeq]) { - datatypeSeq ++; - ncols_per_record ++; + char *bindArray = malloc(sizeof(TAOS_BIND) * (g_args.num_of_CPR + 1)); + if (bindArray == NULL) { + errorPrint("Failed to allocate %d bind params\n", + (g_args.num_of_CPR + 1)); + return -1; + } + + int32_t k = 0; + for (k = 0; k < batch;) { + /* columnCount + 1 (ts) */ + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + char *ptr = data; + TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); + + int64_t *bind_ts; + + bind_ts = (int64_t *)ptr; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + *bind_ts = startTime + getTSRandTail( + (int64_t)DEFAULT_TIMESTAMP_STEP, k, + g_args.disorderRatio, + g_args.disorderRange); + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + ptr += bind->buffer_length; + + for (int i = 0; i < g_args.num_of_CPR; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); + if ( -1 == prepareStmtBindArrayByType( + bind, + data_type[i], + g_args.len_of_binary, + &ptr)) { + return -1; + } + } + taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); + // if msg > 3MB, break + taos_stmt_add_batch(stmt); + + k++; + recordFrom ++; + if (recordFrom >= insertRows) { + break; + } + } + + return k; +} + +static int32_t prepareStbStmt(SSuperTable *stbInfo, + TAOS_STMT *stmt, + char *tableName, uint32_t batch, + uint64_t insertRows, + uint64_t recordFrom, + int64_t startTime, char *buffer) +{ + int ret = taos_stmt_set_tbname(stmt, tableName); + if (ret != 0) { + errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n", + tableName, ret, taos_errstr(NULL)); + return ret; + } + + char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + if (bindArray == NULL) { + errorPrint("Failed to allocate %d bind params\n", + (stbInfo->columnCount + 1)); + return -1; + } + + bool tsRand; + if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) { + tsRand = true; + } else { + tsRand = false; } - } + uint32_t k; + for (k = 0; k < batch;) { + /* columnCount + 1 (ts) */ + char data[MAX_DATA_SIZE]; + memset(data, 0, MAX_DATA_SIZE); + + char *ptr = data; + TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); + + int64_t *bind_ts; + + bind_ts = (int64_t *)ptr; + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + if (tsRand) { + *bind_ts = startTime + getTSRandTail( + stbInfo->timeStampStep, k, + stbInfo->disorderRatio, + stbInfo->disorderRange); + } else { + *bind_ts = startTime + stbInfo->timeStampStep * k; + } + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; + + ptr += bind->buffer_length; + + for (int i = 0; i < stbInfo->columnCount; i ++) { + bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); + if ( -1 == prepareStmtBindArrayByType( + bind, + stbInfo->columns[i].dataType, + stbInfo->columns[i].dataLen, + &ptr)) { + return -1; + } + } + taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); + // if msg > 3MB, break + taos_stmt_add_batch(stmt); + + k++; + recordFrom ++; + if (recordFrom >= insertRows) { + break; + } + } + + return k; +} +#endif + +static int32_t generateStbProgressiveData( + SSuperTable *superTblInfo, + char *tableName, + int64_t tableSeq, + char *dbName, char *buffer, + int64_t insertRows, + uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos, + int64_t *pRemainderBufLen) +{ assert(buffer != NULL); char *pstr = buffer; - int64_t k = 0; - memset(buffer, 0, *pRemainderBufLen); - int64_t headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, + int64_t headLen = generateStbSQLHead( + superTblInfo, + tableName, tableSeq, dbName, buffer, *pRemainderBufLen); if (headLen <= 0) { @@ -5145,29 +5617,64 @@ static int64_t generateProgressiveDataBuffer( *pRemainderBufLen -= headLen; int64_t dataLen; - k = generateDataTail(superTblInfo, - g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom, + + return generateStbDataTail(superTblInfo, + g_args.num_of_RPR, pstr, *pRemainderBufLen, + insertRows, recordFrom, startTime, pSamplePos, &dataLen); +} - return k; +static int32_t generateProgressiveDataWithoutStb( + char *tableName, + /* int64_t tableSeq, */ + threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */ + int64_t *pRemainderBufLen) +{ + assert(buffer != NULL); + char *pstr = buffer; + + memset(buffer, 0, *pRemainderBufLen); + + int64_t headLen = generateSQLHeadWithoutStb( + tableName, pThreadInfo->db_name, + buffer, *pRemainderBufLen); + + if (headLen <= 0) { + return 0; + } + pstr += headLen; + *pRemainderBufLen -= headLen; + + int64_t dataLen; + + return generateDataTailWithoutStb( + g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom, + startTime, + /*pSamplePos, */&dataLen); } static void printStatPerThread(threadInfo *pThreadInfo) { - fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n", + fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n", pThreadInfo->threadID, pThreadInfo->totalInsertRows, pThreadInfo->totalAffectedRows, (double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0))); } +// sync write interlace data static void* syncWriteInterlace(threadInfo *pThreadInfo) { debugPrint("[%d] %s() LN%d: ### interlace write\n", pThreadInfo->threadID, __func__, __LINE__); int64_t insertRows; - uint64_t interlaceRows; + uint32_t interlaceRows; + uint64_t maxSqlLen; + int64_t nTimeStampStep; + uint64_t insert_interval; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; @@ -5180,45 +5687,48 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } else { interlaceRows = superTblInfo->interlaceRows; } + maxSqlLen = superTblInfo->maxSqlLen; + nTimeStampStep = superTblInfo->timeStampStep; + insert_interval = superTblInfo->insertInterval; } else { insertRows = g_args.num_of_DPT; interlaceRows = g_args.interlace_rows; + maxSqlLen = g_args.max_sql_len; + nTimeStampStep = DEFAULT_TIMESTAMP_STEP; + insert_interval = g_args.insert_interval; } + debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); + if (interlaceRows > insertRows) interlaceRows = insertRows; if (interlaceRows > g_args.num_of_RPR) interlaceRows = g_args.num_of_RPR; - int insertMode; + uint32_t batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; - if (interlaceRows > 0) { - insertMode = INTERLACE_INSERT_MODE; + if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { + batchPerTblTimes = + g_args.num_of_RPR / interlaceRows; } else { - insertMode = PROGRESSIVE_INSERT_MODE; + batchPerTblTimes = 1; } - // TODO: prompt tbl count multple interlace rows and batch - // - - uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; - char* buffer = calloc(maxSqlLen, 1); - if (NULL == buffer) { + pThreadInfo->buffer = calloc(maxSqlLen, 1); + if (NULL == pThreadInfo->buffer) { errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n", __func__, __LINE__, maxSqlLen, strerror(errno)); return NULL; } - char tableName[TSDB_TABLE_NAME_LEN]; - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; - int64_t nTimeStampStep = superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; - - uint64_t insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; uint64_t st = 0; uint64_t et = UINT64_MAX; @@ -5227,69 +5737,98 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { uint64_t endTs; uint64_t tableSeq = pThreadInfo->start_table_from; - - debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", - pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from, - pThreadInfo->ntables, insertRows); - int64_t startTime = pThreadInfo->start_time; - uint64_t batchPerTbl = interlaceRows; - uint64_t batchPerTblTimes; - - if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { - batchPerTblTimes = - g_args.num_of_RPR / interlaceRows; - } else { - batchPerTblTimes = 1; - } - uint64_t generatedRecPerTbl = 0; bool flagSleep = true; uint64_t sleepTimeTotal = 0; - char *strInsertInto = "insert into "; - int nInsertBufLen = strlen(strInsertInto); - while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampMs(); flagSleep = false; } // generate data - memset(buffer, 0, maxSqlLen); + memset(pThreadInfo->buffer, 0, maxSqlLen); uint64_t remainderBufLen = maxSqlLen; - char *pstr = buffer; + char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; - uint64_t recOfBatch = 0; + uint32_t recOfBatch = 0; + + for (uint32_t i = 0; i < batchPerTblTimes; i ++) { + char tableName[TSDB_TABLE_NAME_LEN]; - for (uint64_t i = 0; i < batchPerTblTimes; i ++) { getTableName(tableName, pThreadInfo, tableSeq); if (0 == strlen(tableName)) { errorPrint("[%d] %s() LN%d, getTableName return null\n", pThreadInfo->threadID, __func__, __LINE__); - free(buffer); + free(pThreadInfo->buffer); return NULL; } uint64_t oldRemainderLen = remainderBufLen; - int64_t generated = generateInterlaceDataBuffer( - tableName, batchPerTbl, i, batchPerTblTimes, - tableSeq, - pThreadInfo, pstr, - insertRows, - startTime, - &remainderBufLen); - - debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + + int32_t generated; + if (superTblInfo) { + if (superTblInfo->iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStbStmt(superTblInfo, + pThreadInfo->stmt, + tableName, + batchPerTbl, + insertRows, i, + startTime, + pThreadInfo->buffer); +#else + generated = -1; +#endif + } else { + generated = generateStbInterlaceData( + superTblInfo, + tableName, batchPerTbl, i, + batchPerTblTimes, + tableSeq, + pThreadInfo, pstr, + insertRows, + startTime, + &remainderBufLen); + } + } else { + if (g_args.iface == STMT_IFACE) { + debugPrint("[%d] %s() LN%d, tableName:%s, batch:%d startTime:%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, + tableName, batchPerTbl, startTime); +#if STMT_IFACE_ENABLED == 1 + generated = prepareStmtWithoutStb( + pThreadInfo->stmt, tableName, + batchPerTbl, + insertRows, i, + startTime); +#else + generated = -1; +#endif + } else { + generated = generateInterlaceDataWithoutStb( + tableName, batchPerTbl, + tableSeq, + pThreadInfo->db_name, pstr, + insertRows, + startTime, + &remainderBufLen); + } + } + + debugPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); if (generated < 0) { - errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n", + errorPrint("[%d] %s() LN%d, generated records is %d\n", pThreadInfo->threadID, __func__, __LINE__, generated); goto free_of_interlace; } else if (generated == 0) { @@ -5298,15 +5837,15 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { tableSeq ++; recOfBatch += batchPerTbl; + pstr += (oldRemainderLen - remainderBufLen); -// startTime += batchPerTbl * superTblInfo->timeStampStep; pThreadInfo->totalInsertRows += batchPerTbl; - verbosePrint("[%d] %s() LN%d batchPerTbl=%"PRId64" recOfBatch=%"PRId64"\n", + + verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", pThreadInfo->threadID, __func__, __LINE__, batchPerTbl, recOfBatch); - if (insertMode == INTERLACE_INSERT_MODE) { - if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { // turn to first table tableSeq = pThreadInfo->start_table_from; generatedRecPerTbl += batchPerTbl; @@ -5318,13 +5857,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { if (generatedRecPerTbl >= insertRows) break; - int remainRows = insertRows - generatedRecPerTbl; + int64_t remainRows = insertRows - generatedRecPerTbl; if ((remainRows > 0) && (batchPerTbl > remainRows)) batchPerTbl = remainRows; if (pThreadInfo->ntables * batchPerTbl < g_args.num_of_RPR) break; - } } verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", @@ -5335,22 +5873,22 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { break; } - verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n", + verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->totalInsertRows); verbosePrint("[%d] %s() LN%d, buffer=%s\n", - pThreadInfo->threadID, __func__, __LINE__, buffer); + pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); startTs = taosGetTimestampMs(); if (recOfBatch == 0) { - errorPrint("[%d] %s() LN%d try inserting records of batch is %"PRIu64"\n", + errorPrint("[%d] %s() LN%d try inserting records of batch is %d\n", pThreadInfo->threadID, __func__, __LINE__, recOfBatch); errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n"); goto free_of_interlace; } - int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); + int64_t affectedRows = execInsert(pThreadInfo, recOfBatch); endTs = taosGetTimestampMs(); uint64_t delay = endTs - startTs; @@ -5366,9 +5904,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->totalDelay += delay; if (recOfBatch != affectedRows) { - errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n", + errorPrint("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n", pThreadInfo->threadID, __func__, __LINE__, - recOfBatch, affectedRows, buffer); + recOfBatch, affectedRows, pThreadInfo->buffer); goto free_of_interlace; } @@ -5387,8 +5925,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { et = taosGetTimestampMs(); if (insert_interval > (et - st) ) { - int sleepTime = insert_interval - (et -st); - performancePrint("%s() LN%d sleep: %d ms for insert interval\n", + uint64_t sleepTime = insert_interval - (et -st); + performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n", __func__, __LINE__, sleepTime); taosMsleep(sleepTime); // ms sleepTimeTotal += insert_interval; @@ -5397,27 +5935,26 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } free_of_interlace: - tmfree(buffer); + tmfree(pThreadInfo->buffer); printStatPerThread(pThreadInfo); return NULL; } -// sync insertion -/* - 1 thread: 100 tables * 2000 rows/s - 1 thread: 10 tables * 20000 rows/s - 6 thread: 300 tables * 2000 rows/s - - 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s -*/ +// sync insertion progressive data static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); SSuperTable* superTblInfo = pThreadInfo->superTblInfo; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + int64_t timeStampStep = + superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; + int64_t insertRows = + (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", + __func__, __LINE__, insertRows); - char* buffer = calloc(maxSqlLen, 1); - if (NULL == buffer) { + pThreadInfo->buffer = calloc(maxSqlLen, 1); + if (NULL == pThreadInfo->buffer) { errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n", maxSqlLen, strerror(errno)); @@ -5428,35 +5965,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { uint64_t startTs = taosGetTimestampMs(); uint64_t endTs; - int64_t timeStampStep = - superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; -/* int insert_interval = - superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - uint64_t st = 0; - uint64_t et = 0xffffffff; - */ - pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; pThreadInfo->samplePos = 0; - for (uint64_t tableSeq = - pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to; - tableSeq ++) { + for (uint64_t tableSeq = pThreadInfo->start_table_from; + tableSeq <= pThreadInfo->end_table_to; + tableSeq ++) { int64_t start_time = pThreadInfo->start_time; - int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - for (uint64_t i = 0; i < insertRows;) { - /* - if (insert_interval) { - st = taosGetTimestampMs(); - } - */ - char tableName[TSDB_TABLE_NAME_LEN]; getTableName(tableName, pThreadInfo, tableSeq); verbosePrint("%s() LN%d: tid=%d seq=%"PRId64" tableName=%s\n", @@ -5464,19 +5983,57 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->threadID, tableSeq, tableName); int64_t remainderBufLen = maxSqlLen; - char *pstr = buffer; - int nInsertBufLen = strlen("insert into "); + char *pstr = pThreadInfo->buffer; - int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into "); + int len = snprintf(pstr, + strlen(STR_INSERT_INTO) + 1, "%s", STR_INSERT_INTO); pstr += len; remainderBufLen -= len; - int64_t generated = generateProgressiveDataBuffer( - tableName, tableSeq, pThreadInfo, pstr, insertRows, - i, start_time, - &(pThreadInfo->samplePos), - &remainderBufLen); + int32_t generated; + if (superTblInfo) { + if (superTblInfo->iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStbStmt( + superTblInfo, + pThreadInfo->stmt, + tableName, + g_args.num_of_RPR, + insertRows, i, start_time, pstr); +#else + generated = -1; +#endif + } else { + generated = generateStbProgressiveData( + superTblInfo, + tableName, tableSeq, pThreadInfo->db_name, pstr, + insertRows, i, start_time, + &(pThreadInfo->samplePos), + &remainderBufLen); + } + } else { + if (g_args.iface == STMT_IFACE) { +#if STMT_IFACE_ENABLED == 1 + generated = prepareStmtWithoutStb( + pThreadInfo->stmt, + tableName, + g_args.num_of_RPR, + insertRows, i, + start_time); +#else + generated = -1; +#endif + } else { + generated = generateProgressiveDataWithoutStb( + tableName, + /* tableSeq, */ + pThreadInfo, pstr, insertRows, + i, start_time, + /* &(pThreadInfo->samplePos), */ + &remainderBufLen); + } + } if (generated > 0) i += generated; else @@ -5487,13 +6044,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { startTs = taosGetTimestampMs(); - int64_t affectedRows = execInsert(pThreadInfo, buffer, generated); + int32_t affectedRows = execInsert(pThreadInfo, generated); endTs = taosGetTimestampMs(); uint64_t delay = endTs - startTs; performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", __func__, __LINE__, delay); - verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", + verbosePrint("[%d] %s() LN%d affectedRows=%d\n", pThreadInfo->threadID, __func__, __LINE__, affectedRows); @@ -5503,7 +6060,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->totalDelay += delay; if (affectedRows < 0) { - errorPrint("%s() LN%d, affected rows: %"PRId64"\n", + errorPrint("%s() LN%d, affected rows: %d\n", __func__, __LINE__, affectedRows); goto free_of_progressive; } @@ -5521,32 +6078,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { if (i >= insertRows) break; -/* - if (insert_interval) { - et = taosGetTimestampMs(); - - if (insert_interval > ((et - st)) ) { - int sleep_time = insert_interval - (et -st); - performancePrint("%s() LN%d sleep: %d ms for insert interval\n", - __func__, __LINE__, sleep_time); - taosMsleep(sleep_time); // ms - } - } - */ } // num_of_DPT - if (g_args.verbose_print) { - if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && + if ((g_args.verbose_print) && + (tableSeq == pThreadInfo->ntables - 1) && (superTblInfo) && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { verbosePrint("%s() LN%d samplePos=%"PRId64"\n", __func__, __LINE__, pThreadInfo->samplePos); - } } } // tableSeq free_of_progressive: - tmfree(buffer); + tmfree(pThreadInfo->buffer); printStatPerThread(pThreadInfo); return NULL; } @@ -5556,7 +6100,7 @@ static void* syncWrite(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; SSuperTable* superTblInfo = pThreadInfo->superTblInfo; - int interlaceRows; + uint32_t interlaceRows; if (superTblInfo) { if ((superTblInfo->interlaceRows == 0) @@ -5576,7 +6120,6 @@ static void* syncWrite(void *sarg) { // progressive mode return syncWriteProgressive(pThreadInfo); } - } static void callBack(void *param, TAOS_RES *res, int code) { @@ -5616,9 +6159,10 @@ static void callBack(void *param, TAOS_RES *res, int code) { && rand_num < pThreadInfo->superTblInfo->disorderRatio) { int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); - generateRowData(data, d, pThreadInfo->superTblInfo); + generateStbRowData(pThreadInfo->superTblInfo, data, d); } else { - generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo); + generateStbRowData(pThreadInfo->superTblInfo, + data, pThreadInfo->lastTs += 1000); } pstr += sprintf(pstr, "%s", data); pThreadInfo->counter++; @@ -5686,24 +6230,6 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * static void startMultiThreadInsertData(int threads, char* db_name, char* precision,SSuperTable* superTblInfo) { - pthread_t *pids = malloc(threads * sizeof(pthread_t)); - assert(pids != NULL); - - threadInfo *infos = malloc(threads * sizeof(threadInfo)); - assert(infos != NULL); - - memset(pids, 0, threads * sizeof(pthread_t)); - memset(infos, 0, threads * sizeof(threadInfo)); - - //TAOS* taos; - //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { - // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - // if (NULL == taos) { - // printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); - // exit(-1); - // } - //} - int32_t timePrec = TSDB_TIME_PRECISION_MILLI; if (0 != precision[0]) { if (0 == strncasecmp(precision, "ms", 2)) { @@ -5755,17 +6281,17 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } - TAOS* taos = taos_connect( + TAOS* taos0 = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); - if (NULL == taos) { + if (NULL == taos0) { errorPrint("%s() LN%d, connect to server fail , reason: %s\n", __func__, __LINE__, taos_errstr(NULL)); exit(-1); } int64_t ntables = 0; - uint64_t startFrom; + uint64_t tableFrom; if (superTblInfo) { int64_t limit; @@ -5792,7 +6318,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, } ntables = limit; - startFrom = offset; + tableFrom = offset; if ((superTblInfo->childTblExists != TBL_NO_EXISTS) && ((superTblInfo->childTblOffset + superTblInfo->childTblLimit ) @@ -5811,23 +6337,23 @@ static void startMultiThreadInsertData(int threads, char* db_name, limit * TSDB_TABLE_NAME_LEN); if (superTblInfo->childTblName == NULL) { errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); - taos_close(taos); + taos_close(taos0); exit(-1); } int64_t childTblCount; getChildNameOfSuperTableWithLimitAndOffset( - taos, + taos0, db_name, superTblInfo->sTblName, &superTblInfo->childTblName, &childTblCount, limit, offset); } else { ntables = g_args.num_of_tables; - startFrom = 0; + tableFrom = 0; } - taos_close(taos); + taos_close(taos0); int64_t a = ntables / threads; if (a < 1) { @@ -5841,11 +6367,22 @@ static void startMultiThreadInsertData(int threads, char* db_name, } if ((superTblInfo) - && (0 == strncasecmp(superTblInfo->insertMode, "rest", strlen("rest")))) { - if (convertHostToServAddr(g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) - exit(-1); + && (superTblInfo->iface == REST_IFACE)) { + if (convertHostToServAddr( + g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) { + exit(-1); + } } + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + assert(pids != NULL); + + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + assert(infos != NULL); + + memset(pids, 0, threads * sizeof(pthread_t)); + memset(infos, 0, threads * sizeof(threadInfo)); + for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; pThreadInfo->threadID = i; @@ -5857,17 +6394,59 @@ static void startMultiThreadInsertData(int threads, char* db_name, pThreadInfo->minDelay = UINT64_MAX; if ((NULL == superTblInfo) || - (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { - //pThreadInfo->taos = taos; + (superTblInfo->iface != REST_IFACE)) { + //t_info->taos = taos; pThreadInfo->taos = taos_connect( g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == pThreadInfo->taos) { errorPrint( - "connect to server fail from insert sub thread, reason: %s\n", + "%s() LN%d, connect to server fail from insert sub thread, reason: %s\n", + __func__, __LINE__, taos_errstr(NULL)); + free(infos); exit(-1); } + + if ((g_args.iface == STMT_IFACE) + || ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) { + + int columnCount; + if (superTblInfo) { + columnCount = superTblInfo->columnCount; + } else { + columnCount = g_args.num_of_CPR; + } + + pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); + if (NULL == pThreadInfo->stmt) { + errorPrint( + "%s() LN%d, failed init stmt, reason: %s\n", + __func__, __LINE__, + taos_errstr(NULL)); + free(pids); + free(infos); + exit(-1); + } + + char buffer[3000]; + char *pstr = buffer; + pstr += sprintf(pstr, "INSERT INTO ? values(?"); + + for (int col = 0; col < columnCount; col ++) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ")"); + + int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); + if (ret != 0){ + errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", + ret, taos_errstr(NULL)); + free(pids); + free(infos); + exit(-1); + } + } } else { pThreadInfo->taos = NULL; } @@ -5875,10 +6454,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, /* if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) { */ - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; /* } else { pThreadInfo->start_table_from = 0; pThreadInfo->ntables = superTblInfo->childTblCount; @@ -5906,6 +6485,11 @@ static void startMultiThreadInsertData(int threads, char* db_name, for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; + tsem_destroy(&(pThreadInfo->lock_sem)); + + if (pThreadInfo->stmt) { + taos_stmt_close(pThreadInfo->stmt); + } tsem_destroy(&(pThreadInfo->lock_sem)); taos_close(pThreadInfo->taos); @@ -6182,13 +6766,12 @@ static int insertTestProcess() { } } - taosMsleep(1000); // create sub threads for inserting data //start = taosGetTimestampMs(); for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.use_metric) { if (g_Dbs.db[i].superTblCount > 0) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; @@ -6512,15 +7095,15 @@ static int queryTestProcess() { b = ntables % threads; } - uint64_t startFrom = 0; + uint64_t tableFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infosOfSub + i; pThreadInfo->threadID = i; - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo); } @@ -6826,11 +7409,14 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume( g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]); if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) { - if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] + != 0) { sprintf(pThreadInfo->filePath, "%s-%d", g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo); + fetchResult( + g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], + pThreadInfo); } g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; @@ -6843,16 +7429,17 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl( - SPECIFIED_CLASS, - pThreadInfo, - g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], - g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], - g_queryInfo.specifiedQueryInfo.subscribeRestart, - g_queryInfo.specifiedQueryInfo.subscribeInterval); + g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = + subscribeImpl( + SPECIFIED_CLASS, + pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { - taos_close(pThreadInfo->taos); - return NULL; + taos_close(pThreadInfo->taos); + return NULL; } } } @@ -6975,17 +7562,17 @@ static int subscribeTestProcess() { } for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - uint64_t startFrom = 0; + uint64_t tableFrom = 0; for (int j = 0; j < threads; j++) { uint64_t seq = i * threads + j; threadInfo *pThreadInfo = infosOfStable + seq; pThreadInfo->threadID = seq; pThreadInfo->querySeq = i; - pThreadInfo->start_table_from = startFrom; + pThreadInfo->start_table_from = tableFrom; pThreadInfo->ntables = jend_table_to = jend_table_to + 1; + pThreadInfo->end_table_to = jend_table_to + 1; pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection; pthread_create(pidsOfStable + seq, NULL, superSubscribe, pThreadInfo); @@ -7065,7 +7652,7 @@ static void setParaFromArg(){ g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.dbCount = 1; - g_Dbs.db[0].drop = 1; + g_Dbs.db[0].drop = true; tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE); g_Dbs.db[0].dbCfg.replica = g_args.replica; @@ -7104,7 +7691,7 @@ static void setParaFromArg(){ tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); - tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE); + g_Dbs.db[0].superTbls[0].iface = g_args.iface; tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; diff --git a/src/os/src/detail/osTime.c b/src/os/src/detail/osTime.c index d9d070218e2a64e5f2535c073db4852b0a8960ef..67e0c2642e42f66229c437e603f7062edf571f34 100644 --- a/src/os/src/detail/osTime.c +++ b/src/os/src/detail/osTime.c @@ -87,12 +87,12 @@ static int32_t (*parseLocaltimeFp[]) (char* timestr, int64_t* time, int32_t time int32_t taosGetTimestampSec() { return (int32_t)time(NULL); } -int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t daylight) { +int32_t taosParseTime(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t day_light) { /* parse datatime string in with tz */ if (strnchr(timestr, 'T', len, false) != NULL) { return parseTimeWithTz(timestr, time, timePrec); } else { - return (*parseLocaltimeFp[daylight])(timestr, time, timePrec); + return (*parseLocaltimeFp[day_light])(timestr, time, timePrec); } } diff --git a/src/vnode/src/vnodeMgmt.c b/src/vnode/src/vnodeMgmt.c index 32f95321383981924c5b6496bd4302edca19da5e..7e6022fc872c3a2221514169ab00874011dc3cb9 100644 --- a/src/vnode/src/vnodeMgmt.c +++ b/src/vnode/src/vnodeMgmt.c @@ -91,18 +91,18 @@ static void vnodeIncRef(void *ptNode) { } void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = NULL; + SVnodeObj *pVnode = NULL; if (tsVnodesHash != NULL) { - ppVnode = taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + taosHashGetClone(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, &pVnode, sizeof(void *)); } - if (ppVnode == NULL || *ppVnode == NULL) { + if (pVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; vDebug("vgId:%d, not exist", vgId); return NULL; } - return *ppVnode; + return pVnode; } void vnodeRelease(void *vparam) { diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index f120bba5366e83eed86e289c6c80fe3b6819c273..24ae0ad55766d22d11e556fa99e8b92f9028e5e9 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -332,5 +332,5 @@ python3 ./test.py -f tag_lite/alter_tag.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestInsertWithJson.py python3 test.py -f tools/taosdemoAllTest/taosdemoTestQueryWithJson.py - +python3 ./test.py -f tag_lite/drop_auto_create.py #======================p4-end=============== diff --git a/tests/pytest/tag_lite/drop_auto_create.py b/tests/pytest/tag_lite/drop_auto_create.py new file mode 100644 index 0000000000000000000000000000000000000000..f89b41008b167884f54cacde3eda11cbc81c0040 --- /dev/null +++ b/tests/pytest/tag_lite/drop_auto_create.py @@ -0,0 +1,47 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + def run(self): + tdSql.prepare() + + tdSql.execute('create table m1(ts timestamp, k int) tags(a binary(12), b int, c double);') + tdSql.execute('insert into tm0 using m1(b,c) tags(1, 99) values(now, 1);') + tdSql.execute('insert into tm1 using m1(b,c) tags(2, 100) values(now, 2);') + tdLog.info("2 rows inserted") + tdSql.query('select * from m1;') + tdSql.checkRows(2) + tdSql.query('select *,tbname from m1;') + tdSql.execute("drop table tm0; ") + tdSql.query('select * from m1') + tdSql.checkRows(1) + + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())