diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 083752ec7eb4bc0d5298b5129004890656d553ad..b34757432d40436d99d559f768343e1c71f01824 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -1797,21 +1797,21 @@ int postProceSql(char* host, uint16_t port, char* sqlstr) return 0; } - -char* getTagValueFromTagSample( SSuperTable* stbInfo, int tagUsePos) { +static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); if (NULL == dataBuf) { printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); return NULL; } - + int dataLen = 0; - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos); - + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos); + return dataBuf; } -char* generateTagVaulesForStb(SSuperTable* stbInfo) { +static char* generateTagVaulesForStb(SSuperTable* stbInfo) { char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); if (NULL == dataBuf) { printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1); @@ -1821,13 +1821,15 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) { int dataLen = 0; dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "("); for (int i = 0; i < stbInfo->tagCount; i++) { - if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", 5))) { + if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", strlen("binary"))) + || (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", strlen("nchar")))) { if (stbInfo->tags[i].dataLen > TSDB_MAX_BINARY_LEN) { - printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); + printf("binary or nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); tmfree(dataBuf); return NULL; } - + char* buf = (char*)calloc(stbInfo->tags[i].dataLen+1, 1); if (NULL == buf) { printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen); @@ -1835,30 +1837,48 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) { return NULL; } rand_string(buf, stbInfo->tags[i].dataLen); - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "\'%s\', ", buf); + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "\'%s\', ", buf); tmfree(buf); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "int", 3)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_int()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bigint", 6)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "float", 5)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_float()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "double", 6)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_double()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "smallint", 8)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_smallint()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "tinyint", 7)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_tinyint()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bool", 4)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_bool()); - } else if (0 == strncasecmp(stbInfo->tags[i].dataType, "timestamp", 4)) { - dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "int", strlen("int"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_int()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "bigint", strlen("bigint"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%"PRId64", ", rand_bigint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "float", strlen("float"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%f, ", rand_float()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "double", strlen("double"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%f, ", rand_double()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "smallint", strlen("smallint"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_smallint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "tinyint", strlen("tinyint"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_tinyint()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "bool", strlen("bool"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%d, ", rand_bool()); + } else if (0 == strncasecmp(stbInfo->tags[i].dataType, + "timestamp", strlen("timestamp"))) { + dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, + "%"PRId64", ", rand_bigint()); } else { printf("No support data type: %s\n", stbInfo->tags[i].dataType); tmfree(dataBuf); return NULL; } } + dataLen -= 2; dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")"); return dataBuf; @@ -2012,7 +2032,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, TAOS_RES * res; TAOS_ROW row = NULL; int count = 0; - + //get schema use cmd: describe superTblName; snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName); res = taos_query(taos, command); @@ -2423,7 +2443,7 @@ static void* createTable(void *sarg) } int startMultiThreadCreateChildTable( - char* cols, int threads, int ntables, + char* cols, int threads, int startFrom, int ntables, char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); threadInfo *infos = malloc(threads * sizeof(threadInfo)); @@ -2446,7 +2466,7 @@ int startMultiThreadCreateChildTable( int b = 0; b = ntables % threads; - int last = 0; + int last = startFrom; for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -2505,12 +2525,23 @@ static void createChildTables() { verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); + int startFrom; + if (g_Dbs.db[i].superTbls[j].childTblOffset) { + startFrom = g_Dbs.db[i].superTbls[j].childTblOffset; + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblLimit; + } else { + startFrom = 0; + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; + } + + verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__, + g_totalChildTables, startFrom); startMultiThreadCreateChildTable( g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.threadCountByCreateTbl, - g_Dbs.db[i].superTbls[j].childTblCount, + startFrom, + g_totalChildTables, g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; } } else { // normal table @@ -2538,6 +2569,7 @@ static void createChildTables() { startMultiThreadCreateChildTable( tblColsBuf, g_Dbs.threadCountByCreateTbl, + 0, g_args.num_of_tables, g_Dbs.db[i].dbName, NULL); @@ -2545,30 +2577,6 @@ static void createChildTables() { } } -/* -static int taosGetLineNum(const char *fileName) -{ - int lineNum = 0; - char cmd[1024] = { 0 }; - char buf[1024] = { 0 }; - sprintf(cmd, "wc -l %s", fileName); - - FILE *fp = popen(cmd, "r"); - if (fp == NULL) { - fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); - return lineNum; - } - - if (fgets(buf, sizeof(buf), fp)) { - int index = strchr((const char*)buf, ' ') - buf; - buf[index] = '\0'; - lineNum = atoi(buf); - } - pclose(fp); - return lineNum; -} -*/ - /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ @@ -2655,7 +2663,7 @@ static int readSampleFromCsvFileToMem( FILE* fp = fopen(superTblInfo->sampleFile, "r"); if (fp == NULL) { - fprintf(stderr, "Failed to open sample file: %s, reason:%s\n", + fprintf(stderr, "Failed to open sample file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno)); return -1; } @@ -2907,7 +2915,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, threads not found\n"); goto PARSE_OVER; } - + cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl"); if (threads2 && threads2->type == cJSON_Number) { g_Dbs.threadCountByCreateTbl = threads2->valueint; @@ -4010,7 +4018,7 @@ static void syncWriteForNumberOfTblInOneSql( int64_t et = 0xffffffff; for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; - for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { + for (int tableID = winfo->start_table_id; tableID <= winfo->end_table_id; ) { int64_t start_time = 0; int inserted = i; @@ -4019,12 +4027,12 @@ static void syncWriteForNumberOfTblInOneSql( memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; - int32_t end_tbl_id = tID + numberOfTblInOneSql; + int32_t end_tbl_id = tableID + numberOfTblInOneSql; if (end_tbl_id > winfo->end_table_id) { end_tbl_id = winfo->end_table_id+1; } - for (tbl_id = tID; tbl_id < end_tbl_id; tbl_id++) { + for (tbl_id = tableID; tbl_id < end_tbl_id; tbl_id++) { sampleUsePos = samplePos; if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; @@ -4097,10 +4105,10 @@ static void syncWriteForNumberOfTblInOneSql( int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { - retLen = getRowDataFromSample(pstr + len, - superTblInfo->maxSqlLen - len, - start_time += superTblInfo->timeStampStep, - superTblInfo, + retLen = getRowDataFromSample(pstr + len, + superTblInfo->maxSqlLen - len, + start_time += superTblInfo->timeStampStep, + superTblInfo, &sampleUsePos); if (retLen < 0) { goto free_and_statistics; @@ -4111,13 +4119,13 @@ static void syncWriteForNumberOfTblInOneSql( if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { int64_t d = start_time - rand() % superTblInfo->disorderRange; - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, - d, + retLen = generateRowData(pstr + len, + superTblInfo->maxSqlLen - len, + d, superTblInfo); } else { - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, + retLen = generateRowData(pstr + len, + superTblInfo->maxSqlLen - len, start_time += superTblInfo->timeStampStep, superTblInfo); } @@ -4132,7 +4140,7 @@ static void syncWriteForNumberOfTblInOneSql( if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { - tID = tbl_id + 1; + tableID = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", superTblInfo->lenOfOneRow); goto send_to_server; @@ -4140,7 +4148,7 @@ static void syncWriteForNumberOfTblInOneSql( } } - tID = tbl_id; + tableID = tbl_id; inserted += superTblInfo->rowsPerTbl; send_to_server: @@ -4209,7 +4217,7 @@ send_to_server: break; } - if (tID > winfo->end_table_id) { + if (tableID > winfo->end_table_id) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } @@ -4346,7 +4354,7 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) return affectedRows; } -static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *buffer, +static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *buffer, int64_t insertRows, int64_t startFrom, int64_t startTime, int *pSampleUsePos) { @@ -4362,6 +4370,8 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b } } + assert(buffer != NULL); + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); char *pstr = buffer; @@ -4374,7 +4384,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b } else { tagsValBuf = getTagValueFromTagSample( superTblInfo, - threadID % superTblInfo->tagSampleCount); + tableID % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { fprintf(stderr, "tag buf failed to allocate memory\n"); @@ -4386,7 +4396,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b "insert into %s.%s%d using %s.%s tags %s values", pThreadInfo->db_name, superTblInfo->childTblPrefix, - threadID, + tableID, pThreadInfo->db_name, superTblInfo->sTblName, tagsValBuf); @@ -4396,14 +4406,14 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b superTblInfo->maxSqlLen, "insert into %s.%s values", pThreadInfo->db_name, - superTblInfo->childTblName + threadID * TSDB_TABLE_NAME_LEN); + superTblInfo->childTblName + tableID * TSDB_TABLE_NAME_LEN); } else { pstr += snprintf(pstr, (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), "insert into %s.%s%d values", pThreadInfo->db_name, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - threadID); + tableID); } } else { pstr += snprintf(pstr, @@ -4411,7 +4421,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b "insert into %s.%s%d values", pThreadInfo->db_name, superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - threadID); + tableID); } int k; @@ -4444,7 +4454,7 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, - startTime + superTblInfo->timeStampStep * startFrom, + startTime + superTblInfo->timeStampStep * startFrom, superTblInfo); } @@ -4486,7 +4496,6 @@ static int generateDataBuffer(int32_t threadID, threadInfo *pThreadInfo, char *b k++; startFrom ++; - debugPrint("%s() LN%d k=%d startFrom=%ld insertRows=%ld\n", __func__, __LINE__, k, startFrom, insertRows); if (startFrom >= insertRows) break; } @@ -4523,7 +4532,7 @@ static void* syncWrite(void *sarg) { char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1); if (NULL == buffer) { - fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", + fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n", superTblInfo->maxSqlLen, strerror(errno)); tmfree(superTblInfo->sampleDataBuf); @@ -4534,7 +4543,8 @@ static void* syncWrite(void *sarg) { int64_t startTs = taosGetTimestampUs(); int64_t endTs; - int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; + int insert_interval = superTblInfo?superTblInfo->insertInterval: + g_args.insert_interval; uint64_t st = 0; uint64_t et = 0xffffffff; @@ -4543,12 +4553,12 @@ static void* syncWrite(void *sarg) { int sampleUsePos; - if (superTblInfo && superTblInfo->childTblLimit ) { + if (superTblInfo && superTblInfo->childTblOffset) { // TODO } - for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; - tID++) { + for (uint32_t tableID = winfo->start_table_id; tableID <= winfo->end_table_id; + tableID++) { int64_t start_time = winfo->start_time; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; @@ -4561,7 +4571,7 @@ static void* syncWrite(void *sarg) { sampleUsePos = samplePos; - int generated = generateDataBuffer(tID, winfo, buffer, insertRows, + int generated = generateDataBuffer(tableID, winfo, buffer, insertRows, i, start_time, &sampleUsePos); if (generated > 0) i += generated; @@ -4605,12 +4615,12 @@ static void* syncWrite(void *sarg) { } } // num_of_DPT - if ((tID == winfo->end_table_id) && superTblInfo && + if ((tableID == winfo->end_table_id) && superTblInfo && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { samplePos = sampleUsePos; } - } // tID + } // tableID free_and_statistics_2: tmfree(buffer); @@ -4705,13 +4715,20 @@ static void startMultiThreadInsertData(int threads, char* db_name, char* precision,SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); + assert(pids != NULL); + threadInfo *infos = malloc(threads * sizeof(threadInfo)); + assert(infos != NULL); + memset(pids, 0, threads * sizeof(pthread_t)); memset(infos, 0, threads * sizeof(threadInfo)); int ntables = 0; if (superTblInfo) - ntables = superTblInfo->childTblCount; + if (superTblInfo->childTblOffset) + ntables = superTblInfo->childTblLimit; + else + ntables = superTblInfo->childTblCount; else ntables = g_args.num_of_tables; @@ -4767,7 +4784,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, double start = getCurrentTime(); - int last = 0; + int last; + + if ((superTblInfo) && (superTblInfo->childTblOffset)) + last = superTblInfo->childTblOffset; + else + last = 0; + for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -5490,7 +5513,7 @@ void *superSubscribeProcess(void *sarg) { } } taos_free_result(res); - + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); @@ -5533,12 +5556,12 @@ static int subscribeTestProcess() { pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { - printf("malloc failed for create threads\n"); + printf("malloc failed for create threads\n"); taos_close(taos); exit(-1); } - - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; t_info->taos = taos; @@ -5549,14 +5572,14 @@ static int subscribeTestProcess() { //==== create sub threads for query from sub table pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; - if ((g_queryInfo.subQueryInfo.sqlCount > 0) + if ((g_queryInfo.subQueryInfo.sqlCount > 0) && (g_queryInfo.subQueryInfo.threadCnt > 0)) { - pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * + pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { - printf("malloc failed for create threads\n"); + printf("malloc failed for create threads\n"); taos_close(taos); exit(-1); } @@ -5576,7 +5599,7 @@ static int subscribeTestProcess() { } int last = 0; - for (int i = 0; i < threads; i++) { + for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; @@ -5593,14 +5616,14 @@ static int subscribeTestProcess() { } tmfree((char*)pids); - tmfree((char*)infos); + tmfree((char*)infos); for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) { pthread_join(pidsOfSub[i], NULL); } tmfree((char*)pidsOfSub); - tmfree((char*)infosOfSub); + tmfree((char*)infosOfSub); taos_close(taos); return 0; }