diff --git a/src/kit/taosdemo/insert-interlace.json b/src/kit/taosdemo/insert-interlace.json new file mode 100644 index 0000000000000000000000000000000000000000..b05c1ee3128a27094b6a7870ca478c67c67451e2 --- /dev/null +++ b/src/kit/taosdemo/insert-interlace.json @@ -0,0 +1,58 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 4, + "thread_count_create_tbl": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "insert_interval": 5000, + "rows_per_tbl": 50, + "num_of_records_per_req": 100, + "max_sql_len": 1024000, + "databases": [{ + "dbinfo": { + "name": "db", + "drop": "yes", + "replica": 1, + "days": 10, + "cache": 16, + "blocks": 8, + "precision": "ms", + "keep": 365, + "minRows": 100, + "maxRows": 4096, + "comp":2, + "walLevel":1, + "cachelast":0, + "quorum":1, + "fsync":3000, + "update": 0 + }, + "super_tables": [{ + "name": "stb", + "child_table_exists":"no", + "childtable_count": 10, + "childtable_prefix": "stb_", + "auto_create_table": "no", + "data_source": "rand", + "insert_mode": "taosc", + "insert_rows": 200, + "multi_thread_write_one_tbl": "no", + "rows_per_tbl": 20, + "max_sql_len": 1024000, + "disorder_ratio": 0, + "disorder_range": 1000, + "timestamp_step": 1, + "start_timestamp": "2020-10-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "tags_file": "", + "columns": [{"type": "INT"}, {"type": "DOUBLE", "count":10}, {"type": "BINARY", "len": 16, "count":3}, {"type": "BINARY", "len": 32, "count":6}], + "tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":5}] + }] + }] +} diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index b03f47bfafd7b04c97774c8a8293634db09eb787..846eea5062a72116467fa0c8f8cb443a9da70f00 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -60,9 +60,12 @@ extern char configDir[]; #define QUERY_JSON_NAME "query.json" #define SUBSCRIBE_JSON_NAME "subscribe.json" -#define INSERT_MODE 0 -#define QUERY_MODE 1 -#define SUBSCRIBE_MODE 2 +enum TEST_MODE { + INSERT_TEST, // 0 + QUERY_TEST, // 1 + SUBSCRIBE_TEST, // 2 + INVAID_TEST +}; #define MAX_SQL_SIZE 65536 #define BUFFER_SIZE (65536*2) @@ -109,6 +112,12 @@ enum MODE { MODE_BUT }; +typedef enum enum_INSERT_MODE { + PROGRESSIVE_INSERT_MODE, + INTERLACE_INSERT_MODE, + INVALID_INSERT_MODE +} INSERT_MODE; + enum QUERY_TYPE { NO_INSERT_TYPE, INSERT_TYPE, @@ -148,7 +157,8 @@ enum _show_stables_index { TSDB_SHOW_STABLES_TID_INDEX, TSDB_SHOW_STABLES_VGID_INDEX, TSDB_MAX_SHOW_STABLES -}; +}; + enum _describe_table_index { TSDB_DESCRIBE_METRIC_FIELD_INDEX, TSDB_DESCRIBE_METRIC_TYPE_INDEX, @@ -188,6 +198,7 @@ typedef struct SArguments_S { int num_of_CPR; int num_of_threads; int insert_interval; + int rows_per_tbl; int num_of_RPR; int max_sql_len; int num_of_tables; @@ -197,6 +208,8 @@ typedef struct SArguments_S { int disorderRange; int method_of_delete; char ** arg_list; + int64_t totalInsertRows; + int64_t totalAffectedRows; } SArguments; typedef struct SColumn_S { @@ -379,8 +392,9 @@ typedef struct SThreadInfo_S { char db_name[MAX_DB_NAME_SIZE+1]; char fp[4096]; char tb_prefix[MAX_TB_NAME_SIZE]; - int start_table_id; - int end_table_id; + int start_table_from; + int end_table_to; + int ntables; int data_of_rate; uint64_t start_time; char* cols; @@ -531,6 +545,7 @@ SArguments g_args = { 10, // num_of_CPR 10, // num_of_connections/thread 0, // insert_interval + 0, // rows_per_tbl; 100, // num_of_RPR TSDB_PAYLOAD_SIZE, // max_sql_len 10000, // num_of_tables @@ -553,7 +568,8 @@ static FILE * g_fpOfInsertResult = NULL; do { if (g_args.debug_print || g_args.verbose_print) \ fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0) #define verbosePrint(fmt, ...) \ - do { if (g_args.verbose_print) fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) + do { if (g_args.verbose_print) \ + fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) /////////////////////////////////////////////////// @@ -652,6 +668,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->num_of_threads = atoi(argv[++i]); } else if (strcmp(argv[i], "-i") == 0) { arguments->insert_interval = atoi(argv[++i]); + } else if (strcmp(argv[i], "-B") == 0) { + arguments->rows_per_tbl = atoi(argv[++i]); } else if (strcmp(argv[i], "-r") == 0) { arguments->num_of_RPR = atoi(argv[++i]); } else if (strcmp(argv[i], "-t") == 0) { @@ -1299,33 +1317,45 @@ static void printfInsertMetaToFile(FILE* fp) { fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); - if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) { - fprintf(fp, "column[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); + if ((0 == strncasecmp( + g_Dbs.db[i].superTbls[j].columns[k].dataType, + "binary", strlen("binary"))) + || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, + "nchar", strlen("nchar")))) { + fprintf(fp, "column[%d]:%s(%d) ", k, + g_Dbs.db[i].superTbls[j].columns[k].dataType, + g_Dbs.db[i].superTbls[j].columns[k].dataLen); } else { fprintf(fp, "column[%d]:%s ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType); } } fprintf(fp, "\n"); - - fprintf(fp, " tagCount: %d\n ", g_Dbs.db[i].superTbls[j].tagCount); + + fprintf(fp, " tagCount: %d\n ", + g_Dbs.db[i].superTbls[j].tagCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].tagCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); - if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) { - fprintf(fp, "tag[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); + if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, + "binary", strlen("binary"))) + || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, + "nchar", strlen("nchar")))) { + fprintf(fp, "tag[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType, + g_Dbs.db[i].superTbls[j].tags[k].dataLen); } else { fprintf(fp, "tag[%d]:%s ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType); - } + } } fprintf(fp, "\n"); } fprintf(fp, "\n"); } - SHOW_PARSE_RESULT_END_TO_FILE(fp); + SHOW_PARSE_RESULT_END_TO_FILE(fp); } static void printfQueryMeta() { - SHOW_PARSE_RESULT_START(); - printf("host: \033[33m%s:%u\033[0m\n", g_queryInfo.host, g_queryInfo.port); + SHOW_PARSE_RESULT_START(); + printf("host: \033[33m%s:%u\033[0m\n", + g_queryInfo.host, g_queryInfo.port); printf("user: \033[33m%s\033[0m\n", g_queryInfo.user); printf("password: \033[33m%s\033[0m\n", g_queryInfo.password); printf("database name: \033[33m%s\033[0m\n", g_queryInfo.dbName); @@ -1336,7 +1366,7 @@ static void printfQueryMeta() { printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.concurrent); printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); - if (SUBSCRIBE_MODE == g_args.test_mode) { + if (SUBSCRIBE_TEST == g_args.test_mode) { printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode); printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); @@ -1353,7 +1383,7 @@ static void printfQueryMeta() { printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.childTblCount); printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.subQueryInfo.sTblName); - if (SUBSCRIBE_MODE == g_args.test_mode) { + if (SUBSCRIBE_TEST == g_args.test_mode) { printf("mod: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeMode); printf("interval: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeInterval); printf("restart: \033[33m%d\033[0m\n", g_queryInfo.subQueryInfo.subscribeRestart); @@ -1515,15 +1545,19 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { return -1; } - tstrncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes); - xFormatTimestamp(dbInfos[count]->create_time, *(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX], TSDB_TIME_PRECISION_MILLI); + tstrncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX], + fields[TSDB_SHOW_DB_NAME_INDEX].bytes); + xFormatTimestamp(dbInfos[count]->create_time, + *(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX], + TSDB_TIME_PRECISION_MILLI); dbInfos[count]->ntables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]); dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]); dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]); dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]); dbInfos[count]->days = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]); - tstrncpy(dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX], fields[TSDB_SHOW_DB_KEEP_INDEX].bytes); + tstrncpy(dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX], + fields[TSDB_SHOW_DB_KEEP_INDEX].bytes); dbInfos[count]->cache = *((int32_t *)row[TSDB_SHOW_DB_CACHE_INDEX]); dbInfos[count]->blocks = *((int32_t *)row[TSDB_SHOW_DB_BLOCKS_INDEX]); dbInfos[count]->minrows = *((int32_t *)row[TSDB_SHOW_DB_MINROWS_INDEX]); @@ -1537,8 +1571,9 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) { (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes); dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]); - tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX], fields[TSDB_SHOW_DB_STATUS_INDEX].bytes); - + tstrncpy(dbInfos[count]->status, (char *)row[TSDB_SHOW_DB_STATUS_INDEX], + fields[TSDB_SHOW_DB_STATUS_INDEX].bytes); + count++; if (count > MAX_DATABASE_COUNT) { fprintf(stderr, "The database count overflow than %d\n", MAX_DATABASE_COUNT); @@ -1593,8 +1628,10 @@ static void printfQuerySystemInfo(TAOS * taos) { struct tm* lt; time(&t); lt = localtime(&t); - snprintf(filename, MAX_QUERY_SQL_LENGTH, "querySystemInfo-%d-%d-%d %d:%d:%d", lt->tm_year+1900, lt->tm_mon, lt->tm_mday, lt->tm_hour, lt->tm_min, lt->tm_sec); - + snprintf(filename, MAX_QUERY_SQL_LENGTH, "querySystemInfo-%d-%d-%d %d:%d:%d", + lt->tm_year+1900, lt->tm_mon, lt->tm_mday, lt->tm_hour, lt->tm_min, + lt->tm_sec); + // show variables res = taos_query(taos, "show variables;"); //getResult(res, filename); @@ -1604,7 +1641,7 @@ static void printfQuerySystemInfo(TAOS * taos) { res = taos_query(taos, "show dnodes;"); xDumpResultToFile(filename, res); //getResult(res, filename); - + // show databases res = taos_query(taos, "show databases;"); SDbInfo** dbInfos = (SDbInfo **)calloc(MAX_DATABASE_COUNT, sizeof(SDbInfo *)); @@ -1621,7 +1658,7 @@ static void printfQuerySystemInfo(TAOS * taos) { for (int i = 0; i < dbCount; i++) { // printf database info printfDbInfoForQueryToFile(filename, dbInfos[i], i); - + // show db.vgroups snprintf(buffer, MAX_QUERY_SQL_LENGTH, "show %s.vgroups;", dbInfos[i]->name); res = taos_query(taos, buffer); @@ -1639,9 +1676,9 @@ static void printfQuerySystemInfo(TAOS * taos) { } -void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); } +static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); } -int postProceSql(char* host, uint16_t port, char* sqlstr) +static int postProceSql(char* host, uint16_t port, char* sqlstr) { char *req_fmt = "POST %s HTTP/1.1\r\nHost: %s:%d\r\nAccept: */*\r\nAuthorization: Basic %s\r\nContent-Length: %d\r\nContent-Type: application/x-www-form-urlencoded\r\n\r\n%s"; @@ -2346,7 +2383,8 @@ static int createDatabases() { } printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName); - debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); + debugPrint("%s() %d supertbl count:%d\n", + __func__, __LINE__, g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, @@ -2395,8 +2433,12 @@ static void* createTable(void *sarg) int len = 0; int batchNum = 0; - //printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); - for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { + + verbosePrint("%s() LN%d: Creating table from %d to %d\n", + __func__, __LINE__, + winfo->start_table_from, winfo->end_table_to); + + for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) { if (0 == g_Dbs.use_metric) { snprintf(buffer, buff_len, "create table if not exists %s.%s%d %s;", @@ -2451,7 +2493,7 @@ static void* createTable(void *sarg) int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] already create %d - %d tables\n", - winfo->threadID, winfo->start_table_id, i); + winfo->threadID, winfo->start_table_from, i); lastPrintTime = currentPrintTime; } } @@ -2467,7 +2509,7 @@ static void* createTable(void *sarg) return NULL; } -int startMultiThreadCreateChildTable( +static int startMultiThreadCreateChildTable( char* cols, int threads, int startFrom, int ntables, char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); @@ -2491,7 +2533,6 @@ int startMultiThreadCreateChildTable( int b = 0; b = ntables % threads; - int last = startFrom; for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -2510,9 +2551,11 @@ int startMultiThreadCreateChildTable( free(infos); return -1; } - t_info->start_table_id = last; - t_info->end_table_id = i < b ? last + a : last + a - 1; - last = t_info->end_table_id + 1; + + t_info->start_table_from = startFrom; + t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; + startFrom = t_info->end_table_to + 1; t_info->use_metric = 1; t_info->cols = cols; t_info->minDelay = INT16_MAX; @@ -2599,7 +2642,7 @@ static void createChildTables() { /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ -int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { +static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; @@ -2669,7 +2712,6 @@ int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { return 0; } - /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ @@ -2716,7 +2758,8 @@ static int readSampleFromCsvFileToMem( continue; } - verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen, superTblInfo->lenOfOneRow, getRows); + verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen, + superTblInfo->lenOfOneRow, getRows); memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, line, readLen); @@ -2958,6 +3001,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + cJSON* rowsPerTbl = cJSON_GetObjectItem(root, "rows_per_tbl"); + if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { + g_args.rows_per_tbl = rowsPerTbl->valueint; + } else if (!rowsPerTbl) { + g_args.rows_per_tbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req + } else { + fprintf(stderr, "ERROR: failed to read json, rows_per_tbl input mistake\n"); + goto PARSE_OVER; + } + cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len"); if (maxSqlLen && maxSqlLen->type == cJSON_Number) { g_args.max_sql_len = maxSqlLen->valueint; @@ -3005,7 +3058,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { int dbSize = cJSON_GetArraySize(dbs); if (dbSize > MAX_DB_COUNT) { - printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_DB_COUNT); + fprintf(stderr, + "ERROR: failed to read json, databases size overflow, max database is %d\n", + MAX_DB_COUNT); goto PARSE_OVER; } @@ -3202,7 +3257,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { int stbSize = cJSON_GetArraySize(stables); if (stbSize > MAX_SUPER_TABLE_COUNT) { - printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_SUPER_TABLE_COUNT); + fprintf(stderr, + "ERROR: failed to read json, databases size overflow, max database is %d\n", + MAX_SUPER_TABLE_COUNT); goto PARSE_OVER; } @@ -3429,6 +3486,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n"); goto PARSE_OVER; } + cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "rows_per_tbl"); if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint; @@ -3768,7 +3826,9 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } cJSON* subkeepProgress = cJSON_GetObjectItem(subQuery, "keepProgress"); - if (subkeepProgress && subkeepProgress->type == cJSON_String && subkeepProgress->valuestring != NULL) { + if (subkeepProgress && + subkeepProgress->type == cJSON_String + && subkeepProgress->valuestring != NULL) { if (0 == strcmp("yes", subkeepProgress->valuestring)) { g_queryInfo.subQueryInfo.subscribeKeepProgress = 1; } else if (0 == strcmp("no", subkeepProgress->valuestring)) { @@ -3859,27 +3919,27 @@ static bool getInfoFromJsonFile(char* file) { cJSON* filetype = cJSON_GetObjectItem(root, "filetype"); if (filetype && filetype->type == cJSON_String && filetype->valuestring != NULL) { if (0 == strcasecmp("insert", filetype->valuestring)) { - g_args.test_mode = INSERT_MODE; + g_args.test_mode = INSERT_TEST; } else if (0 == strcasecmp("query", filetype->valuestring)) { - g_args.test_mode = QUERY_MODE; + g_args.test_mode = QUERY_TEST; } else if (0 == strcasecmp("subscribe", filetype->valuestring)) { - g_args.test_mode = SUBSCRIBE_MODE; + g_args.test_mode = SUBSCRIBE_TEST; } else { printf("ERROR: failed to read json, filetype not support\n"); goto PARSE_OVER; } } else if (!filetype) { - g_args.test_mode = INSERT_MODE; + g_args.test_mode = INSERT_TEST; } else { printf("ERROR: failed to read json, filetype not found\n"); goto PARSE_OVER; } - if (INSERT_MODE == g_args.test_mode) { + if (INSERT_TEST == g_args.test_mode) { ret = getMetaFromInsertJsonFile(root); - } else if (QUERY_MODE == g_args.test_mode) { + } else if (QUERY_TEST == g_args.test_mode) { ret = getMetaFromQueryJsonFile(root); - } else if (SUBSCRIBE_MODE == g_args.test_mode) { + } else if (SUBSCRIBE_TEST == g_args.test_mode) { ret = getMetaFromQueryJsonFile(root); } else { printf("ERROR: input json file type error! please input correct file type: insert or query or subscribe\n"); @@ -4125,13 +4185,34 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) return affectedRows; } -static int generateDataBuffer(int32_t tableSeq, - threadInfo *pThreadInfo, char *buffer, - int64_t insertRows, - int64_t startFrom, int64_t startTime, int *pSampleUsePos) +static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq) { SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + if (superTblInfo) { + if ((superTblInfo->childTblOffset >= 0) + && (superTblInfo->childTblLimit > 0)) { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); + } else { + verbosePrint("%s() LN%d: from=%d count=%d seq=%d\n", + __func__, __LINE__, pThreadInfo->start_table_from, + pThreadInfo->ntables, tableSeq); + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + } + } else { + snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%d", + superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq); + } +} + +static int generateDataTail(char *tableName, int32_t tableSeq, + threadInfo* pThreadInfo, + SSuperTable* superTblInfo, + int batch, char* buffer, int64_t insertRows, + int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) { + int len = 0; int ncols_per_record = 1; // count first col ts if (superTblInfo == NULL) { @@ -4142,91 +4223,21 @@ static int generateDataBuffer(int32_t tableSeq, } } - assert(buffer != NULL); - - char *pChildTblName; - - pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1); - if (NULL == pChildTblName) { - fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN); - return -1; - } - - if (superTblInfo && (superTblInfo->childTblOffset >= 0) - && (superTblInfo->childTblLimit > 0)) { - snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s", - superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); - } else { - snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d", - superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq); - } - - memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); - - char *pstr = buffer; - - if (superTblInfo) { - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagVaulesForStb(superTblInfo); - } else { - tagsValBuf = getTagValueFromTagSample( - superTblInfo, - tableSeq % superTblInfo->tagSampleCount); - } - if (NULL == tagsValBuf) { - free(pChildTblName); - fprintf(stderr, "tag buf failed to allocate memory\n"); - return -1; - } - - pstr += snprintf(pstr, - superTblInfo->maxSqlLen, - "insert into %s.%s using %s.%s tags %s values", - pThreadInfo->db_name, - pChildTblName, - pThreadInfo->db_name, - superTblInfo->sTblName, - tagsValBuf); - tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - pstr += snprintf(pstr, - superTblInfo->maxSqlLen, - "insert into %s.%s values", - pThreadInfo->db_name, - superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); - } else { - pstr += snprintf(pstr, - (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), - "insert into %s.%s values", - pThreadInfo->db_name, - pChildTblName); - } - } else { - pstr += snprintf(pstr, - g_args.max_sql_len, - "insert into %s.%s values", - pThreadInfo->db_name, - pChildTblName); - } - - int k; - int len = 0; + verbosePrint("%s() LN%d batch=%d\n", __func__, __LINE__, batch); - verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - for (k = 0; k < g_args.num_of_RPR;) { + int k = 0; + for (k = 0; k < batch;) { if (superTblInfo) { int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( - pstr + len, + buffer + len, superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * startFrom, superTblInfo, - pSampleUsePos); + pSamplePos); } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; @@ -4234,13 +4245,13 @@ static int generateDataBuffer(int32_t tableSeq, && rand_num < superTblInfo->disorderRatio) { int64_t d = startTime - taosRandom() % superTblInfo->disorderRange; retLen = generateRowData( - pstr + len, + buffer + len, superTblInfo->maxSqlLen - len, d, superTblInfo); } else { retLen = generateRowData( - pstr + len, + buffer + len, superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * startFrom, superTblInfo); @@ -4248,7 +4259,6 @@ static int generateDataBuffer(int32_t tableSeq, } if (retLen < 0) { - free(pChildTblName); return -1; } @@ -4277,28 +4287,288 @@ static int generateDataBuffer(int32_t tableSeq, lenOfBinary); } - pstr += sprintf(pstr, " %s", data); - if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long + buffer += sprintf(buffer, " %s", data); + if (strlen(buffer) >= (g_args.max_sql_len - 256)) { // too long k++; break; } } - verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); + verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", + __func__, __LINE__, len, k, buffer); k++; startFrom ++; - if (startFrom >= insertRows) + if (startFrom >= insertRows) { break; + } } - if (pChildTblName) - free(pChildTblName); + *dataLen = len; + return k; +} +static int generateSQLHead(char *tableName, int32_t tableSeq, threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer) +{ + int len; + if (superTblInfo) { + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + char* tagsValBuf = NULL; + if (0 == superTblInfo->tagSource) { + tagsValBuf = generateTagVaulesForStb(superTblInfo); + } else { + tagsValBuf = getTagValueFromTagSample( + superTblInfo, + tableSeq % superTblInfo->tagSampleCount); + } + if (NULL == tagsValBuf) { + fprintf(stderr, "tag buf failed to allocate memory\n"); + return -1; + } + + len = snprintf(buffer, + superTblInfo->maxSqlLen, + "insert into %s.%s using %s.%s tags %s values", + pThreadInfo->db_name, + tableName, + pThreadInfo->db_name, + superTblInfo->sTblName, + tagsValBuf); + tmfree(tagsValBuf); + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + len = snprintf(buffer, + superTblInfo->maxSqlLen, + "insert into %s.%s values", + pThreadInfo->db_name, + superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); + } else { + len = snprintf(buffer, + (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), + "insert into %s.%s values", + pThreadInfo->db_name, + tableName); + } + } else { + len = snprintf(buffer, + g_args.max_sql_len, + "insert into %s.%s values", + pThreadInfo->db_name, + tableName); + } + + return len; +} + +static int generateDataBuffer(char *pTblName, + int32_t tableSeq, + threadInfo *pThreadInfo, char *buffer, + int64_t insertRows, + int64_t startFrom, int64_t startTime, int *pSamplePos) +{ + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + + int ncols_per_record = 1; // count first col ts + + if (superTblInfo == NULL) { + int datatypeSeq = 0; + while(g_args.datatype[datatypeSeq]) { + datatypeSeq ++; + ncols_per_record ++; + } + } + + assert(buffer != NULL); + + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + + char *pstr = buffer; + + int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo, buffer); + pstr += headLen; + + int k; + int dataLen; + k = generateDataTail(pTblName, tableSeq, pThreadInfo, superTblInfo, + g_args.num_of_RPR, pstr, insertRows, startFrom, startTime, + pSamplePos, &dataLen); return k; } +static void* syncWriteInterlace(threadInfo *pThreadInfo) { + printf("### CBD: interlace write\n"); + + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; + + char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); + if (NULL == buffer) { + fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n", + superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, + strerror(errno)); + return NULL; + } + + int insertMode; + char tableName[TSDB_TABLE_NAME_LEN]; + + int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.rows_per_tbl; + + if (rowsPerTbl > 0) { + insertMode = INTERLACE_INSERT_MODE; + } else { + insertMode = PROGRESSIVE_INSERT_MODE; + } + + // rows per table need be less than insert batch + if (rowsPerTbl > g_args.num_of_RPR) + rowsPerTbl = g_args.num_of_RPR; + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + uint64_t st = 0; + uint64_t et = 0xffffffff; + + int64_t lastPrintTime = taosGetTimestampMs(); + int64_t startTs = taosGetTimestampUs(); + int64_t endTs; + + int tableSeq = pThreadInfo->start_table_from; + + debugPrint("%s() LN%d: start_table_from=%d ntables=%d insertRows=%"PRId64"\n", + __func__, __LINE__, pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); + + int64_t startTime = pThreadInfo->start_time; + + int batchPerTblTimes; + int prevBatchPerTbl, lastBatchPerTbl; + + if (pThreadInfo->ntables == 1) { + batchPerTblTimes = 1; + lastBatchPerTbl = rowsPerTbl; + prevBatchPerTbl = rowsPerTbl; + } else if (rowsPerTbl > 0) { + batchPerTblTimes = g_args.num_of_RPR / rowsPerTbl; + lastBatchPerTbl = g_args.num_of_RPR % rowsPerTbl; + + if (lastBatchPerTbl > 0) + batchPerTblTimes += 1; + else + lastBatchPerTbl = rowsPerTbl; + prevBatchPerTbl = rowsPerTbl; + } else { + batchPerTblTimes = 1; + prevBatchPerTbl = g_args.num_of_RPR; + lastBatchPerTbl = g_args.num_of_RPR; + } + + while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { + if (insert_interval) { + st = taosGetTimestampUs(); + } + // generate data + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + + char *pstr = buffer; + int recGenerated = 0; + + for (int i = 0; i < batchPerTblTimes; i ++) { + if (insertMode == INTERLACE_INSERT_MODE) { + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + // turn to first table + tableSeq = pThreadInfo->start_table_from; + } + } + getTableName(tableName, pThreadInfo, tableSeq); + + int headLen; + if (i == 0) { + headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo, pstr); + } else { + headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values", + pThreadInfo->db_name, + tableName); + } + + // generate data buffer + verbosePrint("%s() LN%d i=%d buffer:\n%s\n", + __func__, __LINE__, i, buffer); + + pstr += headLen; + int dataLen = 0; + + int batchPerTbl; + if (i == batchPerTblTimes - 1) { + batchPerTbl = lastBatchPerTbl; + } else { + batchPerTbl = prevBatchPerTbl; + } + + verbosePrint("%s() LN%d batchPerTbl = %d\n", + __func__, __LINE__, batchPerTbl); + int numOfRecGenerated = generateDataTail( + tableName, tableSeq, pThreadInfo, superTblInfo, + batchPerTbl, pstr, insertRows, 0, + startTime + pThreadInfo->totalInsertRows * superTblInfo->timeStampStep, + &(pThreadInfo->samplePos), &dataLen); + verbosePrint("%s() LN%d numOfRecGenerated= %d\n", + __func__, __LINE__, numOfRecGenerated); + pstr += dataLen; + recGenerated += numOfRecGenerated; + + tableSeq ++; + } + verbosePrint("%s() LN%d buffer:\n%s\n", + __func__, __LINE__, buffer); + pThreadInfo->totalInsertRows += recGenerated; + + int affectedRows = execInsert(pThreadInfo, buffer, recGenerated); + if (affectedRows < 0) + goto free_and_statistics_interlace; + + pThreadInfo->totalAffectedRows += affectedRows; + + endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; + + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } + + if (insert_interval) { + et = taosGetTimestampUs(); + + if (insert_interval > ((et - st)/1000) ) { + int sleep_time = insert_interval - (et -st)/1000; + verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", + __func__, __LINE__, sleep_time); + taosMsleep(sleep_time); // ms + } + } + } + +free_and_statistics_interlace: + tmfree(buffer); + + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + return NULL; +} + // sync insertion /* 1 thread: 100 tables * 2000 rows/s @@ -4307,10 +4577,9 @@ static int generateDataBuffer(int32_t tableSeq, 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s */ -static void* syncWrite(void *sarg) { +static void* syncWriteProgressive(threadInfo *pThreadInfo) { - threadInfo *winfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = winfo->superTblInfo; + SSuperTable* superTblInfo = pThreadInfo->superTblInfo; char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { @@ -4324,19 +4593,18 @@ static void* syncWrite(void *sarg) { int64_t startTs = taosGetTimestampUs(); int64_t endTs; - int insert_interval = superTblInfo?superTblInfo->insertInterval: - g_args.insert_interval; + int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; uint64_t st = 0; uint64_t et = 0xffffffff; - winfo->totalInsertRows = 0; - winfo->totalAffectedRows = 0; + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; - winfo->samplePos = 0; + pThreadInfo->samplePos = 0; - for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; + for (uint32_t tableSeq = pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to; tableSeq ++) { - int64_t start_time = winfo->start_time; + int64_t start_time = pThreadInfo->start_time; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); @@ -4346,34 +4614,39 @@ static void* syncWrite(void *sarg) { st = taosGetTimestampUs(); } - int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows, - i, start_time, &(winfo->samplePos)); + char tableName[TSDB_TABLE_NAME_LEN]; + getTableName(tableName, pThreadInfo, tableSeq); + verbosePrint("%s() LN%d: tid=%d seq=%d tableName=%s\n", + __func__, __LINE__, + pThreadInfo->threadID, tableSeq, tableName); + + int generated = generateDataBuffer(tableName, tableSeq, pThreadInfo, buffer, insertRows, + i, start_time, &(pThreadInfo->samplePos)); if (generated > 0) i += generated; else goto free_and_statistics_2; - int affectedRows = execInsert(winfo, buffer, generated); + int affectedRows = execInsert(pThreadInfo, buffer, generated); if (affectedRows < 0) goto free_and_statistics_2; - - winfo->totalInsertRows += generated; - winfo->totalAffectedRows += affectedRows; + pThreadInfo->totalInsertRows += generated; + pThreadInfo->totalAffectedRows += affectedRows; endTs = taosGetTimestampUs(); int64_t delay = endTs - startTs; - if (delay > winfo->maxDelay) winfo->maxDelay = delay; - if (delay < winfo->minDelay) winfo->minDelay = delay; - winfo->cntDelay++; - winfo->totalDelay += delay; + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; int64_t currentPrintTime = taosGetTimestampMs(); if (currentPrintTime - lastPrintTime > 30*1000) { printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", - winfo->threadID, - winfo->totalInsertRows, - winfo->totalAffectedRows); + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); lastPrintTime = currentPrintTime; } @@ -4391,10 +4664,11 @@ static void* syncWrite(void *sarg) { } } // num_of_DPT - if ((tableSeq == winfo->end_table_id) && superTblInfo && + if ((tableSeq == pThreadInfo->ntables - 1) && superTblInfo && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { - printf("%s() LN%d samplePos=%d\n", __func__, __LINE__, winfo->samplePos); + printf("%s() LN%d samplePos=%d\n", + __func__, __LINE__, pThreadInfo->samplePos); } } // tableSeq @@ -4402,12 +4676,28 @@ free_and_statistics_2: tmfree(buffer); printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", - winfo->threadID, - winfo->totalInsertRows, - winfo->totalAffectedRows); + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); return NULL; } +static void* syncWrite(void *sarg) { + + threadInfo *winfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = winfo->superTblInfo; + + int rowsPerTbl = superTblInfo?superTblInfo->rowsPerTbl:g_args.rows_per_tbl; + + if (rowsPerTbl > 0) { + // interlace mode + return syncWriteInterlace(winfo); + } else { + // progressive mode + return syncWriteProgressive(winfo); + } +} + void callBack(void *param, TAOS_RES *res, int code) { threadInfo* winfo = (threadInfo*)param; SSuperTable* superTblInfo = winfo->superTblInfo; @@ -4423,13 +4713,14 @@ void callBack(void *param, TAOS_RES *res, int code) { char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen); char *data = calloc(1, MAX_DATA_SIZE); char *pstr = buffer; - pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id); + pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, + winfo->start_table_from); // if (winfo->counter >= winfo->superTblInfo->insertRows) { if (winfo->counter >= g_args.num_of_RPR) { - winfo->start_table_id++; + winfo->start_table_from++; winfo->counter = 0; } - if (winfo->start_table_id > winfo->end_table_id) { + if (winfo->start_table_from > winfo->end_table_to) { tsem_post(&winfo->lock_sem); free(buffer); free(data); @@ -4553,7 +4844,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, &start_time, strlen(superTblInfo->startTimestamp), timePrec, 0)) { - printf("ERROR to parse time!\n"); + fprintf(stderr, "ERROR to parse time!\n"); exit(-1); } } @@ -4563,12 +4854,12 @@ static void startMultiThreadInsertData(int threads, char* db_name, double start = getCurrentTime(); - int last; + int startFrom; if ((superTblInfo) && (superTblInfo->childTblOffset >= 0)) - last = superTblInfo->childTblOffset; + startFrom = superTblInfo->childTblOffset; else - last = 0; + startFrom = 0; // read sample data from file first if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, @@ -4609,6 +4900,55 @@ static void startMultiThreadInsertData(int threads, char* db_name, taos_close(taos); } + // read sample data from file first + if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample")))) { + if (0 != prepareSampleDataForSTable(superTblInfo)) { + fprintf(stderr, "prepare sample data for stable failed!\n"); + exit(-1); + } + } + + + TAOS* taos = taos_connect( + g_Dbs.host, g_Dbs.user, + g_Dbs.password, db_name, g_Dbs.port); + if (NULL == taos) { + fprintf(stderr, "connect to server fail , reason: %s\n", + taos_errstr(NULL)); + exit(-1); + } + + if (superTblInfo) { + + int limit, offset; + if (superTblInfo && (superTblInfo->childTblOffset >= 0) + && (superTblInfo->childTblLimit > 0)) { + limit = superTblInfo->childTblLimit; + offset = superTblInfo->childTblOffset; + } else { + limit = superTblInfo->childTblCount; + offset = 0; + } + + superTblInfo->childTblName = (char*)calloc(1, + limit * TSDB_TABLE_NAME_LEN); + if (superTblInfo->childTblName == NULL) { + fprintf(stderr, "alloc memory failed!"); + taos_close(taos); + exit(-1); + } + + int childTblCount; + getChildNameOfSuperTableWithLimitAndOffset( + taos, + db_name, superTblInfo->sTblName, + &superTblInfo->childTblName, &childTblCount, + limit, + offset); + } + taos_close(taos); + for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -4625,7 +4965,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { - printf("connect to server fail from insert sub thread, reason: %s\n", + fprintf(stderr, "connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL)); exit(-1); } @@ -4635,12 +4975,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) { - t_info->start_table_id = last; - t_info->end_table_id = i < b ? last + a : last + a - 1; - last = t_info->end_table_id + 1; + t_info->start_table_from = startFrom; + t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; + startFrom = t_info->end_table_to + 1; } else { - t_info->start_table_id = 0; - t_info->end_table_id = superTblInfo->childTblCount - 1; + t_info->start_table_from = 0; + t_info->ntables = superTblInfo->childTblCount; t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint(); } @@ -4671,6 +5012,9 @@ static void startMultiThreadInsertData(int threads, char* db_name, if (superTblInfo) { superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalInsertRows += t_info->totalInsertRows; + } else { + g_args.totalAffectedRows += t_info->totalAffectedRows; + g_args.totalInsertRows += t_info->totalInsertRows; } totalDelay += t_info->totalDelay; @@ -4698,6 +5042,18 @@ static void startMultiThreadInsertData(int threads, char* db_name, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalInsertRows/ t); + } else { + printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n", + t, g_args.totalInsertRows, + g_args.totalAffectedRows, + threads, db_name, + g_args.totalInsertRows / t); + fprintf(g_fpOfInsertResult, + "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n", + t, g_args.totalInsertRows, + g_args.totalAffectedRows, + threads, db_name, + g_args.totalInsertRows / t); } printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", @@ -4720,7 +5076,7 @@ void *readTable(void *sarg) { char *tb_prefix = rinfo->tb_prefix; FILE *fp = fopen(rinfo->fp, "a"); if (NULL == fp) { - printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); + fprintf(stderr, "fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); return NULL; } @@ -4732,7 +5088,7 @@ void *readTable(void *sarg) { num_of_DPT = g_args.num_of_DPT; // } - int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; + int num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4794,7 +5150,7 @@ void *readMetric(void *sarg) { } int num_of_DPT = rinfo->superTblInfo->insertRows; - int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; + int num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4842,7 +5198,8 @@ void *readMetric(void *sarg) { } t = getCurrentTime() - t; - fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n", num_of_tables * num_of_DPT / t, t * 1000); + fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n", + num_of_tables * num_of_DPT / t, t * 1000); printf("select %10s took %.6f second(s)\n\n", aggreFunc[j], t); taos_free_result(pSql); @@ -4956,7 +5313,7 @@ void *superQueryProcess(void *sarg) { while (1) { if (g_queryInfo.superQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) { taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms - //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); + //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); } st = taosGetTimestampUs(); @@ -5020,13 +5377,14 @@ static void *subQueryProcess(void *sarg) { int64_t st = 0; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; while (1) { - if (g_queryInfo.subQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) { + if (g_queryInfo.subQueryInfo.rate + && (et - st) < (int64_t)g_queryInfo.subQueryInfo.rate*1000) { taosMsleep(g_queryInfo.subQueryInfo.rate*1000 - (et - st)); // ms - //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); + //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); } st = taosGetTimestampUs(); - for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { + for (int i = winfo->start_table_from; i <= winfo->end_table_to; i++) { for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i); @@ -5042,8 +5400,8 @@ static void *subQueryProcess(void *sarg) { et = taosGetTimestampUs(); printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", taosGetSelfPthreadId(), - winfo->start_table_id, - winfo->end_table_id, + winfo->start_table_from, + winfo->end_table_to, (double)(et - st)/1000000.0); } return NULL; @@ -5081,7 +5439,8 @@ static int queryTestProcess() { pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from specify table - if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_queryInfo.superQueryInfo.concurrent > 0) { + if (g_queryInfo.superQueryInfo.sqlCount > 0 + && g_queryInfo.superQueryInfo.concurrent > 0) { pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); @@ -5139,14 +5498,15 @@ static int queryTestProcess() { b = ntables % threads; } - int last = 0; + int startFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; - t_info->start_table_id = last; - t_info->end_table_id = i < b ? last + a : last + a - 1; - last = t_info->end_table_id + 1; + t_info->start_table_from = startFrom; + t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; + startFrom = t_info->end_table_to + 1; t_info->taos = taos; pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info); } @@ -5222,7 +5582,7 @@ void *subSubscribeProcess(void *sarg) { do { //if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) { // taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms - // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); + // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); //} //st = taosGetTimestampMs(); @@ -5288,7 +5648,7 @@ void *superSubscribeProcess(void *sarg) { do { //if (g_queryInfo.superQueryInfo.rate && (et - st) < g_queryInfo.superQueryInfo.rate*1000) { // taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms - // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); + // //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); //} //st = taosGetTimestampMs(); @@ -5417,13 +5777,15 @@ static int subscribeTestProcess() { b = ntables % threads; } - int last = 0; + int startFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; - t_info->start_table_id = last; - t_info->end_table_id = i < b ? last + a : last + a - 1; + t_info->start_table_from = startFrom; + t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; + startFrom = t_info->end_table_to + 1; t_info->taos = taos; pthread_create(pidsOfSub + i, NULL, subSubscribeProcess, t_info); } @@ -5665,14 +6027,14 @@ void querySqlFile(TAOS* taos, char* sqlFile) } static void testMetaFile() { - if (INSERT_MODE == g_args.test_mode) { + if (INSERT_TEST == g_args.test_mode) { if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir); insertTestProcess(); - } else if (QUERY_MODE == g_args.test_mode) { + } else if (QUERY_TEST == g_args.test_mode) { if (g_queryInfo.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); queryTestProcess(); - } else if (SUBSCRIBE_MODE == g_args.test_mode) { + } else if (SUBSCRIBE_TEST == g_args.test_mode) { if (g_queryInfo.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); subscribeTestProcess(); @@ -5689,16 +6051,18 @@ static void queryResult() { pthread_t read_id; threadInfo *rInfo = malloc(sizeof(threadInfo)); rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000 - rInfo->start_table_id = 0; + rInfo->start_table_from = 0; //rInfo->do_aggreFunc = g_Dbs.do_aggreFunc; if (g_args.use_metric) { - rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1; + rInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount; + rInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1; rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0]; strcpy(rInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix); } else { - rInfo->end_table_id = g_args.num_of_tables -1; + rInfo->ntables = g_args.num_of_tables; + rInfo->end_table_to = g_args.num_of_tables -1; strcpy(rInfo->tb_prefix, g_args.tb_prefix); } @@ -5729,7 +6093,7 @@ static void queryResult() { static void testCmdLine() { - g_args.test_mode = INSERT_MODE; + g_args.test_mode = INSERT_TEST; insertTestProcess(); if (g_Dbs.insert_only) diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 409d684d0180aefeb519d2387c177f50194391ed..46a1abf12c9f127a94757bc3b64995c68532f201 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -231,12 +231,13 @@ python3 test.py -f query/queryInterval.py python3 test.py -f query/queryFillTest.py # tools -python3 test.py -f tools/lowaTest.py python3 test.py -f tools/taosdemoTest.py python3 test.py -f tools/taosdemoTestWithoutMetric.py +python3 test.py -f tools/taosdemoTestWithJson.py python3 test.py -f tools/taosdemoTestLimitOffset.py python3 test.py -f tools/taosdumpTest.py python3 test.py -f tools/taosdemoTest2.py +python3 test.py -f tools/taosdemoTestSampleData.py # subscribe python3 test.py -f subscribe/singlemeter.py @@ -291,6 +292,5 @@ python3 ./test.py -f insert/boundary2.py python3 ./test.py -f alter/alter_debugFlag.py python3 ./test.py -f query/queryBetweenAnd.py python3 ./test.py -f tag_lite/alter_tag.py -python3 test.py -f tools/taosdemoTestSampleData.py -#======================p4-end=============== \ No newline at end of file +#======================p4-end=============== diff --git a/tests/pytest/tools/insert-interlace.json b/tests/pytest/tools/insert-interlace.json new file mode 100644 index 0000000000000000000000000000000000000000..a2ff2c001cf5aad24d2ecd7d9766f5ee62d6f3a5 --- /dev/null +++ b/tests/pytest/tools/insert-interlace.json @@ -0,0 +1,58 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 4, + "thread_count_create_tbl": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "insert_interval": 5000, + "rows_per_tbl": 50, + "num_of_records_per_req": 100, + "max_sql_len": 1024000, + "databases": [{ + "dbinfo": { + "name": "db", + "drop": "yes", + "replica": 1, + "days": 10, + "cache": 16, + "blocks": 8, + "precision": "ms", + "keep": 365, + "minRows": 100, + "maxRows": 4096, + "comp":2, + "walLevel":1, + "cachelast":0, + "quorum":1, + "fsync":3000, + "update": 0 + }, + "super_tables": [{ + "name": "stb", + "child_table_exists":"no", + "childtable_count": 9, + "childtable_prefix": "stb_", + "auto_create_table": "no", + "data_source": "rand", + "insert_mode": "taosc", + "insert_rows": 250, + "multi_thread_write_one_tbl": "no", + "rows_per_tbl": 80, + "max_sql_len": 1024000, + "disorder_ratio": 0, + "disorder_range": 1000, + "timestamp_step": 1, + "start_timestamp": "2020-10-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "tags_file": "", + "columns": [{"type": "INT"}], + "tags": [{"type": "INT", "count":1}] + }] + }] +} diff --git a/tests/pytest/tools/insert-tblimit-tboffset0.json b/tests/pytest/tools/insert-tblimit-tboffset0.json index 302744cab933297d8455aed9c45210385f1ec56f..7dcb2e052745e545d1693d97ce28350d00745e55 100644 --- a/tests/pytest/tools/insert-tblimit-tboffset0.json +++ b/tests/pytest/tools/insert-tblimit-tboffset0.json @@ -44,7 +44,6 @@ "childtable_offset": 0, "multi_thread_write_one_tbl": "no", "number_of_tbl_in_one_sql": 0, - "rows_per_tbl": 100, "max_sql_len": 1024000, "disorder_ratio": 0, "disorder_range": 1000, diff --git a/tests/pytest/tools/insert-tblimit1-tboffset.json b/tests/pytest/tools/insert-tblimit1-tboffset.json new file mode 100644 index 0000000000000000000000000000000000000000..a33dc22d5d5e2c88c9b6220fbe53390dfcd4f1ce --- /dev/null +++ b/tests/pytest/tools/insert-tblimit1-tboffset.json @@ -0,0 +1,59 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 4, + "thread_count_create_tbl": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "insert_interval": 0, + "num_of_records_per_req": 100, + "max_sql_len": 1024000, + "databases": [{ + "dbinfo": { + "name": "db", + "drop": "yes", + "replica": 1, + "days": 10, + "cache": 16, + "blocks": 8, + "precision": "ms", + "keep": 365, + "minRows": 100, + "maxRows": 4096, + "comp":2, + "walLevel":1, + "cachelast":0, + "quorum":1, + "fsync":3000, + "update": 0 + }, + "super_tables": [{ + "name": "stb", + "child_table_exists":"no", + "childtable_count": 100, + "childtable_prefix": "stb_", + "auto_create_table": "no", + "data_source": "rand", + "insert_mode": "taosc", + "insert_rows": 1000, + "childtable_limit": 1, + "childtable_offset": 50, + "multi_thread_write_one_tbl": "no", + "number_of_tbl_in_one_sql": 0, + "max_sql_len": 1024000, + "disorder_ratio": 0, + "disorder_range": 1000, + "timestamp_step": 1, + "start_timestamp": "2020-10-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "tags_file": "", + "columns": [{"type": "INT"}], + "tags": [{"type": "TINYINT", "count":1}] + }] + }] +} diff --git a/tests/pytest/tools/insert.json b/tests/pytest/tools/insert.json index c3fa78076b2a25f73ebc50f6a35bcc5afddb246d..51e9dcc8205bbe69ca74bd8f3a5c95de4c130b5f 100644 --- a/tests/pytest/tools/insert.json +++ b/tests/pytest/tools/insert.json @@ -1,50 +1,51 @@ { "filetype":"insert", "cfgdir": "/etc/taos", - "host": "127.0.0.1", - "port": 6030, - "user": "root", - "password": "taosdata", - "thread_count": 1, - "databases": [{ - "dbinfo": { - "name": "db01", - "replica": 1, - "days": 10, - "cache": 16, - "blocks": 8, - "precision": "ms", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 4, + "thread_count_create_tbl": 4, + "databases": [{ + "dbinfo": { + "name": "db01", + "replica": 1, + "days": 10, + "cache": 16, + "blocks": 8, + "precision": "ms", "update": 0, - "maxtablesPerVnode": 1000 - }, - "super_tables": [{ - "name": "stb01", - "childtable_count": 100, - "childtable_prefix": "stb01_", - "auto_create_table": "no", - "data_source": "rand", - "insert_mode": "taosc", - "insert_rate": 0, - "insert_rows": 1000, - "timestamp_step": 1000, - "start_timestamp": "2020-10-01 00:00:00.000", - "sample_format": "csv", - "sample_file": "/home/data/sample.csv", - "tags_file": "", - "columns": [{ - "type": "SMALLINT" - }, { - "type": "BOOL" - }, { - "type": "BINARY", - "len": 6 - }], - "tags": [{ - "type": "INT" - },{ - "type": "BINARY", - "len": 4 - }] - }] - }] + "maxtablesPerVnode": 1000 + }, + "super_tables": [{ + "name": "stb01", + "childtable_count": 100, + "childtable_prefix": "stb01_", + "auto_create_table": "no", + "data_source": "rand", + "insert_mode": "taosc", + "insert_rate": 0, + "insert_rows": 1000, + "timestamp_step": 1000, + "start_timestamp": "2020-10-01 00:00:00.000", + "sample_format": "csv", + "sample_file": "/home/data/sample.csv", + "tags_file": "", + "columns": [{ + "type": "SMALLINT" + }, { + "type": "BOOL" + }, { + "type": "BINARY", + "len": 6 + }], + "tags": [{ + "type": "INT" + },{ + "type": "BINARY", + "len": 4 + }] + }] + }] } diff --git a/tests/pytest/tools/taosdemoTestInterlace.py b/tests/pytest/tools/taosdemoTestInterlace.py new file mode 100644 index 0000000000000000000000000000000000000000..9ceb8c5cbbf5405035749972154cea2be8bfbcee --- /dev/null +++ b/tests/pytest/tools/taosdemoTestInterlace.py @@ -0,0 +1,68 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numberOfTables = 10000 + self.numberOfRecords = 100 + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def run(self): + tdSql.prepare() + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath+ "/build/bin/" + os.system("%staosdemo -f tools/insert-interlace.json" % binPath) + + tdSql.execute("use db") + tdSql.query("select count(tbname) from db.stb") + tdSql.checkData(0, 0, 100) + tdSql.query("select count(*) from db.stb") + tdSql.checkData(0, 0, 33000) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/tools/taosdemoTestLimitOffset.py b/tests/pytest/tools/taosdemoTestLimitOffset.py index 69a81b166b69098bf4f7bba275cd80e291c934b8..bce41e1c75817c2939d4c1104419771483a9a689 100644 --- a/tests/pytest/tools/taosdemoTestLimitOffset.py +++ b/tests/pytest/tools/taosdemoTestLimitOffset.py @@ -68,6 +68,15 @@ class TDTestCase: tdSql.query("select count(*) from db.stb") tdSql.checkData(0, 0, 20000) + os.system("%staosdemo -f tools/insert-tblimit1-tboffset.json" % binPath) + + tdSql.execute("reset query cache") + tdSql.execute("use db") + tdSql.query("select count(tbname) from db.stb") + tdSql.checkData(0, 0, 100) + tdSql.query("select count(*) from db.stb") + tdSql.checkData(0, 0, 1000) + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/tools/lowaTest.py b/tests/pytest/tools/taosdemoTestWithJson.py similarity index 100% rename from tests/pytest/tools/lowaTest.py rename to tests/pytest/tools/taosdemoTestWithJson.py