diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 5eab2fde63feaff4f8840dd53c8a7a3e0f549dc4..80cbecb96b7b2db432b996c95bc46a3db7d8cb0b 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -80,7 +80,6 @@ extern char configDir[]; #define OPT_ABORT 1 /* –abort */ #define STRING_LEN 60000 #define MAX_PREPARED_RAND 1000000 -//#define MAX_SQL_SIZE 65536 #define MAX_FILE_NAME_LEN 256 #define MAX_SAMPLES_ONCE_FROM_FILE 10000 @@ -240,7 +239,7 @@ typedef struct SSuperTable_S { StrColumn tags[MAX_TAG_COUNT]; char* childTblName; - char* colsOfCreatChildTable; + char* colsOfCreateChildTable; int lenOfOneRow; int lenOfTagOfOneRow; @@ -458,9 +457,9 @@ void resetAfterAnsiEscape(void) { } #endif -int createDatabases(); -void createChildTables(); -int queryDbExec(TAOS *taos, char *command, int type); +static int createDatabases(); +static void createChildTables(); +static int queryDbExec(TAOS *taos, char *command, int type); /* ************ Global variables ************ */ @@ -638,6 +637,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { sptr = arguments->datatype; ++i; if (strstr(argv[i], ",") == NULL) { + // only one col if (strcasecmp(argv[i], "INT") && strcasecmp(argv[i], "FLOAT") && strcasecmp(argv[i], "TINYINT") @@ -653,6 +653,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { } sptr[0] = argv[i]; } else { + // more than one col int index = 0; char *dupstr = strdup(argv[i]); char *running = dupstr; @@ -675,6 +676,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { token = strsep(&running, ","); if (index >= MAX_NUM_DATATYPE) break; } + sptr[index] = NULL; } } else if (strcmp(argv[i], "-w") == 0) { arguments->len_of_binary = atoi(argv[++i]); @@ -735,6 +737,15 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { printf("# User: %s\n", arguments->user); printf("# Password: %s\n", arguments->password); printf("# Use metric: %s\n", arguments->use_metric ? "true" : "false"); + if (*(arguments->datatype)) { + printf("# Specified data type: "); + for (int i = 0; i < MAX_NUM_DATATYPE; i++) + if (arguments->datatype[i]) + printf("%s,", arguments->datatype[i]); + else + break; + printf("\n"); + } printf("# Insertion interval: %d\n", arguments->insert_interval); printf("# Number of Columns per record: %d\n", arguments->num_of_RPR); printf("# Number of Threads: %d\n", arguments->num_of_threads); @@ -774,7 +785,7 @@ void tmfree(char *buf) { } } -int queryDbExec(TAOS *taos, char *command, int type) { +static int queryDbExec(TAOS *taos, char *command, int type) { int i; TAOS_RES *res = NULL; int32_t code = -1; @@ -784,7 +795,7 @@ int queryDbExec(TAOS *taos, char *command, int type) { taos_free_result(res); res = NULL; } - + res = taos_query(taos, command); code = taos_errno(res); if (0 == code) { @@ -793,7 +804,7 @@ int queryDbExec(TAOS *taos, char *command, int type) { } if (code != 0) { - debugPrint("DEBUG %s() %d - command: %s\n", __func__, __LINE__, command); + debugPrint("DEBUG %s() LN%d - command: %s\n", __func__, __LINE__, command); fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res)); taos_free_result(res); //taos_close(taos); @@ -1965,13 +1976,14 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, //printf("%s.%s column count:%d, column length:%d\n\n", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName, g_Dbs.db[i].superTbls[j].columnCount, lenOfOneRow); // save for creating child table - superTbls->colsOfCreatChildTable = (char*)calloc(len+20, 1); - if (NULL == superTbls->colsOfCreatChildTable) { + superTbls->colsOfCreateChildTable = (char*)calloc(len+20, 1); + if (NULL == superTbls->colsOfCreateChildTable) { printf("Failed when calloc, size:%d", len+1); taos_close(taos); exit(-1); } - snprintf(superTbls->colsOfCreatChildTable, len+20, "(ts timestamp%s)", cols); + snprintf(superTbls->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols); + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, superTbls->colsOfCreateChildTable); if (use_metric) { char tags[STRING_LEN] = "\0"; @@ -2020,19 +2032,23 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, len += snprintf(tags + len, STRING_LEN - len, ")"); superTbls->lenOfTagOfOneRow = lenOfTagOfOneRow; - - snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s (ts timestamp%s) tags %s", dbName, superTbls->sTblName, cols, tags); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + + snprintf(command, BUFFER_SIZE, + "create table if not exists %s.%s (ts timestamp%s) tags %s", + dbName, superTbls->sTblName, cols, tags); + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, command); + if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { - return -1; + fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName); + return -1; } - printf("\ncreate supertable %s success!\n\n", superTbls->sTblName); + debugPrint("DEBUG - create supertable %s success!\n\n", superTbls->sTblName); } return 0; } -int createDatabases() { +static int createDatabases() { TAOS * taos = NULL; int ret = 0; taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port); @@ -2112,7 +2128,7 @@ int createDatabases() { dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } - + debugPrint("DEBUG %s() %d \n", __func__, __LINE__); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { taos_close(taos); @@ -2138,8 +2154,6 @@ int createDatabases() { printf("\ncreate super table %d failed!\n\n", j); taos_close(taos); return -1; - } else { - printf("\ncreate super table %d success!\n\n", j); } } } @@ -2148,7 +2162,7 @@ int createDatabases() { return 0; } -void * createTable(void *sarg) +static void* createTable(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; @@ -2161,7 +2175,7 @@ void * createTable(void *sarg) else buff_len = BUFFER_SIZE; - char *buffer = calloc(superTblInfo->maxSqlLen, 1); + char *buffer = calloc(buff_len, 1); if (buffer == NULL) { fprintf(stderr, "Memory allocated failed!"); exit(-1); @@ -2175,8 +2189,8 @@ void * createTable(void *sarg) snprintf(buffer, buff_len, "create table if not exists %s.%s%d %s;", winfo->db_name, - superTblInfo->childTblPrefix, i, - superTblInfo->colsOfCreatChildTable); + g_args.tb_prefix, i, + winfo->cols); } else { if (0 == len) { batchNum = 0; @@ -2215,7 +2229,7 @@ void * createTable(void *sarg) } len = 0; - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){ free(buffer); return NULL; @@ -2303,25 +2317,50 @@ int startMultiThreadCreateChildTable( } -void createChildTables() { +static void createChildTables() { + char tblColsBuf[MAX_SQL_SIZE]; + int len; + for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].superTblCount > 0) { + // with super table for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { continue; } + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); startMultiThreadCreateChildTable( - g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.threadCountByCreateTbl, g_Dbs.db[i].superTbls[j].childTblCount, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; } } else { + // normal table + len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP"); + for (int i = 0; i < MAX_COLUMN_COUNT; i++) { + if (g_args.datatype[i]) { + if ((strncasecmp(g_args.datatype[i], "BINARY", strlen("BINARY")) == 0) + || (strncasecmp(g_args.datatype[i], "NCHAR", strlen("NCHAR")) == 0)) { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", i, g_args.datatype[i]); + } else { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", i, g_args.datatype[i]); + } + len = strlen(tblColsBuf); + } else { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ")"); + break; + } + } + + debugPrint("DEBUG - %s() LN%d: %s\n", __func__, __LINE__, + tblColsBuf); startMultiThreadCreateChildTable( - g_Dbs.db[i].superTbls[j].colsOfCreatChildTable, + tblColsBuf, g_Dbs.threadCountByCreateTbl, g_args.num_of_DPT, g_Dbs.db[i].dbName, @@ -2361,7 +2400,7 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; - + FILE *fp = fopen(superTblInfo->tagsFile, "r"); if (fp == NULL) { printf("Failed to open tags file: %s, reason:%s\n", @@ -2373,7 +2412,7 @@ int readTagFromCsvFileToMem(SSuperTable * superTblInfo) { free(superTblInfo->tagDataBuf); superTblInfo->tagDataBuf = NULL; } - + int tagCount = 10000; int count = 0; char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount); @@ -3612,7 +3651,7 @@ void prePareSampleData() { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { // readSampleFromFileToMem(&g_Dbs.db[i].superTbls[j]); //} - + if (g_Dbs.db[i].superTbls[j].tagsFile[0] != 0) { (void)readTagFromCsvFileToMem(&g_Dbs.db[i].superTbls[j]); } @@ -3624,9 +3663,9 @@ void postFreeResource() { tmfclose(g_fpOfInsertResult); for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - if (0 != g_Dbs.db[i].superTbls[j].colsOfCreatChildTable) { - free(g_Dbs.db[i].superTbls[j].colsOfCreatChildTable); - g_Dbs.db[i].superTbls[j].colsOfCreatChildTable = NULL; + if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) { + free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL; } if (0 != g_Dbs.db[i].superTbls[j].sampleDataBuf) { free(g_Dbs.db[i].superTbls[j].sampleDataBuf); @@ -3704,11 +3743,11 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* } dataLen -= 2; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); - + return dataLen; } -void syncWriteForNumberOfTblInOneSql( +static void syncWriteForNumberOfTblInOneSql( threadInfo *winfo, FILE *fp, char* sampleDataBuf) { SSuperTable* superTblInfo = winfo->superTblInfo; @@ -3731,7 +3770,7 @@ void syncWriteForNumberOfTblInOneSql( numberOfTblInOneSql = tbls; } - int64_t time_counter = winfo->start_time; + uint64_t time_counter = winfo->start_time; int64_t tmp_time; int sampleUsePos; @@ -3955,6 +3994,64 @@ send_to_server: return; } +int32_t generateData(char *res, char **data_type, + int num_of_cols, int64_t timestamp, int len_of_binary) { + memset(res, 0, MAX_DATA_SIZE); + char *pstr = res; + pstr += sprintf(pstr, "(%" PRId64, timestamp); + int c = 0; + + for (; c < MAX_NUM_DATATYPE; c++) { + if (data_type[c] == NULL) { + break; + } + } + + if (0 == c) { + perror("data type error!"); + exit(-1); + } + + for (int i = 0; i < num_of_cols; i++) { + if (strcasecmp(data_type[i % c], "tinyint") == 0) { + pstr += sprintf(pstr, ", %d", rand_tinyint() ); + } else if (strcasecmp(data_type[i % c], "smallint") == 0) { + pstr += sprintf(pstr, ", %d", rand_smallint()); + } else if (strcasecmp(data_type[i % c], "int") == 0) { + pstr += sprintf(pstr, ", %d", rand_int()); + } else if (strcasecmp(data_type[i % c], "bigint") == 0) { + pstr += sprintf(pstr, ", %" PRId64, rand_bigint()); + } else if (strcasecmp(data_type[i % c], "float") == 0) { + pstr += sprintf(pstr, ", %10.4f", rand_float()); + } else if (strcasecmp(data_type[i % c], "double") == 0) { + double t = rand_double(); + pstr += sprintf(pstr, ", %20.8f", t); + } else if (strcasecmp(data_type[i % c], "bool") == 0) { + bool b = rand() & 1; + pstr += sprintf(pstr, ", %s", b ? "true" : "false"); + } else if (strcasecmp(data_type[i % c], "binary") == 0) { + char *s = malloc(len_of_binary); + rand_string(s, len_of_binary); + pstr += sprintf(pstr, ", \"%s\"", s); + free(s); + }else if (strcasecmp(data_type[i % c], "nchar") == 0) { + char *s = malloc(len_of_binary); + rand_string(s, len_of_binary); + pstr += sprintf(pstr, ", \"%s\"", s); + free(s); + } + + if (pstr - res > MAX_DATA_SIZE) { + perror("column length too long, abort"); + exit(-1); + } + } + + pstr += sprintf(pstr, ")"); + + return (int32_t)(pstr - res); +} + // sync insertion /* 1 thread: 100 tables * 2000 rows/s @@ -3963,7 +4060,88 @@ send_to_server: 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s */ -void *syncWrite(void *sarg) { +static void* syncWrite(void *sarg) { + + threadInfo *winfo = (threadInfo *)sarg; + + char buffer[BUFFER_SIZE] = "\0"; + char data[MAX_DATA_SIZE]; + char **data_type = g_args.datatype; + int len_of_binary = g_args.len_of_binary; + + int ncols_per_record = 1; // count first col ts + for (int i = 0; i < MAX_COLUMN_COUNT; i ++) { + if (NULL == g_args.datatype[i]) + break; + else + ncols_per_record ++; + } + + srand((uint32_t)time(NULL)); + int64_t time_counter = winfo->start_time; + + for (int i = 0; i < g_args.num_of_DPT;) { + for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + int inserted = i; + int64_t tmp_time = time_counter; + + char *pstr = buffer; + pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, g_args.tb_prefix, tID); + int k; + for (k = 0; k < g_args.num_of_RPR;) { + int rand_num = rand() % 100; + int len = -1; + + if ((g_args.disorderRatio != 0) && (rand_num < g_args.disorderRange)) { + + int64_t d = tmp_time - rand() % 1000000 + rand_num; + len = generateData(data, data_type, ncols_per_record, d, len_of_binary); + } else { + len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary); + } + + //assert(len + pstr - buffer < BUFFER_SIZE); + if (len + pstr - buffer >= BUFFER_SIZE) { // too long + break; + } + + pstr += sprintf(pstr, " %s", data); + inserted++; + k++; + + if (inserted >= g_args.num_of_DPT) + break; + } + + /* puts(buffer); */ + int64_t startTs; + int64_t endTs; + startTs = taosGetTimestampUs(); + //queryDB(winfo->taos, buffer); + debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer); + int affectedRows = queryDbExec(winfo->taos, buffer, 1); + + if (0 <= affectedRows){ + endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + if (delay > winfo->maxDelay) winfo->maxDelay = delay; + if (delay < winfo->minDelay) winfo->minDelay = delay; + winfo->cntDelay++; + winfo->totalDelay += delay; + //winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; + } + + if (tID == winfo->end_table_id) { + i = inserted; + time_counter = tmp_time; + } + } + } + return NULL; +} + + +static void* syncWriteWithStb(void *sarg) { uint64_t totalRowsInserted = 0; uint64_t totalAffectedRows = 0; uint64_t lastPrintTime = taosGetTimestampMs(); @@ -4137,7 +4315,7 @@ void *syncWrite(void *sarg) { int64_t endTs; startTs = taosGetTimestampUs(); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer); int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); if (0 > affectedRows){ goto free_and_statistics_2; @@ -4280,10 +4458,11 @@ void *asyncWrite(void *sarg) { void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSuperTable* superTblInfo) { - pthread_t *pids = malloc(threads * sizeof(pthread_t)); - threadInfo *infos = malloc(threads * sizeof(threadInfo)); - memset(pids, 0, threads * sizeof(pthread_t)); - memset(infos, 0, threads * sizeof(threadInfo)); + + pthread_t *pids = malloc(threads * sizeof(pthread_t)); + threadInfo *infos = malloc(threads * sizeof(threadInfo)); + memset(pids, 0, threads * sizeof(pthread_t)); + memset(infos, 0, threads * sizeof(threadInfo)); int ntables = 0; if (superTblInfo) @@ -4323,15 +4502,20 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, } } - int64_t start_time; - if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { - start_time = taosGetTimestamp(timePrec); - } else { - (void)taosParseTime( + + int64_t start_time; + if (superTblInfo) { + if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { + start_time = taosGetTimestamp(timePrec); + } else { + taosParseTime( superTblInfo->startTimestamp, &start_time, strlen(superTblInfo->startTimestamp), timePrec, 0); + } + } else { + start_time = 1500000000000; } double start = getCurrentTime(); @@ -4346,7 +4530,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, t_info->start_time = start_time; t_info->minDelay = INT16_MAX; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { + if ((NULL == superTblInfo) || (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { //t_info->taos = taos; t_info->taos = taos_connect( g_Dbs.host, g_Dbs.user, @@ -4359,7 +4543,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, t_info->taos = NULL; } - if (0 == superTblInfo->multiThreadWriteOneTbl) { + if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) { t_info->start_table_id = last; t_info->end_table_id = i < b ? last + a : last + a - 1; last = t_info->end_table_id + 1; @@ -4371,7 +4555,11 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, tsem_init(&(t_info->lock_sem), 0, 0); if (SYNC == g_Dbs.queryMode) { - pthread_create(pids + i, NULL, syncWrite, t_info); + if (superTblInfo) { + pthread_create(pids + i, NULL, syncWriteWithStb, t_info); + } else { + pthread_create(pids + i, NULL, syncWrite, t_info); + } } else { pthread_create(pids + i, NULL, asyncWrite, t_info); } @@ -4393,13 +4581,15 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, tsem_destroy(&(t_info->lock_sem)); taos_close(t_info->taos); - superTblInfo->totalAffectedRows += t_info->totalAffectedRows; - superTblInfo->totalRowsInserted += t_info->totalRowsInserted; + if (superTblInfo) { + superTblInfo->totalAffectedRows += t_info->totalAffectedRows; + superTblInfo->totalRowsInserted += t_info->totalRowsInserted; - totalDelay += t_info->totalDelay; - cntDelay += t_info->cntDelay; - if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; + totalDelay += t_info->totalDelay; + cntDelay += t_info->cntDelay; + if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; + } } cntDelay -= 1; @@ -4408,16 +4598,19 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, double end = getCurrentTime(); double t = end - start; - printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + + if (superTblInfo) { + printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t); - fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t); + } printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0); @@ -4436,7 +4629,7 @@ void *readTable(void *sarg) { threadInfo *rinfo = (threadInfo *)sarg; TAOS *taos = rinfo->taos; char command[BUFFER_SIZE] = "\0"; - int64_t sTime = rinfo->start_time; + uint64_t sTime = rinfo->start_time; char *tb_prefix = rinfo->tb_prefix; FILE *fp = fopen(rinfo->fp, "a"); if (NULL == fp) { @@ -4444,7 +4637,13 @@ void *readTable(void *sarg) { return NULL; } - int num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; + int num_of_DPT; + if (rinfo->superTblInfo) { + num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table; + } else { + num_of_DPT = g_args.num_of_DPT; + } + int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; bool do_aggreFunc = g_Dbs.do_aggreFunc; @@ -4594,7 +4793,7 @@ int insertTestProcess() { init_rand_data(); // create database and super tables - if( createDatabases() != 0) { + if(createDatabases() != 0) { fclose(g_fpOfInsertResult); return -1; } @@ -4762,7 +4961,7 @@ void *subQueryProcess(void *sarg) { return NULL; } -int queryTestProcess() { +static int queryTestProcess() { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, g_queryInfo.user, @@ -5052,7 +5251,7 @@ void *superSubscribeProcess(void *sarg) { return NULL; } -int subscribeTestProcess() { +static int subscribeTestProcess() { printfQueryMeta(); if (!g_args.answer_yes) { @@ -5201,6 +5400,9 @@ void setParaFromArg(){ g_Dbs.port = g_args.port; } + g_Dbs.threadCount = g_args.num_of_threads; + g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; + g_Dbs.dbCount = 1; g_Dbs.db[0].drop = 1; @@ -5352,7 +5554,7 @@ void querySqlFile(TAOS* taos, char* sqlFile) } memcpy(cmd + cmd_len, line, read_len); - debugPrint("DEBUG %s() %d \n", __func__, __LINE__); + debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, cmd); queryDbExec(taos, cmd, NO_INSERT_TYPE); memset(cmd, 0, MAX_SQL_SIZE); cmd_len = 0; @@ -5367,26 +5569,24 @@ void querySqlFile(TAOS* taos, char* sqlFile) return; } - - -void testMetaFile() { +static void testMetaFile() { if (INSERT_MODE == g_args.test_mode) { if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir); insertTestProcess(); } else if (QUERY_MODE == g_args.test_mode) { if (g_queryInfo.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); - (void)queryTestProcess(); + queryTestProcess(); } else if (SUBSCRIBE_MODE == g_args.test_mode) { if (g_queryInfo.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir); - (void)subscribeTestProcess(); + subscribeTestProcess(); } else { ; } } -void testCmdLine() { +static void testCmdLine() { g_args.test_mode = INSERT_MODE; insertTestProcess();