From 1489c458f01fc8e6580f30c7432f540690c83790 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 11 Mar 2021 19:34:20 +0800 Subject: [PATCH] [TD-3192] : support stb limit and offset. refactoring. stb and tb insert works. --- src/kit/taosdemo/taosdemo.c | 358 ++++++++++++++---------------------- 1 file changed, 139 insertions(+), 219 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index e214235e32..a94a4f73ab 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -764,6 +764,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { printf("# Insertion interval: %d\n", arguments->insert_interval); printf("# Number of records per req: %d\n", arguments->num_of_RPR); printf("# Max SQL length: %d\n", arguments->max_sql_len); + printf("# Length of Binary: %d\n", arguments->len_of_binary); printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); @@ -3997,7 +3998,7 @@ static void syncWriteForNumberOfTblInOneSql( for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { - int64_t tmp_time = 0; + int64_t start_time = 0; int inserted = i; for (int k = 0; k < g_args.num_of_RPR;) { @@ -4078,14 +4079,14 @@ static void syncWriteForNumberOfTblInOneSql( } } - tmp_time = time_counter; + start_time = time_counter; for (int j = 0; j < superTblInfo->rowsPerTbl;) { int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample(pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time += superTblInfo->timeStampStep, superTblInfo, &sampleUsePos); if (retLen < 0) { @@ -4096,7 +4097,7 @@ static void syncWriteForNumberOfTblInOneSql( int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { - int64_t d = tmp_time - rand() % superTblInfo->disorderRange; + int64_t d = start_time - rand() % superTblInfo->disorderRange; retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, d, @@ -4104,7 +4105,7 @@ static void syncWriteForNumberOfTblInOneSql( } else { retLen = generateRowData(pstr + len, superTblInfo->maxSqlLen - len, - tmp_time += superTblInfo->timeStampStep, + start_time += superTblInfo->timeStampStep, superTblInfo); } if (retLen < 0) { @@ -4200,7 +4201,7 @@ send_to_server: samplePos = sampleUsePos; } i = inserted; - time_counter = tmp_time; + time_counter = start_time; } } @@ -4216,7 +4217,7 @@ free_and_statistics: } int32_t generateData(char *res, char **data_type, - int num_of_cols, int64_t timestamp, int len_of_binary) { + int num_of_cols, int64_t timestamp, int lenOfBinary) { memset(res, 0, MAX_DATA_SIZE); char *pstr = res; pstr += sprintf(pstr, "(%" PRId64, timestamp); @@ -4251,13 +4252,13 @@ int32_t generateData(char *res, char **data_type, bool b = rand() & 1; pstr += sprintf(pstr, ", %s", b ? "true" : "false"); } else if (strcasecmp(data_type[i % c], "binary") == 0) { - char *s = malloc(len_of_binary); - rand_string(s, len_of_binary); + char *s = malloc(lenOfBinary); + rand_string(s, lenOfBinary); pstr += sprintf(pstr, ", \"%s\"", s); free(s); }else if (strcasecmp(data_type[i % c], "nchar") == 0) { - char *s = malloc(len_of_binary); - rand_string(s, len_of_binary); + char *s = malloc(lenOfBinary); + rand_string(s, lenOfBinary); pstr += sprintf(pstr, ", \"%s\"", s); free(s); } @@ -4273,142 +4274,6 @@ int32_t generateData(char *res, char **data_type, return (int32_t)(pstr - res); } -// sync insertion -/* - 1 thread: 100 tables * 2000 rows/s - 1 thread: 10 tables * 20000 rows/s - 6 thread: 300 tables * 2000 rows/s - - 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s -*/ -static void* syncWrite(void *sarg) { - - threadInfo *winfo = (threadInfo *)sarg; - - char* buffer = calloc(g_args.max_sql_len, 1); - if (NULL == buffer) { - fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", - g_args.max_sql_len, - strerror(errno)); - return NULL; - } - - char data[MAX_DATA_SIZE]; - char **data_type = g_args.datatype; - int len_of_binary = g_args.len_of_binary; - - int ncols_per_record = 1; // count first col ts - int i = 0; - while(g_args.datatype[i]) { - i ++; - ncols_per_record ++; - } - - srand((uint32_t)time(NULL)); - int64_t time_counter = winfo->start_time; - - uint64_t st = 0; - uint64_t et = 0; - - winfo->totalRowsInserted = 0; - winfo->totalAffectedRows = 0; - - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { - int64_t tmp_time = time_counter; - - for (int i = 0; i < g_args.num_of_DPT;) { - - int tblInserted = i; - - memset(buffer, 0, g_args.max_sql_len); - char *pstr = buffer; - pstr += sprintf(pstr, - "insert into %s.%s%d values ", - winfo->db_name, g_args.tb_prefix, tID); - int k; - for (k = 0; k < g_args.num_of_RPR;) { - int rand_num = rand() % 100; - int len = -1; - - if ((g_args.disorderRatio != 0) - && (rand_num < g_args.disorderRange)) { - - int64_t d = tmp_time - rand() % 1000000 + rand_num; - len = generateData(data, data_type, - ncols_per_record, d, len_of_binary); - } else { - len = generateData(data, data_type, - ncols_per_record, tmp_time += 1000, len_of_binary); - } - - //assert(len + pstr - buffer < BUFFER_SIZE); - if (len + pstr - buffer >= BUFFER_SIZE) { // too long - break; - } - - pstr += sprintf(pstr, " %s", data); - tblInserted++; - k++; - i++; - - if (tblInserted >= g_args.num_of_DPT) - break; - } - - winfo->totalRowsInserted += k; - /* puts(buffer); */ - int64_t startTs; - int64_t endTs; - startTs = taosGetTimestampUs(); - //queryDB(winfo->taos, buffer); - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - int sleep_time = g_args.insert_interval - (et -st); - printf("sleep: %d ms specified by insert_interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } - verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); - int affectedRows = queryDbExec(winfo->taos, buffer, 1); - - if (0 < affectedRows){ - endTs = taosGetTimestampUs(); - int64_t delay = endTs - startTs; - if (delay > winfo->maxDelay) - winfo->maxDelay = delay; - if (delay < winfo->minDelay) - winfo->minDelay = delay; - winfo->cntDelay++; - winfo->totalDelay += delay; - winfo->totalAffectedRows += affectedRows; - winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; - } else { - fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows); - } - - verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64" tblInserted=%d\n", __func__, __LINE__, winfo->totalAffectedRows, tblInserted); - if (g_args.insert_interval) { - et = taosGetTimestampMs(); - } - - if (tblInserted >= g_args.num_of_DPT) { - break; - } - } // num_of_DPT - } // tId - - tmfree(buffer); - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", - winfo->threadID, - winfo->totalRowsInserted, - winfo->totalAffectedRows); - - return NULL; -} - static int prepareSampleData(SSuperTable *superTblInfo) { char* sampleDataBuf = NULL; @@ -4437,12 +4302,22 @@ static int prepareSampleData(SSuperTable *superTblInfo) { return 0; } -static void* syncWriteWithStb(void *sarg) { +// sync insertion +/* + 1 thread: 100 tables * 2000 rows/s + 1 thread: 10 tables * 20000 rows/s + 6 thread: 300 tables * 2000 rows/s + + 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s +*/ +static void* syncWrite(void *sarg) { uint64_t lastPrintTime = taosGetTimestampMs(); threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; + int ncols_per_record = 1; // count first col ts + int samplePos = 0; if (superTblInfo) { @@ -4454,9 +4329,16 @@ static void* syncWriteWithStb(void *sarg) { tmfree(superTblInfo->sampleDataBuf); return NULL; } + } else { + int datatypeSeq = 0; + while(g_args.datatype[datatypeSeq]) { + datatypeSeq ++; + ncols_per_record ++; + } + } - char* buffer = calloc(superTblInfo->maxSqlLen, 1); + char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", superTblInfo->maxSqlLen, @@ -4473,91 +4355,92 @@ static void* syncWriteWithStb(void *sarg) { int sampleUsePos; - verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); - - if (superTblInfo->childTblLimit ) { - // CBD + if (superTblInfo && superTblInfo->childTblLimit ) { + // TODO } for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { int64_t start_time = winfo->start_time; - for (int i = 0; i < superTblInfo->insertRows;) { - - int64_t tblInserted = i; + int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); - if (i > 0 && g_args.insert_interval - && (g_args.insert_interval > (et - st) )) { - int sleep_time = g_args.insert_interval - (et -st); - printf("sleep: %d ms specified by insert_interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } - - if (g_args.insert_interval) { - st = taosGetTimestampMs(); - } + for (int64_t i = 0; i < insertRows;) { + int64_t prepared = i; sampleUsePos = samplePos; - verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); - memset(buffer, 0, superTblInfo->maxSqlLen); - int len = 0; + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); char *pstr = buffer; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { + if (superTblInfo) { + + if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; if (0 == superTblInfo->tagSource) { tagsValBuf = generateTagVaulesForStb(superTblInfo); } else { tagsValBuf = getTagValueFromTagSample( - superTblInfo, + superTblInfo, tID % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { goto free_and_statistics_2; } - - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d using %s.%s tags %s values", - winfo->db_name, - superTblInfo->childTblPrefix, - tID, - winfo->db_name, - superTblInfo->sTblName, + + pstr += snprintf(pstr, + superTblInfo->maxSqlLen, + "insert into %s.%s%d using %s.%s tags %s values", + winfo->db_name, + superTblInfo->childTblPrefix, + tID, + winfo->db_name, + superTblInfo->sTblName, tagsValBuf); tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s values", - winfo->db_name, + } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { + pstr += snprintf(pstr, + superTblInfo->maxSqlLen, + "insert into %s.%s values", + winfo->db_name, superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); + } else { + pstr += snprintf(pstr, + (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), + "insert into %s.%s%d values", + winfo->db_name, + superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, + tID); + } } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d values", - winfo->db_name, - superTblInfo->childTblPrefix, + + pstr += snprintf(pstr, + (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), + "insert into %s.%s%d values", + winfo->db_name, + superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tID); } int k; + int len = 0; + + verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); for (k = 0; k < g_args.num_of_RPR;) { - int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { + + if (superTblInfo) { + int retLen = 0; + + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, start_time + superTblInfo->timeStampStep * i, superTblInfo, &sampleUsePos); - if (retLen < 0) { - goto free_and_statistics_2; - } - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { + } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { @@ -4567,43 +4450,82 @@ static void* syncWriteWithStb(void *sarg) { superTblInfo->maxSqlLen - len, d, superTblInfo); - //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); - } else { + //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); + } else { retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, start_time + superTblInfo->timeStampStep * i, superTblInfo); } + if (retLen < 0) { goto free_and_statistics_2; } + + len += retLen; + } + } else { + int rand_num = rand() % 100; + char data[MAX_DATA_SIZE]; + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; + + if ((g_args.disorderRatio != 0) + && (rand_num < g_args.disorderRange)) { + + int64_t d = start_time - rand() % 1000000 + rand_num; + len = generateData(data, data_type, + ncols_per_record, d, lenOfBinary); + } else { + len = generateData(data, data_type, + ncols_per_record, start_time += 1000, lenOfBinary); + } + + //assert(len + pstr - buffer < BUFFER_SIZE); + if (len + pstr - buffer >= g_args.max_sql_len) { // too long + break; + } + + pstr += sprintf(pstr, " %s", data); } - len += retLen; - verbosePrint("%s() LN%d retLen=%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, retLen, len, k, buffer); + verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); - tblInserted++; + prepared ++; k++; i++; - if (tblInserted >= superTblInfo->insertRows) - break; + if (prepared >= insertRows) + break; } winfo->totalRowsInserted += k; + if (g_args.insert_interval) { + st = taosGetTimestampMs(); + + if (i > 0 && g_args.insert_interval + && (g_args.insert_interval > (et - st) )) { + int sleep_time = g_args.insert_interval - (et -st); + printf("sleep: %d ms for insert interval\n", sleep_time); + taosMsleep(sleep_time); // ms + } + } + int64_t startTs = taosGetTimestampUs(); int64_t endTs; int affectedRows; - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { + + if (superTblInfo) { + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); if (0 > affectedRows){ goto free_and_statistics_2; } - } else { + } else { verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); @@ -4613,6 +4535,10 @@ static void* syncWriteWithStb(void *sarg) { } affectedRows = k; + } + } else { + verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); + affectedRows = queryDbExec(winfo->taos, buffer, 1); } endTs = taosGetTimestampUs(); @@ -4637,23 +4563,21 @@ static void* syncWriteWithStb(void *sarg) { et = taosGetTimestampMs(); } - if (tblInserted >= superTblInfo->insertRows) + if (prepared >= insertRows) break; } // num_of_DPT - if (tID == winfo->end_table_id) { - if (0 == strncasecmp( - superTblInfo->dataSource, "sample", strlen("sample"))) { + if ((tID == winfo->end_table_id) && superTblInfo && + (0 == strncasecmp( + superTblInfo->dataSource, "sample", strlen("sample")))) { samplePos = sampleUsePos; - } - } - //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); } // tID free_and_statistics_2: tmfree(buffer); - tmfree(superTblInfo->sampleDataBuf); + if (superTblInfo) + tmfree(superTblInfo->sampleDataBuf); printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, @@ -4697,7 +4621,7 @@ void callBack(void *param, TAOS_RES *res, int code) { //generateData(data, datatype, ncols_per_record, d, len_of_binary); (void)generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo); } else { - //generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary); + //generateData(data, datatype, ncols_per_record, start_time += 1000, len_of_binary); (void)generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo); } pstr += sprintf(pstr, "%s", data); @@ -4839,11 +4763,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, tsem_init(&(t_info->lock_sem), 0, 0); if (SYNC == g_Dbs.queryMode) { - if (superTblInfo) { - pthread_create(pids + i, NULL, syncWriteWithStb, t_info); - } else { - pthread_create(pids + i, NULL, syncWrite, t_info); - } + pthread_create(pids + i, NULL, syncWrite, t_info); } else { pthread_create(pids + i, NULL, asyncWrite, t_info); } -- GitLab