diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 372571233974b2a8c7a24098f7828fd7e7e00a41..9f367b41f8aaefd5871d206093105657d6c25378 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -124,7 +124,7 @@ typedef enum enum_INSERT_MODE { typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, - INSERT_TYPE, + INSERT_TYPE, QUERY_TYPE_BUT } QUERY_TYPE; @@ -229,7 +229,7 @@ typedef struct SColumn_S { typedef struct SSuperTable_S { char sTblName[MAX_TB_NAME_SIZE+1]; int childTblCount; - bool childTblExists; // 0: no, 1: yes + bool childTblExists; // 0: no, 1: yes int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table char childTblPrefix[MAX_TB_NAME_SIZE]; @@ -239,15 +239,15 @@ typedef struct SSuperTable_S { int childTblOffset; int multiThreadWriteOneTbl; // 0: no, 1: yes - int interlaceRows; // + int interlaceRows; // int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision - int maxSqlLen; // + int maxSqlLen; // int insertInterval; // insert interval, will override global insert interval int64_t insertRows; // 0: no limit int timeStampStep; - char startTimestamp[MAX_TB_NAME_SIZE]; // + char startTimestamp[MAX_TB_NAME_SIZE]; // char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json char sampleFile[MAX_FILE_NAME_LEN+1]; char tagsFile[MAX_FILE_NAME_LEN+1]; @@ -539,7 +539,7 @@ SArguments g_args = { true, // insert_only false, // debug_print false, // verbose_print - false, // performance statistic print + false, // performance statistic print false, // answer_yes; "./output.txt", // output_file 0, // mode : sync or async @@ -641,7 +641,7 @@ static void printHelp() { "The password to use when connecting to the server. Default is 'taosdata'."); printf("%s%s%s%s\n", indent, "-c", indent, "Configuration directory. Default is '/etc/taos/'."); -#endif +#endif printf("%s%s%s%s\n", indent, "-h", indent, "The host to connect to TDengine. Default is localhost."); printf("%s%s%s%s\n", indent, "-p", indent, @@ -684,7 +684,7 @@ static void printHelp() { "Print debug info."); printf("%s%s%s%s\n", indent, "-V, --version", indent, "Print version info."); -/* printf("%s%s%s%s\n", indent, "-D", indent, +/* printf("%s%s%s%s\n", indent, "-D", indent, "if elete database if exists. 0: no, 1: yes, default is 1"); */ } @@ -749,7 +749,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { && strcasecmp(argv[i], "SMALLINT") && strcasecmp(argv[i], "BIGINT") && strcasecmp(argv[i], "DOUBLE") - && strcasecmp(argv[i], "BINARY") + && strcasecmp(argv[i], "BINARY") && strcasecmp(argv[i], "NCHAR")) { printHelp(); ERROR_EXIT( "Invalid data_type!\n"); @@ -1762,7 +1762,7 @@ static void printfQuerySystemInfo(TAOS * taos) { } for (int i = 0; i < dbCount; i++) { - // printf database info + // printf database info printfDbInfoForQueryToFile(filename, dbInfos[i], i); // show db.vgroups @@ -2098,7 +2098,7 @@ static int calcRowLen(SSuperTable* superTbls) { lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6; } else if (strcasecmp(dataType, "FLOAT") == 0) { lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22; - } else if (strcasecmp(dataType, "DOUBLE") == 0) { + } else if (strcasecmp(dataType, "DOUBLE") == 0) { lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42; } else { printf("get error tag type : %s\n", dataType); @@ -2262,7 +2262,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, /* if (TBL_ALREADY_EXISTS == superTbls->childTblExists) { - //get all child table name use cmd: select tbname from superTblName; + //get all child table name use cmd: select tbname from superTblName; int childTblCount = 10000; superTbls->childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); if (superTbls->childTblName == NULL) { @@ -2289,7 +2289,7 @@ static int createSuperTable(TAOS * taos, char* dbName, int lenOfOneRow = 0; for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) { char* dataType = superTbls->columns[colIndex].dataType; - + if (strcasecmp(dataType, "BINARY") == 0) { len += snprintf(cols + len, STRING_LEN - len, ", col%d %s(%d)", colIndex, "BINARY", @@ -2386,7 +2386,7 @@ static int createSuperTable(TAOS * taos, char* dbName, len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "FLOAT"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22; - } else if (strcasecmp(dataType, "DOUBLE") == 0) { + } else if (strcasecmp(dataType, "DOUBLE") == 0) { len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "DOUBLE"); lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42; @@ -2638,7 +2638,7 @@ static void* createTable(void *sarg) lastPrintTime = currentPrintTime; } } - + if (0 != len) { verbosePrint("%s() %d buffer: %s\n", __func__, __LINE__, buffer); if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE, false)) { @@ -2703,7 +2703,7 @@ static int startMultiThreadCreateChildTable( t_info->minDelay = INT16_MAX; pthread_create(pids + i, NULL, createTable, t_info); } - + for (int i = 0; i < threads; i++) { pthread_join(pids[i], NULL); } @@ -2920,7 +2920,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile( cJSON* stbInfo, SSuperTable* superTbls) { bool ret = false; - // columns + // columns cJSON *columns = cJSON_GetObjectItem(stbInfo, "columns"); if (columns && columns->type != cJSON_Array) { printf("ERROR: failed to read json, columns not found\n"); @@ -2958,7 +2958,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile( count = 1; } - // column info + // column info memset(&columnCase, 0, sizeof(StrColumn)); cJSON *dataType = cJSON_GetObjectItem(column, "type"); if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { @@ -2989,7 +2989,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile( count = 1; index = 0; - // tags + // tags cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags"); if (!tags || tags->type != cJSON_Array) { debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__); @@ -3018,7 +3018,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile( count = 1; } - // column info + // column info memset(&columnCase, 0, sizeof(StrColumn)); cJSON *dataType = cJSON_GetObjectItem(tag, "type"); if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { @@ -3166,7 +3166,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { if (numRecPerReq && numRecPerReq->type == cJSON_Number) { g_args.num_of_RPR = numRecPerReq->valueint; } else if (!numRecPerReq) { - g_args.num_of_RPR = 100; + g_args.num_of_RPR = 0xffff; } else { errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__); goto PARSE_OVER; @@ -3209,7 +3209,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { cJSON* dbinfos = cJSON_GetArrayItem(dbs, i); if (dbinfos == NULL) continue; - // dbinfo + // dbinfo cJSON *dbinfo = cJSON_GetObjectItem(dbinfos, "dbinfo"); if (!dbinfo || dbinfo->type != cJSON_Object) { printf("ERROR: failed to read json, dbinfo not found\n"); @@ -3615,7 +3615,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { } g_Dbs.db[i].superTbls[j].maxSqlLen = len; } else if (!maxSqlLen) { - g_Dbs.db[i].superTbls[j].maxSqlLen = TSDB_MAX_SQL_LEN; + g_Dbs.db[i].superTbls[j].maxSqlLen = g_args.max_sql_len; } else { printf("ERROR: failed to read json, maxSqlLen not found\n"); goto PARSE_OVER; @@ -3748,7 +3748,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* user = cJSON_GetObjectItem(root, "user"); if (user && user->type == cJSON_String && user->valuestring != NULL) { - tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE); + tstrncpy(g_queryInfo.user, user->valuestring, MAX_USERNAME_SIZE); } else if (!user) { tstrncpy(g_queryInfo.user, "root", MAX_USERNAME_SIZE); ; } @@ -3805,7 +3805,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { goto PARSE_OVER; } - // super_table_query + // super_table_query cJSON *specifiedQuery = cJSON_GetObjectItem(root, "specified_table_query"); if (!specifiedQuery) { g_queryInfo.specifiedQueryInfo.concurrent = 0; @@ -3930,7 +3930,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } } - // sub_table_query + // sub_table_query cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query"); if (!superQuery) { g_queryInfo.superQueryInfo.threadCnt = 0; @@ -3996,7 +3996,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval"); if (subinterval && subinterval->type == cJSON_Number) { g_queryInfo.superQueryInfo.subscribeInterval = subinterval->valueint; - } else if (!subinterval) { + } else if (!subinterval) { //printf("failed to read json, subscribe interval no found\n"); //goto PARSE_OVER; g_queryInfo.superQueryInfo.subscribeInterval = 10000; @@ -4200,71 +4200,77 @@ static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); (*sampleUsePos)++; - + return dataLen; } -static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* stbInfo) { - int dataLen = 0; - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); +static int generateRowData(char* recBuf, int64_t timestamp, SSuperTable* stbInfo) { + int dataLen = 0; + char *pstr = recBuf; + int maxLen = MAX_DATA_SIZE; + + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); + for (int i = 0; i < stbInfo->columnCount; i++) { if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { errorPrint( "binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); - return (-1); + return -1; } char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1); if (NULL == buf) { errorPrint( "calloc failed! size:%d\n", stbInfo->columns[i].dataLen); - return (-1); + return -1; } rand_string(buf, stbInfo->columns[i].dataLen); - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "\'%s\', ", buf); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\', ", buf); tmfree(buf); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "int", 3)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d, ", rand_int()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "bigint", 6)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "float", 5)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f, ", rand_float()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "double", 6)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%f, ", rand_double()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "smallint", 8)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_smallint()); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d, ", rand_smallint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "tinyint", 7)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_tinyint()); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d, ", rand_tinyint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "bool", 4)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_bool()); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%d, ", rand_bool()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "timestamp", 9)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); } else { errorPrint( "No support data type: %s\n", stbInfo->columns[i].dataType); - return (-1); + return -1; } } dataLen -= 2; - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); + dataLen += snprintf(pstr + dataLen, maxLen - dataLen, ")"); - return dataLen; + verbosePrint("%s() LN%d, recBuf:\n\t%s\n", __func__, __LINE__, recBuf); + + return strlen(recBuf); } -static int32_t generateData(char *res, char **data_type, +static int32_t generateData(char *recBuf, char **data_type, int num_of_cols, int64_t timestamp, int lenOfBinary) { - memset(res, 0, MAX_DATA_SIZE); - char *pstr = res; + memset(recBuf, 0, MAX_DATA_SIZE); + char *pstr = recBuf; pstr += sprintf(pstr, "(%" PRId64, timestamp); int c = 0; @@ -4285,7 +4291,7 @@ static int32_t generateData(char *res, char **data_type, } else if (strcasecmp(data_type[i % c], "smallint") == 0) { pstr += sprintf(pstr, ", %d", rand_smallint()); } else if (strcasecmp(data_type[i % c], "int") == 0) { - pstr += sprintf(pstr, ", %d", rand_int()); + pstr += sprintf(pstr, ", %d", rand_int()); } else if (strcasecmp(data_type[i % c], "bigint") == 0) { pstr += sprintf(pstr, ", %" PRId64, rand_bigint()); } else if (strcasecmp(data_type[i % c], "float") == 0) { @@ -4308,7 +4314,7 @@ static int32_t generateData(char *res, char **data_type, free(s); } - if (pstr - res > MAX_DATA_SIZE) { + if (strlen(recBuf) > MAX_DATA_SIZE) { perror("column length too long, abort"); exit(-1); } @@ -4316,7 +4322,7 @@ static int32_t generateData(char *res, char **data_type, pstr += sprintf(pstr, ")"); - return (int32_t)(pstr - res); + return (int32_t)strlen(recBuf); } static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { @@ -4325,9 +4331,9 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { sampleDataBuf = calloc( superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); if (sampleDataBuf == NULL) { - errorPrint("%s() LN%d, Failed to calloc %d Bytes, reason:%s\n", + errorPrint("%s() LN%d, Failed to calloc %d Bytes, reason:%s\n", __func__, __LINE__, - superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, + superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno)); return -1; } @@ -4396,7 +4402,7 @@ static void getTableName(char *pTblName, threadInfo* pThreadInfo, int tableSeq) static int generateDataTail(char *tableName, int32_t tableSeq, threadInfo* pThreadInfo, SSuperTable* superTblInfo, - int batch, char* buffer, int64_t insertRows, + int batch, char* buffer, int remainderBufLen, int64_t insertRows, int64_t startFrom, uint64_t startTime, int *pSamplePos, int *dataLen) { int len = 0; int ncols_per_record = 1; // count first col ts @@ -4413,18 +4419,19 @@ static int generateDataTail(char *tableName, int32_t tableSeq, int k = 0; for (k = 0; k < batch;) { - if (superTblInfo) { - int retLen = 0; + char data[MAX_DATA_SIZE]; + int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, + if (superTblInfo) { + if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { retLen = getRowDataFromSample( - buffer + len, - superTblInfo->maxSqlLen - len, + data, + remainderBufLen, startTime + superTblInfo->timeStampStep * k, superTblInfo, pSamplePos); - } else if (0 == strncasecmp(superTblInfo->dataSource, + } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { int rand_num = rand_tinyint() % 100; if (0 != superTblInfo->disorderRatio @@ -4433,60 +4440,56 @@ static int generateDataTail(char *tableName, int32_t tableSeq, + superTblInfo->timeStampStep * k - taosRandom() % superTblInfo->disorderRange; retLen = generateRowData( - buffer + len, - superTblInfo->maxSqlLen - len, + data, d, superTblInfo); } else { retLen = generateRowData( - buffer + len, - superTblInfo->maxSqlLen - len, + data, startTime + superTblInfo->timeStampStep * k, superTblInfo); - } - } - - if (retLen < 0) { - return -1; - } + } + } - len += retLen; + if (retLen > remainderBufLen) { + break; + } - if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite - k++; - break; - } + buffer += sprintf(buffer, " %s", data); + k++; + len += retLen; + remainderBufLen -= retLen; } else { int rand_num = taosRandom() % 100; - char data[MAX_DATA_SIZE]; - char **data_type = g_args.datatype; - int lenOfBinary = g_args.len_of_binary; + char **data_type = g_args.datatype; + int lenOfBinary = g_args.len_of_binary; if ((g_args.disorderRatio != 0) && (rand_num < g_args.disorderRange)) { - int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k + int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k - taosRandom() % 1000000 + rand_num; - len = generateData(data, data_type, + retLen = generateData(data, data_type, ncols_per_record, d, lenOfBinary); } else { - len = generateData(data, data_type, + retLen = generateData(data, data_type, ncols_per_record, startTime + DEFAULT_TIMESTAMP_STEP * k, lenOfBinary); } + if (len > remainderBufLen) + break; + buffer += sprintf(buffer, " %s", data); - if (strlen(buffer) >= (g_args.max_sql_len - 256)) { // too long - k++; - break; - } + k++; + len += retLen; + remainderBufLen -= retLen; } verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); - k++; startFrom ++; if (startFrom >= insertRows) { @@ -4570,20 +4573,25 @@ static int generateProgressiveDataBuffer(char *pTblName, assert(buffer != NULL); - memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + int remainderBufLen = maxSqlLen; + + memset(buffer, 0, maxSqlLen); char *pstr = buffer; int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo, buffer); pstr += headLen; + remainderBufLen -= headLen; int k; int dataLen; k = generateDataTail(pTblName, tableSeq, pThreadInfo, superTblInfo, - g_args.num_of_RPR, pstr, insertRows, startFrom, + g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom, startTime, pSamplePos, &dataLen); + return k; } @@ -4656,13 +4664,18 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int generatedRecPerTbl = 0; bool flagSleep = true; int sleepTimeTotal = 0; + + int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; + int remainderBufLen; + while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { if ((flagSleep) && (insert_interval)) { st = taosGetTimestampUs(); flagSleep = false; } // generate data - memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); + memset(buffer, 0, maxSqlLen); + remainderBufLen = maxSqlLen; char *pstr = buffer; int recOfBatch = 0; @@ -4685,6 +4698,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->threadID, __func__, __LINE__, i, buffer); pstr += headLen; + remainderBufLen -= headLen; + int dataLen = 0; verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n", @@ -4698,13 +4713,20 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { } else { startTime = 1500000000000; } - generateDataTail( + int generated = generateDataTail( tableName, tableSeq, pThreadInfo, superTblInfo, - batchPerTbl, pstr, insertRows, 0, + batchPerTbl, pstr, remainderBufLen, insertRows, 0, startTime, &(pThreadInfo->samplePos), &dataLen); + if (generated < 0) { + debugPrint("[%d] %s() LN%d, generated data is %d\n", + pThreadInfo->threadID, __func__, __LINE__, generated); + goto free_and_statistics_interlace; + } pstr += dataLen; + remainderBufLen -= dataLen; + recOfBatch += batchPerTbl; startTime += batchPerTbl * superTblInfo->timeStampStep; pThreadInfo->totalInsertRows += batchPerTbl; @@ -4796,9 +4818,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { free_and_statistics_interlace: tmfree(buffer); - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", - pThreadInfo->threadID, - pThreadInfo->totalInsertRows, + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, pThreadInfo->totalAffectedRows); return NULL; } @@ -4929,16 +4951,16 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { free_and_statistics_2: tmfree(buffer); - printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", - pThreadInfo->threadID, - pThreadInfo->totalInsertRows, + printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, pThreadInfo->totalAffectedRows); return NULL; } static void* syncWrite(void *sarg) { - threadInfo *winfo = (threadInfo *)sarg; + threadInfo *winfo = (threadInfo *)sarg; SSuperTable* superTblInfo = winfo->superTblInfo; int interlaceRows = superTblInfo?superTblInfo->interlaceRows:g_args.interlace_rows; @@ -4953,7 +4975,7 @@ static void* syncWrite(void *sarg) { } static void callBack(void *param, TAOS_RES *res, int code) { - threadInfo* winfo = (threadInfo*)param; + threadInfo* winfo = (threadInfo*)param; SSuperTable* superTblInfo = winfo->superTblInfo; int insert_interval = @@ -4966,7 +4988,7 @@ static void callBack(void *param, TAOS_RES *res, int code) { } char *buffer = calloc(1, winfo->superTblInfo->maxSqlLen); - char *data = calloc(1, MAX_DATA_SIZE); + char data[MAX_DATA_SIZE]; char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_from); @@ -4978,7 +5000,6 @@ static void callBack(void *param, TAOS_RES *res, int code) { if (winfo->start_table_from > winfo->end_table_to) { tsem_post(&winfo->lock_sem); free(buffer); - free(data); taos_free_result(res); return; } @@ -4988,11 +5009,9 @@ static void callBack(void *param, TAOS_RES *res, int code) { if (0 != winfo->superTblInfo->disorderRatio && rand_num < winfo->superTblInfo->disorderRatio) { int64_t d = winfo->lastTs - taosRandom() % 1000000 + rand_num; - //generateData(data, datatype, ncols_per_record, d, len_of_binary); - generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo); + generateRowData(data, d, winfo->superTblInfo); } else { - //generateData(data, datatype, ncols_per_record, start_time += 1000, len_of_binary); - generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo); + generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo); } pstr += sprintf(pstr, "%s", data); winfo->counter++; @@ -5007,7 +5026,6 @@ static void callBack(void *param, TAOS_RES *res, int code) { } taos_query_a(winfo->taos, buffer, callBack, winfo); free(buffer); - free(data); taos_free_result(res); } @@ -5373,7 +5391,7 @@ static void *readTable(void *sarg) { } static void *readMetric(void *sarg) { -#if 1 +#if 1 threadInfo *rinfo = (threadInfo *)sarg; TAOS *taos = rinfo->taos; char command[BUFFER_SIZE] = "\0"; @@ -5524,7 +5542,7 @@ static int insertTestProcess() { //int64_t totalInsertRows = 0; //int64_t totalAffectedRows = 0; - //for (int i = 0; i < g_Dbs.dbCount; i++) { + //for (int i = 0; i < g_Dbs.dbCount; i++) { // for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // totalInsertRows+= g_Dbs.db[i].superTbls[j].totalInsertRows; // totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows; @@ -5921,7 +5939,7 @@ static void *subSubscribeProcess(void *sarg) { sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID); } - tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile); + tsub[i] = subscribeImpl(winfo->taos, subSqlstr, topic, tmpFile); if (NULL == tsub[i]) { taos_close(winfo->taos); return NULL; @@ -6109,7 +6127,7 @@ static int subscribeTestProcess() { && (g_queryInfo.superQueryInfo.threadCnt > 0)) { pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t)); - infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * + infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { errorPrint("%s() LN%d, malloc failed for create threads\n", @@ -6256,7 +6274,7 @@ static void setParaFromArg(){ g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; - g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; + g_Dbs.db[0].superTbls[0].maxSqlLen = g_args.max_sql_len; g_Dbs.db[0].superTbls[0].columnCount = 0; for (int i = 0; i < MAX_NUM_DATATYPE; i++) { diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index c3282ce6f7b4a43a4c56599fc604efb7a9068c73..8e15d1b5ecb1554a3b7f5144566a0de0efff28b5 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -139,6 +139,18 @@ python3 ./test.py -f import_merge/importInsertThenImport.py python3 ./test.py -f import_merge/importCSV.py #======================p1-end=============== #======================p2-start=============== +# tools +python3 test.py -f tools/taosdumpTest.py + +python3 test.py -f tools/taosdemoTest.py +python3 test.py -f tools/taosdemoTestWithoutMetric.py +python3 test.py -f tools/taosdemoTestWithJson.py +python3 test.py -f tools/taosdemoTestLimitOffset.py +python3 test.py -f tools/taosdemoTestTblAlt.py +python3 test.py -f tools/taosdemoTestSampleData.py +python3 test.py -f tools/taosdemoTestInterlace.py +python3 test.py -f tools/taosdemoTestQuery.py + # update python3 ./test.py -f update/allow_update.py python3 ./test.py -f update/allow_update-0.py @@ -247,18 +259,6 @@ python3 test.py -f subscribe/supertable.py #======================p3-end=============== #======================p4-start=============== -# tools -python3 test.py -f tools/taosdumpTest.py - -python3 test.py -f tools/taosdemoTest.py -python3 test.py -f tools/taosdemoTestWithoutMetric.py -python3 test.py -f tools/taosdemoTestWithJson.py -python3 test.py -f tools/taosdemoTestLimitOffset.py -python3 test.py -f tools/taosdemoTest2.py -python3 test.py -f tools/taosdemoTestSampleData.py -python3 test.py -f tools/taosdemoTestInterlace.py -python3 test.py -f tools/taosdemoTestQuery.py - python3 ./test.py -f update/merge_commit_data-0.py # wal python3 ./test.py -f wal/addOldWalTest.py diff --git a/tests/pytest/tools/taosdemoTest2.py b/tests/pytest/tools/taosdemoTestTblAlt.py similarity index 63% rename from tests/pytest/tools/taosdemoTest2.py rename to tests/pytest/tools/taosdemoTestTblAlt.py index 74b05faf8b2297477153a5ed482cadc9726b3761..bb367817cf082a4a4b75f2deac6883d72d7ba145 100644 --- a/tests/pytest/tools/taosdemoTest2.py +++ b/tests/pytest/tools/taosdemoTestTblAlt.py @@ -29,10 +29,33 @@ class TDTestCase: self.numberOfTables = 10 self.numberOfRecords = 1000000 + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + def insertDataAndAlterTable(self, threadID): + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath + "/build/bin/" + if(threadID == 0): - os.system("taosdemo -y -t %d -n %d" % - (self.numberOfTables, self.numberOfRecords)) + os.system("%staosdemo -y -t %d -n %d" % + (binPath, self.numberOfTables, self.numberOfRecords)) if(threadID == 1): time.sleep(2) print("use test") @@ -47,7 +70,13 @@ class TDTestCase: # check if all the tables have heen created while True: - tdSql.query("show tables") + try: + tdSql.query("show tables") + except Exception as e: + tdLog.info("show tables test failed") + time.sleep(1) + continue + rows = tdSql.queryRows print("number of tables: %d" % rows) if(rows == self.numberOfTables): @@ -56,16 +85,23 @@ class TDTestCase: # check if there are any records in the last created table while True: print("query started") - tdSql.query("select * from test.t9") + try: + tdSql.query("select * from test.t9") + except Exception as e: + tdLog.info("select * test failed") + time.sleep(2) + continue + rows = tdSql.queryRows print("number of records: %d" % rows) if(rows > 0): break time.sleep(1) + print("alter table test.meters add column col10 int") tdSql.execute("alter table test.meters add column col10 int") - print("insert into test.t0 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)") - tdSql.execute("insert into test.t0 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)") + print("insert into test.t9 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)") + tdSql.execute("insert into test.t9 values (now, 1, 2, 3, 4, 0.1, 0.01,'test', '测试', TRUE, 1610000000000, 0)") def run(self): tdSql.prepare() @@ -78,6 +114,8 @@ class TDTestCase: t1.join() t2.join() + time.sleep(3) + tdSql.query("select count(*) from test.meters") tdSql.checkData(0, 0, self.numberOfRecords * self.numberOfTables + 1)