未验证 提交 8e455df2 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 3327 taosdemo segfault sampledata (#5458)

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

check offset 0.

* [TD-3316] <fix>: add testcase for taosdemo limit and offset.

fix sample file import bug.

* [TD-3316] <fix>: add test case for limit and offset. fix sample data issue.

* [TD-3327] <fix>: fix taosdemo segfault when import data from sample data file.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 fcae1ef2
...@@ -41,8 +41,7 @@ ...@@ -41,8 +41,7 @@
"insert_mode": "taosc", "insert_mode": "taosc",
"insert_rows": 100000, "insert_rows": 100000,
"multi_thread_write_one_tbl": "no", "multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0, "rows_per_tbl": 0,
"rows_per_tbl": 100,
"max_sql_len": 1024000, "max_sql_len": 1024000,
"disorder_ratio": 0, "disorder_ratio": 0,
"disorder_range": 1000, "disorder_range": 1000,
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#ifdef LINUX #ifdef LINUX
#include <argp.h> #include <argp.h>
#include <assert.h>
#include <inttypes.h> #include <inttypes.h>
#ifndef _ALPINE #ifndef _ALPINE
#include <error.h> #include <error.h>
...@@ -39,11 +38,11 @@ ...@@ -39,11 +38,11 @@
#include <wordexp.h> #include <wordexp.h>
#include <regex.h> #include <regex.h>
#else #else
#include <assert.h>
#include <regex.h> #include <regex.h>
#include <stdio.h> #include <stdio.h>
#endif #endif
#include <assert.h>
#include <stdlib.h> #include <stdlib.h>
#include "cJSON.h" #include "cJSON.h"
...@@ -221,7 +220,6 @@ typedef struct SSuperTable_S { ...@@ -221,7 +220,6 @@ typedef struct SSuperTable_S {
int childTblOffset; int childTblOffset;
int multiThreadWriteOneTbl; // 0: no, 1: yes int multiThreadWriteOneTbl; // 0: no, 1: yes
int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl
int rowsPerTbl; // int rowsPerTbl; //
int disorderRatio; // 0: no disorder, >0: x% int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision int disorderRange; // ms or us by database precision
...@@ -396,6 +394,8 @@ typedef struct SThreadInfo_S { ...@@ -396,6 +394,8 @@ typedef struct SThreadInfo_S {
uint64_t et; uint64_t et;
int64_t lastTs; int64_t lastTs;
// sample data
int samplePos;
// statistics // statistics
int64_t totalInsertRows; int64_t totalInsertRows;
int64_t totalAffectedRows; int64_t totalAffectedRows;
...@@ -1126,8 +1126,6 @@ static int printfInsertMeta() { ...@@ -1126,8 +1126,6 @@ static int printfInsertMeta() {
}else { }else {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
} }
printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql);
printf(" rowsPerTbl: \033[33m%d\033[0m\n", printf(" rowsPerTbl: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].rowsPerTbl); g_Dbs.db[i].superTbls[j].rowsPerTbl);
printf(" disorderRange: \033[33m%d\033[0m\n", printf(" disorderRange: \033[33m%d\033[0m\n",
...@@ -1287,7 +1285,6 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1287,7 +1285,6 @@ static void printfInsertMetaToFile(FILE* fp) {
}else { }else {
fprintf(fp, " multiThreadWriteOneTbl: yes\n"); fprintf(fp, " multiThreadWriteOneTbl: yes\n");
} }
fprintf(fp, " numberOfTblInOneSql: %d\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql);
fprintf(fp, " rowsPerTbl: %d\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); fprintf(fp, " rowsPerTbl: %d\n", g_Dbs.db[i].superTbls[j].rowsPerTbl);
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio);
...@@ -2335,7 +2332,8 @@ static int createDatabases() { ...@@ -2335,7 +2332,8 @@ static int createDatabases() {
" fsync %d", g_Dbs.db[i].dbCfg.fsync); " fsync %d", g_Dbs.db[i].dbCfg.fsync);
} }
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms"))) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms")))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", strlen("us")))) { || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"us", strlen("us")))) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" precision \'%s\';", g_Dbs.db[i].dbCfg.precision); " precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
} }
...@@ -2351,14 +2349,17 @@ static int createDatabases() { ...@@ -2351,14 +2349,17 @@ static int createDatabases() {
debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// describe super table, if exists // describe super table, if exists
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].sTblName);
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); ret = createSuperTable(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
} else { } else {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS; g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS;
ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j]); ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName,
&g_Dbs.db[i].superTbls[j]);
} }
if (0 != ret) { if (0 != ret) {
...@@ -2715,6 +2716,8 @@ static int readSampleFromCsvFileToMem( ...@@ -2715,6 +2716,8 @@ static int readSampleFromCsvFileToMem(
continue; continue;
} }
verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen, superTblInfo->lenOfOneRow, getRows);
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen); line, readLen);
getRows++; getRows++;
...@@ -3426,24 +3429,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3426,24 +3429,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n"); printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* numberOfTblInOneSql = cJSON_GetObjectItem(stbInfo, "number_of_tbl_in_one_sql");
if (numberOfTblInOneSql && numberOfTblInOneSql->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = numberOfTblInOneSql->valueint;
} else if (!numberOfTblInOneSql) {
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = 0;
} else {
printf("ERROR: failed to read json, numberOfTblInOneSql not found\n");
goto PARSE_OVER;
}
cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "rows_per_tbl"); cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "rows_per_tbl");
if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint; g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint;
} else if (!rowsPerTbl) { } else if (!rowsPerTbl) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = 1; g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
printf("ERROR: failed to read json, rowsPerTbl not found\n"); fprintf(stderr, "ERROR: failed to read json, rowsPerTbl input mistake\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3901,7 +3893,7 @@ PARSE_OVER: ...@@ -3901,7 +3893,7 @@ PARSE_OVER:
return ret; return ret;
} }
void prepareSampleData() { static void prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
//if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) {
...@@ -3915,7 +3907,7 @@ void prepareSampleData() { ...@@ -3915,7 +3907,7 @@ void prepareSampleData() {
} }
} }
void postFreeResource() { static void postFreeResource() {
tmfclose(g_fpOfInsertResult); tmfclose(g_fpOfInsertResult);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
...@@ -3942,16 +3934,18 @@ void postFreeResource() { ...@@ -3942,16 +3934,18 @@ void postFreeResource() {
static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int* sampleUsePos) { SSuperTable* superTblInfo, int* sampleUsePos) {
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
int ret = readSampleFromCsvFileToMem(superTblInfo); /* int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) { if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf); tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL; superTblInfo->sampleDataBuf = NULL;
return -1; return -1;
} }
*/
*sampleUsePos = 0; *sampleUsePos = 0;
} }
int dataLen = 0; int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"(%" PRId64 ", ", timestamp); "(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
...@@ -3967,12 +3961,14 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper ...@@ -3967,12 +3961,14 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
int dataLen = 0; int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) { for (int i = 0; i < stbInfo->columnCount; i++) {
if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6))
|| (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); printf("binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return (-1); return (-1);
} }
char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1); char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1);
if (NULL == buf) { if (NULL == buf) {
printf("calloc failed! size:%d\n", stbInfo->columns[i].dataLen); printf("calloc failed! size:%d\n", stbInfo->columns[i].dataLen);
...@@ -3981,15 +3977,24 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper ...@@ -3981,15 +3977,24 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
rand_string(buf, stbInfo->columns[i].dataLen); rand_string(buf, stbInfo->columns[i].dataLen);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "\'%s\', ", buf); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "\'%s\', ", buf);
tmfree(buf); tmfree(buf);
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "int", 3)) { } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_int()); "int", 3)) {
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "bigint", 6)) { dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); "%d, ", rand_int());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "float", 5)) { } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_float()); "bigint", 6)) {
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "double", 6)) { dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_double()); "%"PRId64", ", rand_bigint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "smallint", 8)) { } else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"float", 5)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%f, ", rand_float());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"double", 6)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%f, ", rand_double());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"smallint", 8)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_smallint()); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_smallint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType, "tinyint", 7)) { } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "tinyint", 7)) {
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_tinyint()); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_tinyint());
...@@ -4009,255 +4014,6 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper ...@@ -4009,255 +4014,6 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper
return dataLen; return dataLen;
} }
static void syncWriteForNumberOfTblInOneSql(
threadInfo *winfo, char* sampleDataBuf) {
SSuperTable* superTblInfo = winfo->superTblInfo;
int samplePos = 0;
//printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id);
int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen+1, 1);
if (NULL == buffer) {
printf("========calloc size[ %d ] fail!\n", superTblInfo->maxSqlLen);
return;
}
int32_t numberOfTblInOneSql = superTblInfo->numberOfTblInOneSql;
int32_t tbls = winfo->end_table_id - winfo->start_table_id + 1;
if (numberOfTblInOneSql > tbls) {
numberOfTblInOneSql = tbls;
}
uint64_t time_counter = winfo->start_time;
int sampleUsePos;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int64_t st = 0;
int64_t et = 0xffffffff;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
for (int i = 0; i < insertRows;) {
int32_t tbl_id = 0;
for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) {
int64_t start_time = 0;
int inserted = i;
for (int k = 0; k < g_args.num_of_RPR;) {
int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer;
int32_t end_tbl_id = tableSeq + numberOfTblInOneSql;
if (end_tbl_id > winfo->end_table_id) {
end_tbl_id = winfo->end_table_id+1;
}
for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) {
sampleUsePos = samplePos;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo, tbl_id % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
goto free_and_statistics;
}
if (0 == len) {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d using %s.%s tags %s values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s%d using %s.%s tags %s values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
}
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
if (0 == len) {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s values ",
winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s values ",
winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
}
} else { // pre-create child table
if (0 == len) {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
" %s.%s%d values ",
winfo->db_name,
superTblInfo->childTblPrefix,
tbl_id);
}
}
start_time = time_counter;
for (int j = 0; j < superTblInfo->rowsPerTbl;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(pstr + len,
superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep,
superTblInfo,
&sampleUsePos);
if (retLen < 0) {
goto free_and_statistics;
}
} else if (0 == strncasecmp(
superTblInfo->dataSource, "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = start_time - taosRandom() % superTblInfo->disorderRange;
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
} else {
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep,
superTblInfo);
}
if (retLen < 0) {
goto free_and_statistics;
}
}
len += retLen;
//inserted++;
j++;
winfo->totalInsertRows++;
if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tableSeq = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
superTblInfo->lenOfOneRow);
goto send_to_server;
}
}
}
tableSeq = tbl_id;
inserted += superTblInfo->rowsPerTbl;
send_to_server:
if (insert_interval) {
st = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000)) {
int sleep_time = insert_interval - (et -st);
printf("sleep: %d ms insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
}
if (0 == strncasecmp(superTblInfo->insertMode,
"taosc",
strlen("taosc"))) {
//printf("multi table===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
int64_t startTs;
int64_t endTs;
startTs = taosGetTimestampUs();
debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(
winfo->taos, buffer, INSERT_TYPE);
if (0 < affectedRows) {
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo->totalAffectedRows += affectedRows;
} else {
fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows);
goto free_and_statistics;
}
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID,
winfo->totalInsertRows,
winfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
//int64_t t2 = taosGetTimestampMs();
//printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0);
} else {
//int64_t t1 = taosGetTimestampMs();
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
//int64_t t2 = taosGetTimestampMs();
//printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if (0 != retCode) {
printf("========restful return fail, threadID[%d]\n", winfo->threadID);
goto free_and_statistics;
}
}
if (insert_interval) {
et = taosGetTimestampUs();
}
break;
}
if (tableSeq > winfo->end_table_id) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos;
}
i = inserted;
time_counter = start_time;
}
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
}
free_and_statistics:
tmfree(buffer);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows);
return;
}
int32_t generateData(char *res, char **data_type, int32_t generateData(char *res, char **data_type,
int num_of_cols, int64_t timestamp, int lenOfBinary) { int num_of_cols, int64_t timestamp, int lenOfBinary) {
memset(res, 0, MAX_DATA_SIZE); memset(res, 0, MAX_DATA_SIZE);
...@@ -4319,26 +4075,23 @@ int32_t generateData(char *res, char **data_type, ...@@ -4319,26 +4075,23 @@ int32_t generateData(char *res, char **data_type,
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
char* sampleDataBuf = NULL; char* sampleDataBuf = NULL;
// each thread read sample data from csv file sampleDataBuf = calloc(
if (0 == strncasecmp(superTblInfo->dataSource,
"sample",
strlen("sample"))) {
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) { if (sampleDataBuf == NULL) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno)); strerror(errno));
return -1; return -1;
} }
superTblInfo->sampleDataBuf = sampleDataBuf; superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo); int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
if (0 != ret) {
fprintf(stderr, "read sample from csv file failed.\n");
tmfree(sampleDataBuf); tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL; superTblInfo->sampleDataBuf = NULL;
return -1; return -1;
}
} }
return 0; return 0;
...@@ -4400,7 +4153,8 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4400,7 +4153,8 @@ static int generateDataBuffer(int32_t tableSeq,
return -1; return -1;
} }
if (superTblInfo && (superTblInfo->childTblOffset > 0)) { if (superTblInfo && (superTblInfo->childTblOffset >= 0)
&& (superTblInfo->childTblLimit > 0)) {
// select tbname from stb limit 1 offset tableSeq // select tbname from stb limit 1 offset tableSeq
getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos, getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos,
pThreadInfo->db_name, superTblInfo->sTblName, pThreadInfo->db_name, superTblInfo->sTblName,
...@@ -4467,7 +4221,7 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4467,7 +4221,7 @@ static int generateDataBuffer(int32_t tableSeq,
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) { for (k = 0; k < g_args.num_of_RPR;) {
if (superTblInfo) { if (superTblInfo) {
int retLen = 0; int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) { "sample", strlen("sample"))) {
...@@ -4488,27 +4242,26 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4488,27 +4242,26 @@ static int generateDataBuffer(int32_t tableSeq,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
d, d,
superTblInfo); superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); } else {
} else { retLen = generateRowData(
retLen = generateRowData(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom, startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo); superTblInfo);
} }
}
if (retLen < 0) { if (retLen < 0) {
free(pChildTblName); free(pChildTblName);
return -1; return -1;
} }
len += retLen; len += retLen;
if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite
k++; k++;
break; break;
} }
}
} else { } else {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
...@@ -4529,14 +4282,13 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4529,14 +4282,13 @@ static int generateDataBuffer(int32_t tableSeq,
} }
pstr += sprintf(pstr, " %s", data); pstr += sprintf(pstr, " %s", data);
//assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long
k++; k++;
break; break;
} }
} }
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%p\n", __func__, __LINE__, len, k, buffer); verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer);
k++; k++;
startFrom ++; startFrom ++;
...@@ -4571,21 +4323,7 @@ static void* syncWrite(void *sarg) { ...@@ -4571,21 +4323,7 @@ static void* syncWrite(void *sarg) {
strerror(errno)); strerror(errno));
return NULL; return NULL;
} }
if (superTblInfo) {
if (0 != prepareSampleDataForSTable(superTblInfo))
return NULL;
if (superTblInfo->numberOfTblInOneSql > 0) {
syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf);
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return NULL;
}
}
int samplePos = 0;
int64_t lastPrintTime = taosGetTimestampMs(); int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
int64_t endTs; int64_t endTs;
...@@ -4598,7 +4336,7 @@ static void* syncWrite(void *sarg) { ...@@ -4598,7 +4336,7 @@ static void* syncWrite(void *sarg) {
winfo->totalInsertRows = 0; winfo->totalInsertRows = 0;
winfo->totalAffectedRows = 0; winfo->totalAffectedRows = 0;
int sampleUsePos; winfo->samplePos = 0;
for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id;
tableSeq ++) { tableSeq ++) {
...@@ -4612,10 +4350,8 @@ static void* syncWrite(void *sarg) { ...@@ -4612,10 +4350,8 @@ static void* syncWrite(void *sarg) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
} }
sampleUsePos = samplePos;
int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows, int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows,
i, start_time, &sampleUsePos); i, start_time, &(winfo->samplePos));
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else
...@@ -4662,16 +4398,12 @@ static void* syncWrite(void *sarg) { ...@@ -4662,16 +4398,12 @@ static void* syncWrite(void *sarg) {
if ((tableSeq == winfo->end_table_id) && superTblInfo && if ((tableSeq == winfo->end_table_id) && superTblInfo &&
(0 == strncasecmp( (0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) { superTblInfo->dataSource, "sample", strlen("sample")))) {
samplePos = sampleUsePos; printf("%s() LN%d samplePos=%d\n", __func__, __LINE__, winfo->samplePos);
} }
} // tableSeq } // tableSeq
free_and_statistics_2: free_and_statistics_2:
tmfree(buffer); tmfree(buffer);
if (superTblInfo) {
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
}
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->threadID,
...@@ -4842,6 +4574,15 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4842,6 +4574,15 @@ static void startMultiThreadInsertData(int threads, char* db_name,
else else
last = 0; last = 0;
// read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) {
fprintf(stderr, "prepare sample data for stable failed!\n");
exit(-1);
}
}
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
t_info->threadID = i; t_info->threadID = i;
...@@ -4877,6 +4618,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4877,6 +4618,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint(); t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint();
} }
tsem_init(&(t_info->lock_sem), 0, 0); tsem_init(&(t_info->lock_sem), 0, 0);
if (SYNC == g_Dbs.queryMode) { if (SYNC == g_Dbs.queryMode) {
pthread_create(pids + i, NULL, syncWrite, t_info); pthread_create(pids + i, NULL, syncWrite, t_info);
......
...@@ -249,6 +249,7 @@ python3 test.py -f tools/taosdemoTest.py ...@@ -249,6 +249,7 @@ python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdemoTest2.py python3 test.py -f tools/taosdemoTest2.py
python3 test.py -f tools/taosdemoTestWithoutMetric.py python3 test.py -f tools/taosdemoTestWithoutMetric.py
python3 test.py -f tools/taosdemoTestLimitOffset.py python3 test.py -f tools/taosdemoTestLimitOffset.py
python3 test.py -f tools/taosdemoTestSampleData.py
# subscribe # subscribe
python3 test.py -f subscribe/singlemeter.py python3 test.py -f subscribe/singlemeter.py
......
...@@ -82,6 +82,7 @@ python3 test.py -f tools/lowaTest.py ...@@ -82,6 +82,7 @@ python3 test.py -f tools/lowaTest.py
python3 test.py -f tools/taosdemoTest.py python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdemoTestWithoutMetric.py python3 test.py -f tools/taosdemoTestWithoutMetric.py
python3 test.py -f tools/taosdemoTestLimitOffset.py python3 test.py -f tools/taosdemoTestLimitOffset.py
python3 test.py -f tools/taosdemoTestSampleData.py
python3 test.py -f tools/taosdumpTest.py python3 test.py -f tools/taosdumpTest.py
#python3 test.py -f tools/taosdemoTest2.py #python3 test.py -f tools/taosdemoTest2.py
......
...@@ -44,7 +44,6 @@ ...@@ -44,7 +44,6 @@
"childtable_offset": 33, "childtable_offset": 33,
"multi_thread_write_one_tbl": "no", "multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0, "number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100,
"max_sql_len": 1024000, "max_sql_len": 1024000,
"disorder_ratio": 0, "disorder_ratio": 0,
"disorder_range": 1000, "disorder_range": 1000,
......
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 10,
"confirm_parameter_prompt": "no",
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes"
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 20,
"childtable_limit": 10,
"childtable_offset": 0,
"childtable_prefix": "t_",
"auto_create_table": "no",
"data_source": "sample",
"insert_mode": "taosc",
"insert_rate": 0,
"insert_rows": 20,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1048000,
"timestamp_step": 1000,
"start_timestamp": "2020-1-1 00:00:00",
"sample_format": "csv",
"sample_file": "./tools/sampledata.csv",
"columns": [{"type": "INT"}],
"tags": [{"type": "INT", "count":1}]
}]
}]
}
...@@ -59,6 +59,15 @@ class TDTestCase: ...@@ -59,6 +59,15 @@ class TDTestCase:
tdSql.query("select count(*) from db.stb") tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 33000) tdSql.checkData(0, 0, 33000)
os.system("%staosdemo -f tools/insert-tblimit-tboffset0.json" % binPath)
tdSql.execute("reset query cache")
tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 100)
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 20000)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 10000
self.numberOfRecords = 100
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
tdSql.prepare()
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
os.system("%staosdemo -f tools/taosdemo-sampledata.json" % binPath)
tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 20)
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 200)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册