diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index fa062c3f3ebf8e98f27dc27b5787b3790f8f2aae..76575a9475896377ec134e3f4a1fcc5967dd8389 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -62,6 +62,7 @@ extern char configDir[]; #define STR_INSERT_INTO "INSERT INTO " #define MAX_RECORDS_PER_REQ 32766 +#define MAX_LINE_SIZE 16384 #define HEAD_BUFF_LEN TSDB_MAX_COLUMNS*24 // 16*MAX_COLUMNS + (192+32)*2 + insert into .. @@ -316,6 +317,7 @@ typedef struct SSuperTable_S { // statistics uint64_t totalInsertRows; uint64_t totalAffectedRows; + bool schemaless; } SSuperTable; typedef struct { @@ -2137,6 +2139,17 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) { return 0; } +static int execSmlLines(TAOS* taos, char* lines[], int numLines, bool quiet) { + int32_t code = taos_insert_lines(taos, lines, numLines); + if (code != 0) { + if (!quiet) { + errorPrint2("Failed to execute schemaless line, reason: %s\n", tstrerror(code)); + } + return -1; + } + return 0; +} + static void appendResultBufToFile(char *resultBuf, threadInfo *pThreadInfo) { pThreadInfo->fp = fopen(pThreadInfo->filePath, "at"); @@ -2696,6 +2709,8 @@ static int printfInsertMeta() { printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].stbName); + printf(" schemaless: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].schemaless ? "yes" : "no"); if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); } else if (AUTO_CREATE_SUBTBL == @@ -4454,6 +4469,10 @@ int createDatabasesAndStables(char *command) { int validStbCount = 0; for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if (g_Dbs.db[i].superTbls[j].schemaless) + { + goto skip; + } sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].stbName); ret = queryDbExec(taos, command, NO_INSERT_TYPE, true); @@ -4475,6 +4494,7 @@ int createDatabasesAndStables(char *command) { continue; } } + skip: validStbCount ++; } g_Dbs.db[i].superTblCount = validStbCount; @@ -4484,6 +4504,250 @@ int createDatabasesAndStables(char *command) { return 0; } +static void* insertSchemaless(void *sargs) { + threadInfo *pThreadInfo = (threadInfo *)sargs; + SSuperTable* stbInfo = pThreadInfo->stbInfo; + int64_t timeStampStep = stbInfo?stbInfo->timeStampStep:g_args.timestamp_step; + int64_t insertRows = stbInfo?stbInfo->insertRows:g_args.insertRows; + setThreadName("insertSchemaless"); + int count = 0; + int batch = min(g_args.reqPerReq, insertRows*(pThreadInfo->ntables)); + char *lines[batch]; + int64_t timestamp = pThreadInfo->start_time; + + char *smlHead[pThreadInfo->ntables]; + for (int t = 0; t < pThreadInfo->ntables; t++) { + int64_t dataLen = 0; + smlHead[t] = (char *)calloc(MAX_LINE_SIZE, 1); + if ( NULL == smlHead[t]) { + errorPrint2("calloc failed! size:%d\n", MAX_LINE_SIZE); + exit(EXIT_FAILURE); + } + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "%s,id=\"%s%ld\"", stbInfo->stbName, stbInfo->childTblPrefix, t + pThreadInfo->start_table_from); + for (uint64_t j = 0; j < stbInfo->tagCount; j++) { + tstrncpy(smlHead[t] + dataLen, ",", 2); + dataLen += 1; + switch (stbInfo->tags[j].data_type) { + case TSDB_DATA_TYPE_TIMESTAMP: + errorPrint2("%s() LN%d, Does not support data type %s as tag\n", + __func__, __LINE__, + stbInfo->tags[j].dataType); + exit(EXIT_FAILURE); + case TSDB_DATA_TYPE_BOOL: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%s", j, rand_bool_str()); + break; + case TSDB_DATA_TYPE_TINYINT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%si8", j, rand_tinyint_str()); + break; + case TSDB_DATA_TYPE_UTINYINT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%su8", j, rand_utinyint_str()); + break; + case TSDB_DATA_TYPE_SMALLINT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%si16", j, rand_smallint_str()); + break; + case TSDB_DATA_TYPE_USMALLINT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%su16", j, rand_usmallint_str()); + break; + case TSDB_DATA_TYPE_INT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%si32", j, rand_int_str()); + break; + case TSDB_DATA_TYPE_UINT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%su32", j, rand_uint_str()); + break; + case TSDB_DATA_TYPE_BIGINT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%si64", j, rand_bigint_str()); + break; + case TSDB_DATA_TYPE_UBIGINT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%su64", j, rand_ubigint_str()); + break; + case TSDB_DATA_TYPE_FLOAT: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%sf32", j, rand_float_str()); + break; + case TSDB_DATA_TYPE_DOUBLE: + dataLen += snprintf(smlHead[t] + dataLen, MAX_LINE_SIZE - dataLen, + "t%ld=%sf64", j, rand_double_str()); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + if (stbInfo->tags[j].dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint2("binary or nchar length overflow, maxsize:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + exit(EXIT_FAILURE); + } + char* buf = (char*)calloc(stbInfo->tags[j].dataLen+1, 1); + if (NULL == buf) { + errorPrint2("calloc failed! size:%d\n", + stbInfo->tags[j].dataLen); + exit(EXIT_FAILURE); + } + rand_string(buf, stbInfo->tags[j].dataLen); + if(stbInfo->tags[j].data_type == TSDB_DATA_TYPE_BINARY) { + dataLen += snprintf(smlHead[t] + dataLen, + MAX_DATA_SIZE - dataLen, "t%ld=\"%s\"", j, buf); + } else { + dataLen += snprintf(smlHead[t] + dataLen, + MAX_DATA_SIZE - dataLen, "t%ld=L\"%s\"", j, buf); + } + tmfree(buf); + break; + + default: + errorPrint2("%s() LN%d, Unknown data type %s\n", + __func__, __LINE__, + stbInfo->tags[j].dataType); + exit(EXIT_FAILURE); + } + } + } + int currentPercent = 0; + int percentComplete = 0; + int totalAffectedRows = 0; + for (int64_t i = 0; i < insertRows; i++) { + timestamp = timestamp + i * timeStampStep; + for (uint64_t j = 0; j < pThreadInfo->ntables; j++) { + int64_t dataLen = 0; + lines[count] = calloc(MAX_LINE_SIZE, 1); + if (NULL == lines[count]) { + errorPrint2("calloc failed! size:%d\n", MAX_LINE_SIZE); + return NULL; + } + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "%s ", smlHead[j]); + for (uint32_t c = 0; c < stbInfo->columnCount; c++) { + if (c != 0) { + tstrncpy(lines[count] + dataLen, ",", 2); + dataLen += 1; + } + switch (stbInfo->columns[c].data_type) { + case TSDB_DATA_TYPE_TIMESTAMP: + errorPrint2("%s() LN%d, Does not support data type %s as tag\n", + __func__, __LINE__, + stbInfo->columns[c].dataType); + return NULL; + case TSDB_DATA_TYPE_BOOL: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%s", c, rand_bool_str()); + break; + case TSDB_DATA_TYPE_TINYINT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%si8", c, rand_tinyint_str()); + break; + case TSDB_DATA_TYPE_UTINYINT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%su8", c, rand_utinyint_str()); + break; + case TSDB_DATA_TYPE_SMALLINT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%si16", c, rand_smallint_str()); + break; + case TSDB_DATA_TYPE_USMALLINT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%su16", c, rand_usmallint_str()); + break; + case TSDB_DATA_TYPE_INT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%si32", c, rand_int_str()); + break; + case TSDB_DATA_TYPE_UINT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%su32", c, rand_uint_str()); + break; + case TSDB_DATA_TYPE_BIGINT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%si64", c, rand_bigint_str()); + break; + case TSDB_DATA_TYPE_UBIGINT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%su64", c, rand_ubigint_str()); + break; + case TSDB_DATA_TYPE_FLOAT: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%sf32", c, rand_float_str()); + break; + case TSDB_DATA_TYPE_DOUBLE: + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + "c%d=%sf64", c, rand_double_str()); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + if (stbInfo->columns[c].dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint2("binary or nchar length overflow, maxsize:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + exit(EXIT_FAILURE); + } + char* buf = (char*)calloc(stbInfo->columns[c].dataLen + 1, 1); + if (NULL == buf) { + errorPrint2("calloc failed! size:%d\n", + stbInfo->columns[c].dataLen); + exit(EXIT_FAILURE); + } + rand_string(buf, stbInfo->columns[c].dataLen); + if(stbInfo->columns[c].data_type == TSDB_DATA_TYPE_BINARY) { + dataLen += snprintf(lines[count] + dataLen, + MAX_DATA_SIZE - dataLen, "c%d=\"%s\"", c, buf); + } else { + dataLen += snprintf(lines[count] + dataLen, + MAX_DATA_SIZE - dataLen, "c%d=L\"%s\"", c, buf); + } + tmfree(buf); + break; + default: + errorPrint2("%s() LN%d, Unknown data type %s\n", + __func__, __LINE__, + stbInfo->columns[j].dataType); + return NULL; + } + + } + dataLen += snprintf(lines[count] + dataLen, MAX_LINE_SIZE - dataLen, + " %ld%s", timestamp, pThreadInfo->time_precision == TSDB_TIME_PRECISION_MILLI ? + "ms":(pThreadInfo->time_precision == TSDB_TIME_PRECISION_MICRO ? "us":"ns")); + count++; + if (count == batch) { + execSmlLines(pThreadInfo->taos, lines, batch, false); + totalAffectedRows += batch; + currentPercent = totalAffectedRows * g_Dbs.threadCount / insertRows; + if (currentPercent > percentComplete ) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); + percentComplete = currentPercent; + } + count = 0; + for (int index = 0; index < batch; index++) { + free(lines[index]); + } + + } + } + } + if(count != 0) { + execSmlLines(pThreadInfo->taos, lines, count, false); + totalAffectedRows += count; + currentPercent = totalAffectedRows * g_Dbs.threadCount / insertRows; + if (currentPercent > percentComplete ) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); + percentComplete = currentPercent; + } + for (int index = 0; index < count; index++) { + free(lines[index]); + } + } + for (int index = 0; index < pThreadInfo->ntables; index++) { + free(smlHead[index]); + } + return NULL; +} + static void* createTable(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; @@ -4593,6 +4857,105 @@ static void* createTable(void *sarg) return NULL; } +static int startMultiThreadInsertSchemaless(int threads, int64_t ntables, char* db_name, + char* precision, SSuperTable* stbInfo) { + uint64_t tableFrom = 0; + pthread_t *pids = calloc(1, threads * sizeof(pthread_t)); + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + if ((NULL == pids) || (NULL == infos)) { + ERROR_EXIT("startMultiThreadInsertSchemaless malloc failed\n"); + } + int32_t timePrec = TSDB_TIME_PRECISION_MILLI; + if (0 != precision[0]) { + if (0 == strncasecmp(precision, "ms", 2)) { + timePrec = TSDB_TIME_PRECISION_MILLI; + } else if (0 == strncasecmp(precision, "us", 2)) { + timePrec = TSDB_TIME_PRECISION_MICRO; + } else if (0 == strncasecmp(precision, "ns", 2)) { + timePrec = TSDB_TIME_PRECISION_NANO; + } else { + errorPrint2("Not support precision: %s\n", precision); + exit(EXIT_FAILURE); + } + } + + int64_t startTime; + if (stbInfo) { + if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) { + startTime = taosGetTimestamp(timePrec); + } else { + if (TSDB_CODE_SUCCESS != taosParseTime( + stbInfo->startTimestamp, + &startTime, + strlen(stbInfo->startTimestamp), + timePrec, 0)) { + ERROR_EXIT("failed to parse time!\n"); + } + } + } else { + startTime = DEFAULT_START_TIME; + } + debugPrint("%s() LN%d, startTime= %"PRId64"\n", + __func__, __LINE__, startTime); + if (threads < 1) { + threads = 1; + } + int64_t a = ntables / threads; + if (a < 1) { + threads = ntables; + a = 1; + } + int64_t b = 0; + b = ntables % threads; + + for (int64_t i = 0; i < threads; i++) { + threadInfo *pThreadInfo = infos + i; + pThreadInfo->threadID = i; + tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN); + pThreadInfo->stbInfo = stbInfo; + verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name); + pThreadInfo->taos = taos_connect( + g_Dbs.host, + g_Dbs.user, + g_Dbs.password, + db_name, + g_Dbs.port); + if (pThreadInfo->taos == NULL) { + errorPrint2("%s() LN%d, Failed to connect to TDengine, reason:%s\n", + __func__, __LINE__, taos_errstr(NULL)); + free(pids); + free(infos); + return -1; + } + pThreadInfo->time_precision = timePrec; + pThreadInfo->start_table_from = tableFrom; + pThreadInfo->ntables = iend_table_to = i < b ? tableFrom + a : tableFrom + a - 1; + tableFrom = pThreadInfo->end_table_to + 1; + pThreadInfo->start_time = startTime; + pThreadInfo->use_metric = true; + pThreadInfo->minDelay = UINT64_MAX; + pThreadInfo->tables_created = 0; + // generateSMLhead(pThreadInfo); + pthread_create(pids + i, NULL, insertSchemaless, pThreadInfo); + } + for (int i = 0; i < threads; i++) { + pthread_join(pids[i], NULL); + } + + for (int i = 0; i < threads; i++) { + threadInfo *pThreadInfo = infos + i; + taos_close(pThreadInfo->taos); + + g_actualChildTables += pThreadInfo->tables_created; + } + + free(pids); + free(infos); + + return 0; +} + static int startMultiThreadCreateChildTable( char* cols, int threads, uint64_t tableFrom, int64_t ntables, char* db_name, SSuperTable* stbInfo) { @@ -5513,6 +5876,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, TBNAME_PREFIX_LEN); + cJSON *schemaless = cJSON_GetObjectItem(stbInfo, "schemaless"); + if (schemaless + && schemaless->type == cJSON_String + && schemaless->valuestring != NULL) { + if (0 == strncasecmp(schemaless->valuestring, "yes", 3)) { + g_Dbs.db[i].superTbls[j].schemaless = true; + } else { + g_Dbs.db[i].superTbls[j].schemaless = false; + } + } else if (!schemaless) { + g_Dbs.db[i].superTbls[j].schemaless = false; + } else { + errorPrint("%s", "failed to read json, schemaless not found\n"); + goto PARSE_OVER; + } + cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); if (autoCreateTbl && autoCreateTbl->type == cJSON_String @@ -10852,6 +11231,46 @@ static int insertTestProcess() { double start; double end; + for (int i = 0; i < g_Dbs.dbCount; i++) { + if (g_Dbs.use_metric) { + if (g_Dbs.db[i].superTblCount > 0) { + for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { + SSuperTable* stbInfo = &g_Dbs.db[i].superTbls[j]; + if (stbInfo->schemaless) { + fprintf(stderr, + "start schemaless insert into %"PRId64" table(s) for %ld records each with %d thread(s)\n\n", + g_totalChildTables, stbInfo->insertRows, g_Dbs.threadCount); + if (g_fpOfInsertResult) { + fprintf(g_fpOfInsertResult, + "start schemaless insert into %"PRId64" table(s) for %ld records each with %d thread(s)\n\n", + g_totalChildTables, stbInfo->insertRows, g_Dbs.threadCount); + } + start = taosGetTimestampMs(); + startMultiThreadInsertSchemaless( + g_Dbs.threadCount, + g_Dbs.db[i].superTbls[j].childTblCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + stbInfo); + end = taosGetTimestampMs(); + fprintf(stderr, + "\nSpent %.4f seconds schemaless insert into %"PRId64" table(s) with %d thread(s) and %"PRId64" records each\n\n", + (end - start)/1000.0, g_totalChildTables, + g_Dbs.threadCount, stbInfo->insertRows); + if (g_fpOfInsertResult) { + fprintf(g_fpOfInsertResult, + "\nSpent %.4f seconds schemaless insert into %"PRId64" table(s) with %d thread(s),and %"PRId64" records each\n\n", + (end - start)/1000.0, g_totalChildTables, + g_Dbs.threadCount, stbInfo->insertRows); + } + return 0; + } + + } + } + } + } + if (g_totalChildTables > 0) { fprintf(stderr, "creating %"PRId64" table(s) with %d thread(s)\n\n",