未验证 提交 5a42914c 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Merge pull request #5427 from taosdata/feature/sangshuduo/TD-3192-support-tblimit-tboffset

Feature/sangshuduo/td 3192 support tblimit tboffset
......@@ -120,7 +120,7 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台,专
* [TDengine性能对比测试工具](https://www.taosdata.com/blog/2020/01/18/1166.html)
* [IDEA数据库管理工具可视化使用TDengine](https://www.taosdata.com/blog/2020/08/27/1767.html)
* [基于eletron开发的跨平台TDengine图形化管理工具](https://github.com/skye0207/TDengineGUI)
* [DataX,支持TDengine的离线数据采集/同步工具](https://github.com/wgzhao/DataX)(文档:[读取插件](https://github.com/wgzhao/DataX/blob/master/docs/src/main/sphinx/reader/tdenginereader.md)[写入插件](https://github.com/wgzhao/DataX/blob/master/docs/src/main/sphinx/writer/tdenginewriter.md)
* [DataX,支持TDengine的离线数据采集/同步工具](https://github.com/alibaba/DataX)
## TDengine与其他数据库的对比测试
......
......@@ -11,6 +11,7 @@
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
......@@ -38,7 +39,9 @@
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 100000,
"childtable_limit": 33,
"childtable_offset": 33,
"insert_rows": 1000,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100,
......
......@@ -98,6 +98,8 @@ extern char configDir[];
#define MAX_DATABASE_COUNT 256
#define INPUT_BUF_LEN 256
#define DEFAULT_TIMESTAMP_STEP 10
typedef enum CREATE_SUB_TALBE_MOD_EN {
PRE_CREATE_SUBTBL,
AUTO_CREATE_SUBTBL,
......@@ -196,6 +198,7 @@ typedef struct SArguments_S {
int num_of_threads;
int insert_interval;
int num_of_RPR;
int max_sql_len;
int num_of_tables;
int num_of_DPT;
int abort;
......@@ -222,6 +225,8 @@ typedef struct SSuperTable_S {
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful
int childTblLimit;
int childTblOffset;
int multiThreadWriteOneTbl; // 0: no, 1: yes
int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl
......@@ -259,7 +264,7 @@ typedef struct SSuperTable_S {
int tagUsePos;
// statistics
int64_t totalRowsInserted;
int64_t totalInsertRows;
int64_t totalAffectedRows;
} SSuperTable;
......@@ -329,7 +334,7 @@ typedef struct SDbs_S {
SDataBase db[MAX_DB_COUNT];
// statistics
int64_t totalRowsInserted;
int64_t totalInsertRows;
int64_t totalAffectedRows;
} SDbs;
......@@ -400,7 +405,7 @@ typedef struct SThreadInfo_S {
int64_t lastTs;
// statistics
int64_t totalRowsInserted;
int64_t totalInsertRows;
int64_t totalAffectedRows;
// insert delay statistics
......@@ -514,6 +519,7 @@ SArguments g_args = {
10, // num_of_connections/thread
0, // insert_interval
100, // num_of_RPR
TSDB_PAYLOAD_SIZE, // max_sql_len
10000, // num_of_tables
10000, // num_of_DPT
0, // abort
......@@ -761,6 +767,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
}
printf("# Insertion interval: %d\n", arguments->insert_interval);
printf("# Number of records per req: %d\n", arguments->num_of_RPR);
printf("# Max SQL length: %d\n", arguments->max_sql_len);
printf("# Length of Binary: %d\n", arguments->len_of_binary);
printf("# Number of Threads: %d\n", arguments->num_of_threads);
printf("# Number of Tables: %d\n", arguments->num_of_tables);
printf("# Number of Data per Table: %d\n", arguments->num_of_DPT);
......@@ -774,6 +782,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("# Delete method: %d\n", arguments->method_of_delete);
printf("# Answer yes when prompt: %d\n", arguments->answer_yes);
printf("# Print debug info: %d\n", arguments->debug_print);
printf("# Print verbose info: %d\n", arguments->verbose_print);
printf("###################################################################\n");
if (!arguments->answer_yes) {
printf("Press enter key to continue\n\n");
......@@ -1006,6 +1015,7 @@ static int printfInsertMeta() {
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
printf("insert interval: \033[33m%d\033[0m\n", g_args.insert_interval);
printf("number of records per req: \033[33m%d\033[0m\n", g_args.num_of_RPR);
printf("max sql length: \033[33m%d\033[0m\n", g_args.max_sql_len);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) {
......@@ -1085,37 +1095,55 @@ static int printfInsertMeta() {
} else {
printf(" childTblExists: \033[33m%s\033[0m\n", "error");
}
printf(" childTblCount: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblCount);
printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode);
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
printf(" childTblCount: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblCount);
printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode);
if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) {
printf(" childTblLimit: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit);
}
if (g_Dbs.db[i].superTbls[j].childTblOffset > 0) {
printf(" childTblOffset: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].childTblOffset);
}
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n");
printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n");
}else {
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n");
}
printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql);
printf(" rowsPerTbl: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].rowsPerTbl);
printf(" disorderRange: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRange);
printf(" disorderRatio: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRatio);
printf(" maxSqlLen: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].maxSqlLen);
printf(" timeStampStep: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].timeStampStep);
printf(" startTimestamp: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].startTimestamp);
printf(" sampleFormat: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFormat);
printf(" sampleFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFile);
printf(" tagsFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].columnCount);
printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql);
printf(" rowsPerTbl: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].rowsPerTbl);
printf(" disorderRange: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].disorderRange);
printf(" disorderRatio: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].disorderRatio);
printf(" maxSqlLen: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].maxSqlLen);
printf(" timeStampStep: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTbls[j].timeStampStep);
printf(" startTimestamp: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].startTimestamp);
printf(" sampleFormat: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sampleFormat);
printf(" sampleFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sampleFile);
printf(" tagsFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n ",
g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) {
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType,
"binary", 6))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType,
"nchar", 5))) {
printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
g_Dbs.db[i].superTbls[j].columns[k].dataType,
g_Dbs.db[i].superTbls[j].columns[k].dataLen);
} else {
printf("column[%d]:\033[33m%s\033[0m ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType);
......@@ -1130,11 +1158,12 @@ static int printfInsertMeta() {
if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) {
printf("tag[%d]:\033[33m%s(%d)\033[0m ", k,
g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen);
g_Dbs.db[i].superTbls[j].tags[k].dataType,
g_Dbs.db[i].superTbls[j].tags[k].dataLen);
} else {
printf("tag[%d]:\033[33m%s\033[0m ", k,
g_Dbs.db[i].superTbls[j].tags[k].dataType);
}
}
}
printf("\n");
}
......@@ -1768,21 +1797,21 @@ int postProceSql(char* host, uint16_t port, char* sqlstr)
return 0;
}
char* getTagValueFromTagSample( SSuperTable* stbInfo, int tagUsePos) {
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1);
return NULL;
}
int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos);
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"(%s)", stbInfo->tagDataBuf + stbInfo->lenOfTagOfOneRow * tagUsePos);
return dataBuf;
}
char* generateTagVaulesForStb(SSuperTable* stbInfo) {
static char* generateTagVaulesForStb(SSuperTable* stbInfo) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) {
printf("calloc failed! size:%d\n", TSDB_MAX_SQL_LEN+1);
......@@ -1792,13 +1821,15 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) {
int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "(");
for (int i = 0; i < stbInfo->tagCount; i++) {
if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", 5))) {
if ((0 == strncasecmp(stbInfo->tags[i].dataType, "binary", strlen("binary")))
|| (0 == strncasecmp(stbInfo->tags[i].dataType, "nchar", strlen("nchar")))) {
if (stbInfo->tags[i].dataLen > TSDB_MAX_BINARY_LEN) {
printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN);
printf("binary or nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
tmfree(dataBuf);
return NULL;
}
char* buf = (char*)calloc(stbInfo->tags[i].dataLen+1, 1);
if (NULL == buf) {
printf("calloc failed! size:%d\n", stbInfo->tags[i].dataLen);
......@@ -1806,30 +1837,48 @@ char* generateTagVaulesForStb(SSuperTable* stbInfo) {
return NULL;
}
rand_string(buf, stbInfo->tags[i].dataLen);
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "\'%s\', ", buf);
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"\'%s\', ", buf);
tmfree(buf);
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "int", 3)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_int());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bigint", 6)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "float", 5)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_float());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "double", 6)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%f, ", rand_double());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "smallint", 8)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_smallint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "tinyint", 7)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_tinyint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "bool", 4)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%d, ", rand_bool());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType, "timestamp", 4)) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, "%"PRId64", ", rand_bigint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"int", strlen("int"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d, ", rand_int());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"bigint", strlen("bigint"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%"PRId64", ", rand_bigint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"float", strlen("float"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%f, ", rand_float());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"double", strlen("double"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%f, ", rand_double());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"smallint", strlen("smallint"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d, ", rand_smallint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"tinyint", strlen("tinyint"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d, ", rand_tinyint());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"bool", strlen("bool"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%d, ", rand_bool());
} else if (0 == strncasecmp(stbInfo->tags[i].dataType,
"timestamp", strlen("timestamp"))) {
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen,
"%"PRId64", ", rand_bigint());
} else {
printf("No support data type: %s\n", stbInfo->tags[i].dataType);
tmfree(dataBuf);
return NULL;
}
}
dataLen -= 2;
dataLen += snprintf(dataBuf + dataLen, TSDB_MAX_SQL_LEN - dataLen, ")");
return dataBuf;
......@@ -1874,7 +1923,7 @@ static int calcRowLen(SSuperTable* superTbls) {
int lenOfTagOfOneRow = 0;
for (tagIndex = 0; tagIndex < superTbls->tagCount; tagIndex++) {
char* dataType = superTbls->tags[tagIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
......@@ -1905,15 +1954,25 @@ static int calcRowLen(SSuperTable* superTbls) {
}
static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName, char** childTblNameOfSuperTbl, int* childTblCountOfSuperTbl) {
static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
char* dbName, char* sTblName, char** childTblNameOfSuperTbl,
int* childTblCountOfSuperTbl, int limit, int offset) {
char command[BUFFER_SIZE] = "\0";
char limitBuf[100] = "\0";
TAOS_RES * res;
TAOS_ROW row = NULL;
char* childTblName = *childTblNameOfSuperTbl;
if (offset >= 0) {
snprintf(limitBuf, 100, " limit %d offset %d", limit, offset);
}
//get all child table name use cmd: select tbname from superTblName;
snprintf(command, BUFFER_SIZE, "select tbname from %s.%s", dbName, sTblName);
snprintf(command, BUFFER_SIZE, "select tbname from %s.%s %s", dbName, sTblName, limitBuf);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
......@@ -1923,7 +1982,7 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName
exit(-1);
}
int childTblCount = 10000;
int childTblCount = (limit < 0)?10000:limit;
int count = 0;
childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
char* pTblName = childTblName;
......@@ -1937,7 +1996,8 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName
if (tmp != NULL) {
childTblName = tmp;
childTblCount = (int)(childTblCount*1.5);
memset(childTblName + count*TSDB_TABLE_NAME_LEN, 0, (size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN));
memset(childTblName + count*TSDB_TABLE_NAME_LEN, 0,
(size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN));
} else {
// exit, if allocate more memory failed
printf("realloc fail for save child table name of %s.%s\n", dbName, sTblName);
......@@ -1949,7 +2009,7 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName
}
pTblName = childTblName + count * TSDB_TABLE_NAME_LEN;
}
*childTblCountOfSuperTbl = count;
*childTblNameOfSuperTbl = childTblName;
......@@ -1957,19 +2017,29 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, char* sTblName
return 0;
}
static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* superTbls) {
static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName,
char* sTblName, char** childTblNameOfSuperTbl,
int* childTblCountOfSuperTbl) {
return getChildNameOfSuperTableWithLimitAndOffset(taos, dbName, sTblName,
childTblNameOfSuperTbl, childTblCountOfSuperTbl,
-1, -1);
}
static int getSuperTableFromServer(TAOS * taos, char* dbName,
SSuperTable* superTbls) {
char command[BUFFER_SIZE] = "\0";
TAOS_RES * res;
TAOS_ROW row = NULL;
int count = 0;
//get schema use cmd: describe superTblName;
//get schema use cmd: describe superTblName;
snprintf(command, BUFFER_SIZE, "describe %s.%s", dbName, superTbls->sTblName);
res = taos_query(taos, command);
res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
printf("failed to run command %s\n", command);
taos_free_result(res);
taos_free_result(res);
return -1;
}
......@@ -1980,19 +2050,33 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* supe
if (0 == count) {
count++;
continue;
}
}
if (strcmp((char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], "TAG") == 0) {
tstrncpy(superTbls->tags[tagIndex].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->tags[tagIndex].dataType, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
superTbls->tags[tagIndex].dataLen = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->tags[tagIndex].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
tstrncpy(superTbls->tags[tagIndex].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->tags[tagIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
superTbls->tags[tagIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->tags[tagIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
tagIndex++;
} else {
tstrncpy(superTbls->columns[columnIndex].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->columns[columnIndex].dataType, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
superTbls->columns[columnIndex].dataLen = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->columns[columnIndex].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
} else {
tstrncpy(superTbls->columns[columnIndex].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->columns[columnIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
superTbls->columns[columnIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->columns[columnIndex].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
columnIndex++;
}
count++;
......@@ -2006,14 +2090,17 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, SSuperTable* supe
if (TBL_ALREADY_EXISTS == superTbls->childTblExists) {
//get all child table name use cmd: select tbname from superTblName;
getAllChildNameOfSuperTable(taos, dbName, superTbls->sTblName, &superTbls->childTblName, &superTbls->childTblCount);
getAllChildNameOfSuperTable(taos, dbName,
superTbls->sTblName,
&superTbls->childTblName,
&superTbls->childTblCount);
}
return 0;
}
static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, bool use_metric) {
static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, bool use_metric) {
char command[BUFFER_SIZE] = "\0";
char cols[STRING_LEN] = "\0";
int colIndex;
int len = 0;
......@@ -2021,7 +2108,7 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
int lenOfOneRow = 0;
for (colIndex = 0; colIndex < superTbls->columnCount; colIndex++) {
char* dataType = superTbls->columns[colIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
len += snprintf(cols + len, STRING_LEN - len,
", col%d %s(%d)", colIndex, "BINARY",
......@@ -2050,10 +2137,10 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
} else if (strcasecmp(dataType, "FLOAT") == 0) {
len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "FLOAT");
lenOfOneRow += 22;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "DOUBLE");
lenOfOneRow += 42;
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "TIMESTAMP");
lenOfOneRow += 21;
} else {
......@@ -2085,33 +2172,42 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
len += snprintf(tags + len, STRING_LEN - len, "(");
for (tagIndex = 0; tagIndex < superTbls->tagCount; tagIndex++) {
char* dataType = superTbls->tags[tagIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex, "BINARY", superTbls->tags[tagIndex].dataLen);
len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex,
"BINARY", superTbls->tags[tagIndex].dataLen);
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex, "NCHAR", superTbls->tags[tagIndex].dataLen);
len += snprintf(tags + len, STRING_LEN - len, "t%d %s(%d), ", tagIndex,
"NCHAR", superTbls->tags[tagIndex].dataLen);
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "INT");
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
"INT");
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 11;
} else if (strcasecmp(dataType, "BIGINT") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "BIGINT");
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
"BIGINT");
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 21;
} else if (strcasecmp(dataType, "SMALLINT") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "SMALLINT");
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
"SMALLINT");
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6;
} else if (strcasecmp(dataType, "TINYINT") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "TINYINT");
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
"TINYINT");
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 4;
} else if (strcasecmp(dataType, "BOOL") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "BOOL");
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
"BOOL");
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 6;
} else if (strcasecmp(dataType, "FLOAT") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "FLOAT");
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
"FLOAT");
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 22;
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex, "DOUBLE");
len += snprintf(tags + len, STRING_LEN - len, "t%d %s, ", tagIndex,
"DOUBLE");
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + 42;
} else {
taos_close(taos);
......@@ -2130,7 +2226,8 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
fprintf(stderr, "create supertable %s failed!\n\n", superTbls->sTblName);
fprintf(stderr, "create supertable %s failed!\n\n",
superTbls->sTblName);
return -1;
}
debugPrint("create supertable %s success!\n\n", superTbls->sTblName);
......@@ -2138,7 +2235,6 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
return 0;
}
static int createDatabases() {
TAOS * taos = NULL;
int ret = 0;
......@@ -2352,7 +2448,7 @@ static void* createTable(void *sarg)
}
int startMultiThreadCreateChildTable(
char* cols, int threads, int ntables,
char* cols, int threads, int startFrom, int ntables,
char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo));
......@@ -2375,7 +2471,7 @@ int startMultiThreadCreateChildTable(
int b = 0;
b = ntables % threads;
int last = 0;
int last = startFrom;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
......@@ -2423,7 +2519,7 @@ static void createChildTables() {
char tblColsBuf[MAX_SQL_SIZE];
int len;
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].superTblCount > 0) {
// with super table
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
......@@ -2434,12 +2530,17 @@ static void createChildTables() {
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
int startFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__,
g_totalChildTables, startFrom);
startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl,
g_Dbs.db[i].superTbls[j].childTblCount,
startFrom,
g_totalChildTables,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
}
} else {
// normal table
......@@ -2447,10 +2548,13 @@ static void createChildTables() {
int j = 0;
while (g_args.datatype[j]) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j], "NCHAR", strlen("NCHAR")) == 0)) {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", j, g_args.datatype[j]);
|| (strncasecmp(g_args.datatype[j],
"NCHAR", strlen("NCHAR")) == 0)) {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
", COL%d %s(60)", j, g_args.datatype[j]);
} else {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", j, g_args.datatype[j]);
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE,
", COL%d %s", j, g_args.datatype[j]);
}
len = strlen(tblColsBuf);
j++;
......@@ -2458,11 +2562,13 @@ static void createChildTables() {
len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__,
verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n",
__func__, __LINE__,
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
startMultiThreadCreateChildTable(
tblColsBuf,
g_Dbs.threadCountByCreateTbl,
0,
g_args.num_of_tables,
g_Dbs.db[i].dbName,
NULL);
......@@ -2470,30 +2576,6 @@ static void createChildTables() {
}
}
/*
static int taosGetLineNum(const char *fileName)
{
int lineNum = 0;
char cmd[1024] = { 0 };
char buf[1024] = { 0 };
sprintf(cmd, "wc -l %s", fileName);
FILE *fp = popen(cmd, "r");
if (fp == NULL) {
fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno));
return lineNum;
}
if (fgets(buf, sizeof(buf), fp)) {
int index = strchr((const char*)buf, ' ') - buf;
buf[index] = '\0';
lineNum = atoi(buf);
}
pclose(fp);
return lineNum;
}
*/
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
......@@ -2571,19 +2653,29 @@ int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) {
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sampleBuf) {
static int readSampleFromCsvFileToMem(
SSuperTable* superTblInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int getRows = 0;
FILE* fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
fprintf(stderr, "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
return -1;
}
memset(sampleBuf, 0, MAX_SAMPLES_ONCE_FROM_FILE* superTblInfo->lenOfOneRow);
memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
while (1) {
readLen = tgetline(&line, &n, fp);
if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) {
printf("Failed to fseek file: %s, reason:%s\n",
fprintf(stderr, "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
fclose(fp);
return -1;
}
continue;
......@@ -2603,7 +2695,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample
continue;
}
memcpy(sampleBuf + getRows * superTblInfo->lenOfOneRow, line, readLen);
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen);
getRows++;
if (getRows == MAX_SAMPLES_ONCE_FROM_FILE) {
......@@ -2611,6 +2704,7 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample
}
}
fclose(fp);
tmfree(line);
return 0;
}
......@@ -2635,7 +2729,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
// columns
cJSON *columns = cJSON_GetObjectItem(stbInfo, "columns");
if (columns && columns->type != cJSON_Array) {
printf("failed to read json, columns not found\n");
printf("ERROR: failed to read json, columns not found\n");
goto PARSE_OVER;
} else if (NULL == columns) {
superTbls->columnCount = 0;
......@@ -2645,7 +2739,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
int columnSize = cJSON_GetArraySize(columns);
if (columnSize > MAX_COLUMN_COUNT) {
printf("failed to read json, column size overflow, max column size is %d\n",
printf("ERROR: failed to read json, column size overflow, max column size is %d\n",
MAX_COLUMN_COUNT);
goto PARSE_OVER;
}
......@@ -2664,7 +2758,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
printf("failed to read json, column count not found");
printf("ERROR: failed to read json, column count not found\n");
goto PARSE_OVER;
} else {
count = 1;
......@@ -2674,7 +2768,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
printf("failed to read json, column type not found");
printf("ERROR: failed to read json, column type not found\n");
goto PARSE_OVER;
}
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
......@@ -2684,7 +2778,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
printf("failed to read json, column len not found");
printf("ERROR: failed to read json, column len not found\n");
goto PARSE_OVER;
} else {
columnCase.dataLen = 8;
......@@ -2703,13 +2797,13 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
// tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) {
printf("failed to read json, tags not found");
printf("ERROR: failed to read json, tags not found\n");
goto PARSE_OVER;
}
int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) {
printf("failed to read json, tags size overflow, max tag size is %d\n", MAX_TAG_COUNT);
printf("ERROR: failed to read json, tags size overflow, max tag size is %d\n", MAX_TAG_COUNT);
goto PARSE_OVER;
}
......@@ -2723,7 +2817,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) {
printf("failed to read json, column count not found");
printf("ERROR: failed to read json, column count not found\n");
goto PARSE_OVER;
} else {
count = 1;
......@@ -2733,7 +2827,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(tag, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) {
printf("failed to read json, tag type not found");
printf("ERROR: failed to read json, tag type not found\n");
goto PARSE_OVER;
}
tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
......@@ -2742,7 +2836,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(cJSON* stbInfo, SSuperTable* s
if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) {
printf("failed to read json, column len not found");
printf("ERROR: failed to read json, column len not found\n");
goto PARSE_OVER;
} else {
columnCase.dataLen = 0;
......@@ -2779,7 +2873,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!host) {
tstrncpy(g_Dbs.host, "127.0.0.1", MAX_DB_NAME_SIZE);
} else {
printf("failed to read json, host not found\n");
printf("ERROR: failed to read json, host not found\n");
goto PARSE_OVER;
}
......@@ -2817,17 +2911,17 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!threads) {
g_Dbs.threadCount = 1;
} else {
printf("failed to read json, threads not found");
printf("ERROR: failed to read json, threads not found\n");
goto PARSE_OVER;
}
cJSON* threads2 = cJSON_GetObjectItem(root, "thread_count_create_tbl");
if (threads2 && threads2->type == cJSON_Number) {
g_Dbs.threadCountByCreateTbl = threads2->valueint;
} else if (!threads2) {
g_Dbs.threadCountByCreateTbl = 1;
} else {
printf("failed to read json, threads2 not found");
printf("ERROR: failed to read json, threads2 not found\n");
goto PARSE_OVER;
}
......@@ -2837,17 +2931,28 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!gInsertInterval) {
g_args.insert_interval = 0;
} else {
printf("failed to read json, insert_interval input mistake");
fprintf(stderr, "ERROR: failed to read json, insert_interval input mistake\n");
goto PARSE_OVER;
}
cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
g_args.max_sql_len = maxSqlLen->valueint;
} else if (!maxSqlLen) {
g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
} else {
fprintf(stderr, "ERROR: failed to read json, max_sql_len input mistake\n");
goto PARSE_OVER;
}
cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req");
if (numRecPerReq && numRecPerReq->type == cJSON_Number) {
g_args.num_of_RPR = numRecPerReq->valueint;
} else if (!numRecPerReq) {
g_args.num_of_RPR = 100;
} else {
printf("failed to read json, num_of_records_per_req not found");
printf("ERROR: failed to read json, num_of_records_per_req not found\n");
goto PARSE_OVER;
}
......@@ -2865,19 +2970,19 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!answerPrompt) {
g_args.answer_yes = false;
} else {
printf("failed to read json, confirm_parameter_prompt not found");
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
goto PARSE_OVER;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (!dbs || dbs->type != cJSON_Array) {
printf("failed to read json, databases not found\n");
printf("ERROR: failed to read json, databases not found\n");
goto PARSE_OVER;
}
int dbSize = cJSON_GetArraySize(dbs);
if (dbSize > MAX_DB_COUNT) {
printf("failed to read json, databases size overflow, max database is %d\n", MAX_DB_COUNT);
printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_DB_COUNT);
goto PARSE_OVER;
}
......@@ -2889,13 +2994,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// dbinfo
cJSON *dbinfo = cJSON_GetObjectItem(dbinfos, "dbinfo");
if (!dbinfo || dbinfo->type != cJSON_Object) {
printf("failed to read json, dbinfo not found");
printf("ERROR: failed to read json, dbinfo not found\n");
goto PARSE_OVER;
}
cJSON *dbName = cJSON_GetObjectItem(dbinfo, "name");
if (!dbName || dbName->type != cJSON_String || dbName->valuestring == NULL) {
printf("failed to read json, db name not found");
printf("ERROR: failed to read json, db name not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].dbName, dbName->valuestring, MAX_DB_NAME_SIZE);
......@@ -2910,7 +3015,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!drop) {
g_Dbs.db[i].drop = 0;
} else {
printf("failed to read json, drop not found");
printf("ERROR: failed to read json, drop not found\n");
goto PARSE_OVER;
}
......@@ -2921,7 +3026,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
//tstrncpy(g_Dbs.db[i].dbCfg.precision, "ms", MAX_DB_NAME_SIZE);
memset(g_Dbs.db[i].dbCfg.precision, 0, MAX_DB_NAME_SIZE);
} else {
printf("failed to read json, precision not found");
printf("ERROR: failed to read json, precision not found\n");
goto PARSE_OVER;
}
......@@ -2931,7 +3036,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!update) {
g_Dbs.db[i].dbCfg.update = -1;
} else {
printf("failed to read json, update not found");
printf("ERROR: failed to read json, update not found\n");
goto PARSE_OVER;
}
......@@ -2941,7 +3046,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!replica) {
g_Dbs.db[i].dbCfg.replica = -1;
} else {
printf("failed to read json, replica not found");
printf("ERROR: failed to read json, replica not found\n");
goto PARSE_OVER;
}
......@@ -2951,7 +3056,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!keep) {
g_Dbs.db[i].dbCfg.keep = -1;
} else {
printf("failed to read json, keep not found");
printf("ERROR: failed to read json, keep not found\n");
goto PARSE_OVER;
}
......@@ -2961,7 +3066,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!days) {
g_Dbs.db[i].dbCfg.days = -1;
} else {
printf("failed to read json, days not found");
printf("ERROR: failed to read json, days not found\n");
goto PARSE_OVER;
}
......@@ -2971,7 +3076,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!cache) {
g_Dbs.db[i].dbCfg.cache = -1;
} else {
printf("failed to read json, cache not found");
printf("ERROR: failed to read json, cache not found\n");
goto PARSE_OVER;
}
......@@ -2981,7 +3086,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!blocks) {
g_Dbs.db[i].dbCfg.blocks = -1;
} else {
printf("failed to read json, block not found");
printf("ERROR: failed to read json, block not found\n");
goto PARSE_OVER;
}
......@@ -3001,7 +3106,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!minRows) {
g_Dbs.db[i].dbCfg.minRows = -1;
} else {
printf("failed to read json, minRows not found");
printf("ERROR: failed to read json, minRows not found\n");
goto PARSE_OVER;
}
......@@ -3011,7 +3116,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxRows) {
g_Dbs.db[i].dbCfg.maxRows = -1;
} else {
printf("failed to read json, maxRows not found");
printf("ERROR: failed to read json, maxRows not found\n");
goto PARSE_OVER;
}
......@@ -3021,7 +3126,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!comp) {
g_Dbs.db[i].dbCfg.comp = -1;
} else {
printf("failed to read json, comp not found");
printf("ERROR: failed to read json, comp not found\n");
goto PARSE_OVER;
}
......@@ -3031,7 +3136,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!walLevel) {
g_Dbs.db[i].dbCfg.walLevel = -1;
} else {
printf("failed to read json, walLevel not found");
printf("ERROR: failed to read json, walLevel not found\n");
goto PARSE_OVER;
}
......@@ -3041,7 +3146,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!cacheLast) {
g_Dbs.db[i].dbCfg.cacheLast = -1;
} else {
printf("failed to read json, cacheLast not found");
printf("ERROR: failed to read json, cacheLast not found\n");
goto PARSE_OVER;
}
......@@ -3061,20 +3166,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!fsync) {
g_Dbs.db[i].dbCfg.fsync = -1;
} else {
printf("failed to read json, fsync not found");
printf("ERROR: failed to read json, fsync not found\n");
goto PARSE_OVER;
}
// super_talbes
cJSON *stables = cJSON_GetObjectItem(dbinfos, "super_tables");
if (!stables || stables->type != cJSON_Array) {
printf("failed to read json, super_tables not found");
printf("ERROR: failed to read json, super_tables not found\n");
goto PARSE_OVER;
}
int stbSize = cJSON_GetArraySize(stables);
if (stbSize > MAX_SUPER_TABLE_COUNT) {
printf("failed to read json, databases size overflow, max database is %d\n", MAX_SUPER_TABLE_COUNT);
printf("ERROR: failed to read json, databases size overflow, max database is %d\n", MAX_SUPER_TABLE_COUNT);
goto PARSE_OVER;
}
......@@ -3086,14 +3191,14 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
// dbinfo
cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name");
if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) {
printf("failed to read json, stb name not found");
printf("ERROR: failed to read json, stb name not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE);
cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
printf("failed to read json, childtable_prefix not found");
printf("ERROR: failed to read json, childtable_prefix not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE);
......@@ -3112,7 +3217,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!autoCreateTbl) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} else {
printf("failed to read json, auto_create_table not found");
printf("ERROR: failed to read json, auto_create_table not found\n");
goto PARSE_OVER;
}
......@@ -3122,7 +3227,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!batchCreateTbl) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = 1000;
} else {
printf("failed to read json, batch_create_tbl_num not found");
printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
goto PARSE_OVER;
}
......@@ -3140,13 +3245,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!childTblExists) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else {
printf("failed to read json, child_table_exists not found");
printf("ERROR: failed to read json, child_table_exists not found\n");
goto PARSE_OVER;
}
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
printf("failed to read json, childtable_count not found");
printf("ERROR: failed to read json, childtable_count not found\n");
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblCount = count->valueint;
......@@ -3159,7 +3264,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
} else {
printf("failed to read json, data_source not found");
printf("ERROR: failed to read json, data_source not found\n");
goto PARSE_OVER;
}
......@@ -3171,27 +3276,49 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!insertMode) {
tstrncpy(g_Dbs.db[i].superTbls[j].insertMode, "taosc", MAX_DB_NAME_SIZE);
} else {
printf("failed to read json, insert_mode not found");
printf("ERROR: failed to read json, insert_mode not found\n");
goto PARSE_OVER;
}
cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit");
if (childTbl_limit) {
if (childTbl_limit->type != cJSON_Number) {
printf("ERROR: failed to read json, childtable_limit\n");
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint;
} else {
g_Dbs.db[i].superTbls[j].childTblLimit = -1; // select ... limit -1 means all query result
}
cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
if (childTbl_offset) {
if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) {
printf("ERROR: failed to read json, childtable_offset\n");
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].childTblOffset = childTbl_offset->valueint;
} else {
g_Dbs.db[i].superTbls[j].childTblOffset = 0;
}
cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, ts->valuestring, MAX_DB_NAME_SIZE);
} else if (!ts) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp, "now", MAX_DB_NAME_SIZE);
} else {
printf("failed to read json, start_timestamp not found");
printf("ERROR: failed to read json, start_timestamp not found\n");
goto PARSE_OVER;
}
cJSON* timestampStep = cJSON_GetObjectItem(stbInfo, "timestamp_step");
if (timestampStep && timestampStep->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].timeStampStep = timestampStep->valueint;
} else if (!timestampStep) {
g_Dbs.db[i].superTbls[j].timeStampStep = 1000;
g_Dbs.db[i].superTbls[j].timeStampStep = DEFAULT_TIMESTAMP_STEP;
} else {
printf("failed to read json, timestamp_step not found");
printf("ERROR: failed to read json, timestamp_step not found\n");
goto PARSE_OVER;
}
......@@ -3204,10 +3331,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!sampleDataBufSize) {
g_Dbs.db[i].superTbls[j].sampleDataBufSize = 1024*1024 + 1024;
} else {
printf("failed to read json, sample_buf_size not found");
printf("ERROR: failed to read json, sample_buf_size not found\n");
goto PARSE_OVER;
}
cJSON *sampleFormat = cJSON_GetObjectItem(stbInfo, "sample_format");
if (sampleFormat && sampleFormat->type == cJSON_String && sampleFormat->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
......@@ -3215,10 +3342,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!sampleFormat) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", MAX_DB_NAME_SIZE);
} else {
printf("failed to read json, sample_format not found");
printf("ERROR: failed to read json, sample_format not found\n");
goto PARSE_OVER;
}
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
......@@ -3226,10 +3353,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!sampleFile) {
memset(g_Dbs.db[i].superTbls[j].sampleFile, 0, MAX_FILE_NAME_LEN);
} else {
printf("failed to read json, sample_file not found");
printf("ERROR: failed to read json, sample_file not found\n");
goto PARSE_OVER;
}
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
......@@ -3243,10 +3370,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
memset(g_Dbs.db[i].superTbls[j].tagsFile, 0, MAX_FILE_NAME_LEN);
g_Dbs.db[i].superTbls[j].tagSource = 0;
} else {
printf("failed to read json, tags_file not found");
printf("ERROR: failed to read json, tags_file not found\n");
goto PARSE_OVER;
}
cJSON* maxSqlLen = cJSON_GetObjectItem(stbInfo, "max_sql_len");
if (maxSqlLen && maxSqlLen->type == cJSON_Number) {
int32_t len = maxSqlLen->valueint;
......@@ -3259,7 +3386,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) {
g_Dbs.db[i].superTbls[j].maxSqlLen = TSDB_MAX_SQL_LEN;
} else {
printf("failed to read json, maxSqlLen not found");
printf("ERROR: failed to read json, maxSqlLen not found\n");
goto PARSE_OVER;
}
......@@ -3276,7 +3403,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!multiThreadWriteOneTbl) {
g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl = 0;
} else {
printf("failed to read json, multiThreadWriteOneTbl not found");
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
goto PARSE_OVER;
}
......@@ -3286,7 +3413,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!numberOfTblInOneSql) {
g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = 0;
} else {
printf("failed to read json, numberOfTblInOneSql not found");
printf("ERROR: failed to read json, numberOfTblInOneSql not found\n");
goto PARSE_OVER;
}
......@@ -3296,7 +3423,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!rowsPerTbl) {
g_Dbs.db[i].superTbls[j].rowsPerTbl = 1;
} else {
printf("failed to read json, rowsPerTbl not found");
printf("ERROR: failed to read json, rowsPerTbl not found\n");
goto PARSE_OVER;
}
......@@ -3306,7 +3433,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!disorderRatio) {
g_Dbs.db[i].superTbls[j].disorderRatio = 0;
} else {
printf("failed to read json, disorderRatio not found");
printf("ERROR: failed to read json, disorderRatio not found\n");
goto PARSE_OVER;
}
......@@ -3316,7 +3443,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!disorderRange) {
g_Dbs.db[i].superTbls[j].disorderRange = 1000;
} else {
printf("failed to read json, disorderRange not found");
printf("ERROR: failed to read json, disorderRange not found\n");
goto PARSE_OVER;
}
......@@ -3326,7 +3453,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
printf("failed to read json, insert_rows input mistake");
fprintf(stderr, "failed to read json, insert_rows input mistake");
goto PARSE_OVER;
}
......@@ -3338,7 +3465,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
__func__, __LINE__, g_args.insert_interval);
g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval;
} else {
printf("failed to read json, insert_interval input mistake");
fprintf(stderr, "failed to read json, insert_interval input mistake");
goto PARSE_OVER;
}
......@@ -3377,7 +3504,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!host) {
tstrncpy(g_queryInfo.host, "127.0.0.1", MAX_DB_NAME_SIZE);
} else {
printf("failed to read json, host not found\n");
printf("ERROR: failed to read json, host not found\n");
goto PARSE_OVER;
}
......@@ -3415,7 +3542,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!answerPrompt) {
g_args.answer_yes = false;
} else {
printf("failed to read json, confirm_parameter_prompt not found");
printf("ERROR: failed to read json, confirm_parameter_prompt not found\n");
goto PARSE_OVER;
}
......@@ -3423,7 +3550,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) {
tstrncpy(g_queryInfo.dbName, dbs->valuestring, MAX_DB_NAME_SIZE);
} else if (!dbs) {
printf("failed to read json, databases not found\n");
printf("ERROR: failed to read json, databases not found\n");
goto PARSE_OVER;
}
......@@ -3433,7 +3560,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!queryMode) {
tstrncpy(g_queryInfo.queryMode, "taosc", MAX_TB_NAME_SIZE);
} else {
printf("failed to read json, query_mode not found\n");
printf("ERROR: failed to read json, query_mode not found\n");
goto PARSE_OVER;
}
......@@ -3443,7 +3570,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.concurrent = 0;
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superQuery->type != cJSON_Object) {
printf("failed to read json, super_table_query not found");
printf("ERROR: failed to read json, super_table_query not found\n");
goto PARSE_OVER;
} else {
cJSON* rate = cJSON_GetObjectItem(superQuery, "query_interval");
......@@ -3467,7 +3594,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (0 == strcmp("async", mode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 1;
} else {
printf("failed to read json, subscribe mod error\n");
printf("ERROR: failed to read json, subscribe mod error\n");
goto PARSE_OVER;
}
} else {
......@@ -3490,7 +3617,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (0 == strcmp("no", restart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 0;
} else {
printf("failed to read json, subscribe restart error\n");
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
......@@ -3506,7 +3633,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (0 == strcmp("no", keepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else {
printf("failed to read json, subscribe keepProgress error\n");
printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER;
}
} else {
......@@ -3518,15 +3645,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!superSqls) {
g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) {
printf("failed to read json, super sqls not found\n");
printf("ERROR: failed to read json, super sqls not found\n");
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
g_queryInfo.superQueryInfo.sqlCount = superSqlSize;
for (int j = 0; j < superSqlSize; ++j) {
cJSON* sql = cJSON_GetArrayItem(superSqls, j);
......@@ -3534,7 +3661,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("failed to read json, sql not found\n");
printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
......@@ -3545,20 +3672,20 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else {
printf("failed to read json, super query result file not found\n");
printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER;
}
}
}
}
// sub_table_query
cJSON *subQuery = cJSON_GetObjectItem(root, "super_table_query");
if (!subQuery) {
g_queryInfo.subQueryInfo.threadCnt = 0;
g_queryInfo.subQueryInfo.sqlCount = 0;
} else if (subQuery->type != cJSON_Object) {
printf("failed to read json, sub_table_query not found");
printf("ERROR: failed to read json, sub_table_query not found\n");
ret = true;
goto PARSE_OVER;
} else {
......@@ -3568,29 +3695,29 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!subrate) {
g_queryInfo.subQueryInfo.rate = 0;
}
cJSON* threads = cJSON_GetObjectItem(subQuery, "threads");
if (threads && threads->type == cJSON_Number) {
g_queryInfo.subQueryInfo.threadCnt = threads->valueint;
} else if (!threads) {
g_queryInfo.subQueryInfo.threadCnt = 1;
}
//cJSON* subTblCnt = cJSON_GetObjectItem(subQuery, "childtable_count");
//if (subTblCnt && subTblCnt->type == cJSON_Number) {
// g_queryInfo.subQueryInfo.childTblCount = subTblCnt->valueint;
//} else if (!subTblCnt) {
// g_queryInfo.subQueryInfo.childTblCount = 0;
//}
cJSON* stblname = cJSON_GetObjectItem(subQuery, "stblname");
if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.subQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE);
} else {
printf("failed to read json, super table name not found\n");
printf("ERROR: failed to read json, super table name not found\n");
goto PARSE_OVER;
}
cJSON* submode = cJSON_GetObjectItem(subQuery, "mode");
if (submode && submode->type == cJSON_String && submode->valuestring != NULL) {
if (0 == strcmp("sync", submode->valuestring)) {
......@@ -3598,13 +3725,13 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (0 == strcmp("async", submode->valuestring)) {
g_queryInfo.subQueryInfo.subscribeMode = 1;
} else {
printf("failed to read json, subscribe mod error\n");
printf("ERROR: failed to read json, subscribe mod error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.subQueryInfo.subscribeMode = 0;
}
cJSON* subinterval = cJSON_GetObjectItem(subQuery, "interval");
if (subinterval && subinterval->type == cJSON_Number) {
g_queryInfo.subQueryInfo.subscribeInterval = subinterval->valueint;
......@@ -3621,7 +3748,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.subQueryInfo.subscribeRestart = 0;
} else {
printf("failed to read json, subscribe restart error\n");
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
......@@ -3635,7 +3762,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (0 == strcmp("no", subkeepProgress->valuestring)) {
g_queryInfo.subQueryInfo.subscribeKeepProgress = 0;
} else {
printf("failed to read json, subscribe keepProgress error\n");
printf("ERROR: failed to read json, subscribe keepProgress error\n");
goto PARSE_OVER;
}
} else {
......@@ -3647,12 +3774,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!subsqls) {
g_queryInfo.subQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) {
printf("failed to read json, super sqls not found\n");
printf("ERROR: failed to read json, super sqls not found\n");
goto PARSE_OVER;
} else {
int superSqlSize = cJSON_GetArraySize(subsqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT);
goto PARSE_OVER;
}
......@@ -3663,7 +3790,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) {
printf("failed to read json, sql not found\n");
printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.subQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
......@@ -3674,7 +3801,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (NULL == result) {
memset(g_queryInfo.subQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else {
printf("failed to read json, sub query result file not found\n");
printf("ERROR: failed to read json, sub query result file not found\n");
goto PARSE_OVER;
}
}
......@@ -3713,7 +3840,7 @@ static bool getInfoFromJsonFile(char* file) {
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
printf("failed to cjson parse %s, invalid json format", file);
printf("ERROR: failed to cjson parse %s, invalid json format\n", file);
goto PARSE_OVER;
}
......@@ -3726,13 +3853,13 @@ static bool getInfoFromJsonFile(char* file) {
} else if (0 == strcasecmp("subscribe", filetype->valuestring)) {
g_args.test_mode = SUBSCRIBE_MODE;
} else {
printf("failed to read json, filetype not support\n");
printf("ERROR: failed to read json, filetype not support\n");
goto PARSE_OVER;
}
} else if (!filetype) {
g_args.test_mode = INSERT_MODE;
} else {
printf("failed to read json, filetype not found\n");
printf("ERROR: failed to read json, filetype not found\n");
goto PARSE_OVER;
}
......@@ -3743,7 +3870,7 @@ static bool getInfoFromJsonFile(char* file) {
} else if (SUBSCRIBE_MODE == g_args.test_mode) {
ret = getMetaFromQueryJsonFile(root);
} else {
printf("input json file type error! please input correct file type: insert or query or subscribe\n");
printf("ERROR: input json file type error! please input correct file type: insert or query or subscribe\n");
goto PARSE_OVER;
}
......@@ -3754,7 +3881,7 @@ PARSE_OVER:
return ret;
}
void prePareSampleData() {
void prepareSampleData() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
//if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) {
......@@ -3792,22 +3919,26 @@ void postFreeResource() {
}
}
int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* superTblInfo, int* sampleUsePos, FILE *fp, char* sampleBuf) {
static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int* sampleUsePos) {
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
int ret = readSampleFromCsvFileToMem(fp, superTblInfo, sampleBuf);
int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
return -1;
}
*sampleUsePos = 0;
}
int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%s", sampleBuf + superTblInfo->lenOfOneRow * (*sampleUsePos));
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%s", superTblInfo->sampleDataBuf + superTblInfo->lenOfOneRow * (*sampleUsePos));
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++;
return dataLen;
}
......@@ -3858,14 +3989,12 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
}
static void syncWriteForNumberOfTblInOneSql(
threadInfo *winfo, FILE *fp, char* sampleDataBuf) {
threadInfo *winfo, char* sampleDataBuf) {
SSuperTable* superTblInfo = winfo->superTblInfo;
int samplePos = 0;
//printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id);
int64_t totalRowsInserted = 0;
int64_t totalAffectedRows = 0;
int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen+1, 1);
......@@ -3883,12 +4012,13 @@ static void syncWriteForNumberOfTblInOneSql(
uint64_t time_counter = winfo->start_time;
int sampleUsePos;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int64_t st = 0;
int64_t et = 0;
int64_t et = 0xffffffff;
for (int i = 0; i < superTblInfo->insertRows;) {
int32_t tbl_id = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
int64_t tmp_time = 0;
for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) {
int64_t start_time = 0;
int inserted = i;
for (int k = 0; k < g_args.num_of_RPR;) {
......@@ -3896,12 +4026,12 @@ static void syncWriteForNumberOfTblInOneSql(
memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer;
int32_t end_tbl_id = tID + numberOfTblInOneSql;
int32_t end_tbl_id = tableSeq + numberOfTblInOneSql;
if (end_tbl_id > winfo->end_table_id) {
end_tbl_id = winfo->end_table_id+1;
}
for (tbl_id = tID; tbl_id < end_tbl_id; tbl_id++) {
for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) {
sampleUsePos = samplePos;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
......@@ -3969,18 +4099,16 @@ static void syncWriteForNumberOfTblInOneSql(
}
}
tmp_time = time_counter;
for (k = 0; k < superTblInfo->rowsPerTbl;) {
start_time = time_counter;
for (int j = 0; j < superTblInfo->rowsPerTbl;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(pstr + len,
superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep,
superTblInfo,
&sampleUsePos,
fp,
sampleDataBuf);
retLen = getRowDataFromSample(pstr + len,
superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep,
superTblInfo,
&sampleUsePos);
if (retLen < 0) {
goto free_and_statistics;
}
......@@ -3989,15 +4117,15 @@ static void syncWriteForNumberOfTblInOneSql(
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = tmp_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
d,
int64_t d = start_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
} else {
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep,
retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len,
start_time += superTblInfo->timeStampStep,
superTblInfo);
}
if (retLen < 0) {
......@@ -4006,12 +4134,12 @@ static void syncWriteForNumberOfTblInOneSql(
}
len += retLen;
//inserted++;
k++;
totalRowsInserted++;
j++;
winfo->totalInsertRows++;
if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tID = tbl_id + 1;
tableSeq = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
superTblInfo->lenOfOneRow);
goto send_to_server;
......@@ -4019,18 +4147,18 @@ static void syncWriteForNumberOfTblInOneSql(
}
}
tID = tbl_id;
tableSeq = tbl_id;
inserted += superTblInfo->rowsPerTbl;
send_to_server:
if (g_args.insert_interval && (g_args.insert_interval > (et - st))) {
int sleep_time = g_args.insert_interval - (et -st);
if (insert_interval) {
st = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000)) {
int sleep_time = insert_interval - (et -st);
printf("sleep: %d ms insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
}
if (0 == strncasecmp(superTblInfo->insertMode,
......@@ -4064,7 +4192,7 @@ send_to_server:
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID,
winfo->totalRowsInserted,
winfo->totalInsertRows,
winfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
......@@ -4081,19 +4209,19 @@ send_to_server:
goto free_and_statistics;
}
}
if (g_args.insert_interval) {
et = taosGetTimestampMs();
if (insert_interval) {
et = taosGetTimestampUs();
}
break;
}
if (tID > winfo->end_table_id) {
if (tableSeq > winfo->end_table_id) {
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos;
}
i = inserted;
time_counter = tmp_time;
time_counter = start_time;
}
}
......@@ -4102,14 +4230,13 @@ send_to_server:
free_and_statistics:
tmfree(buffer);
winfo->totalRowsInserted = totalRowsInserted;
winfo->totalAffectedRows = totalAffectedRows;
printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, totalRowsInserted, totalAffectedRows);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows);
return;
}
int32_t generateData(char *res, char **data_type,
int num_of_cols, int64_t timestamp, int len_of_binary) {
int num_of_cols, int64_t timestamp, int lenOfBinary) {
memset(res, 0, MAX_DATA_SIZE);
char *pstr = res;
pstr += sprintf(pstr, "(%" PRId64, timestamp);
......@@ -4144,13 +4271,13 @@ int32_t generateData(char *res, char **data_type,
bool b = rand() & 1;
pstr += sprintf(pstr, ", %s", b ? "true" : "false");
} else if (strcasecmp(data_type[i % c], "binary") == 0) {
char *s = malloc(len_of_binary);
rand_string(s, len_of_binary);
char *s = malloc(lenOfBinary);
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ", \"%s\"", s);
free(s);
}else if (strcasecmp(data_type[i % c], "nchar") == 0) {
char *s = malloc(len_of_binary);
rand_string(s, len_of_binary);
char *s = malloc(lenOfBinary);
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ", \"%s\"", s);
free(s);
}
......@@ -4166,337 +4293,314 @@ int32_t generateData(char *res, char **data_type,
return (int32_t)(pstr - res);
}
// sync insertion
/*
1 thread: 100 tables * 2000 rows/s
1 thread: 10 tables * 20000 rows/s
6 thread: 300 tables * 2000 rows/s
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
char* sampleDataBuf = NULL;
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
static void* syncWrite(void *sarg) {
// each thread read sample data from csv file
if (0 == strncasecmp(superTblInfo->dataSource,
"sample",
strlen("sample"))) {
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
}
threadInfo *winfo = (threadInfo *)sarg;
int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
return -1;
}
}
char buffer[BUFFER_SIZE] = "\0";
char data[MAX_DATA_SIZE];
char **data_type = g_args.datatype;
int len_of_binary = g_args.len_of_binary;
superTblInfo->sampleDataBuf = sampleDataBuf;
return 0;
}
static int execInsert(threadInfo *winfo, char *buffer, int k)
{
int affectedRows;
SSuperTable* superTblInfo = winfo->superTblInfo;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
if (0 != retCode) {
affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", winfo->threadID);
} else {
affectedRows = k;
}
}
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, 1);
}
if (0 > affectedRows){
return affectedRows;
}
return affectedRows;
}
static int generateDataBuffer(int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
int64_t startFrom, int64_t startTime, int *pSampleUsePos)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int ncols_per_record = 1; // count first col ts
int i = 0;
while(g_args.datatype[i]) {
i ++;
ncols_per_record ++;
if (superTblInfo == NULL) {
int datatypeSeq = 0;
while(g_args.datatype[datatypeSeq]) {
datatypeSeq ++;
ncols_per_record ++;
}
}
srand((uint32_t)time(NULL));
int64_t time_counter = winfo->start_time;
assert(buffer != NULL);
uint64_t st = 0;
uint64_t et = 0;
char *pChildTblName;
int childTblCount;
winfo->totalRowsInserted = 0;
winfo->totalAffectedRows = 0;
if (superTblInfo && (superTblInfo->childTblOffset > 0)) {
// TODO
// select tbname from stb limit 1 offset tableSeq
getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos,
pThreadInfo->db_name, superTblInfo->sTblName,
&pChildTblName, &childTblCount,
1, tableSeq);
} else {
pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (NULL == pChildTblName) {
fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN);
return -1;
}
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d",
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq);
}
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
int64_t tmp_time = time_counter;
char *pstr = buffer;
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tableSeq % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
fprintf(stderr, "tag buf failed to allocate memory\n");
free(pChildTblName);
return -1;
}
for (int i = 0; i < g_args.num_of_DPT;) {
pstr += snprintf(pstr,
superTblInfo->maxSqlLen,
"insert into %s.%s using %s.%s tags %s values",
pThreadInfo->db_name,
pChildTblName,
pThreadInfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
pstr += snprintf(pstr,
superTblInfo->maxSqlLen,
"insert into %s.%s values",
pThreadInfo->db_name,
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
} else {
pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s values",
pThreadInfo->db_name,
pChildTblName);
}
} else {
pstr += snprintf(pstr,
(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len),
"insert into %s.%s values",
pThreadInfo->db_name,
pChildTblName);
}
int tblInserted = i;
int k;
int len = 0;
char *pstr = buffer;
pstr += sprintf(pstr,
"insert into %s.%s%d values ",
winfo->db_name, g_args.tb_prefix, tID);
int k;
for (k = 0; k < g_args.num_of_RPR;) {
int rand_num = rand() % 100;
int len = -1;
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) {
if (superTblInfo) {
int retLen = 0;
if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRange)) {
int64_t d = tmp_time - rand() % 1000000 + rand_num;
len = generateData(data, data_type,
ncols_per_record, d, len_of_binary);
if (0 == strncasecmp(superTblInfo->dataSource,
"sample", strlen("sample"))) {
retLen = getRowDataFromSample(
pstr + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo,
pSampleUsePos);
} else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime - rand() % superTblInfo->disorderRange;
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d);
} else {
len = generateData(data, data_type,
ncols_per_record, tmp_time += 1000, len_of_binary);
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len,
startTime + superTblInfo->timeStampStep * startFrom,
superTblInfo);
}
//assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= BUFFER_SIZE) { // too long
break;
if (retLen < 0) {
free(pChildTblName);
return -1;
}
pstr += sprintf(pstr, " %s", data);
tblInserted++;
k++;
i++;
if (tblInserted >= g_args.num_of_DPT)
break;
}
winfo->totalRowsInserted += k;
/* puts(buffer); */
int64_t startTs;
int64_t endTs;
startTs = taosGetTimestampUs();
//queryDB(winfo->taos, buffer);
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
printf("sleep: %d ms specified by insert_interval\n", sleep_time);
taosMsleep(sleep_time); // ms
len += retLen;
}
} else {
int rand_num = rand() % 100;
char data[MAX_DATA_SIZE];
char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary;
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1);
if (0 < affectedRows){
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay)
winfo->maxDelay = delay;
if (delay < winfo->minDelay)
winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
winfo->totalAffectedRows += affectedRows;
winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRange)) {
int64_t d = startTime - rand() % 1000000 + rand_num;
len = generateData(data, data_type,
ncols_per_record, d, lenOfBinary);
} else {
fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows);
}
verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64" tblInserted=%d\n", __func__, __LINE__, winfo->totalAffectedRows, tblInserted);
if (g_args.insert_interval) {
et = taosGetTimestampMs();
len = generateData(data, data_type,
ncols_per_record,
startTime + DEFAULT_TIMESTAMP_STEP * startFrom,
lenOfBinary);
}
if (tblInserted >= g_args.num_of_DPT) {
//assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= g_args.max_sql_len) { // too long
break;
}
} // num_of_DPT
} // tId
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID,
winfo->totalRowsInserted,
winfo->totalAffectedRows);
pstr += sprintf(pstr, " %s", data);
}
return NULL;
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer);
k++;
startFrom ++;
if (startFrom >= insertRows)
break;
}
return k;
}
// sync insertion
/*
1 thread: 100 tables * 2000 rows/s
1 thread: 10 tables * 20000 rows/s
6 thread: 300 tables * 2000 rows/s
static void* syncWriteWithStb(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/
static void* syncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
FILE *fp = NULL;
char* sampleDataBuf = NULL;
int samplePos = 0;
// each thread read sample data from csv file
if (0 == strncasecmp(superTblInfo->dataSource,
"sample",
strlen("sample"))) {
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
printf("Failed to calloc %d Bytes, reason:%s\n",
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return NULL;
}
if (superTblInfo) {
if (0 != prepareSampleDataForSTable(superTblInfo))
return NULL;
fp = fopen(superTblInfo->sampleFile, "r");
if (fp == NULL) {
printf("Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
tmfree(sampleDataBuf);
if (superTblInfo->numberOfTblInOneSql > 0) {
syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf);
tmfree(superTblInfo->sampleDataBuf);
return NULL;
}
int ret = readSampleFromCsvFileToMem(fp,
superTblInfo, sampleDataBuf);
if (0 != ret) {
tmfree(sampleDataBuf);
tmfclose(fp);
return NULL;
}
}
if (superTblInfo->numberOfTblInOneSql > 0) {
syncWriteForNumberOfTblInOneSql(winfo, fp, sampleDataBuf);
tmfree(sampleDataBuf);
tmfclose(fp);
return NULL;
}
int samplePos = 0;
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
char* buffer = calloc(superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len, 1);
if (NULL == buffer) {
printf("Failed to calloc %d Bytes, reason:%s\n",
fprintf(stderr, "Failed to alloc %d Bytes, reason:%s\n",
superTblInfo->maxSqlLen,
strerror(errno));
tmfree(sampleDataBuf);
tmfclose(fp);
tmfree(superTblInfo->sampleDataBuf);
return NULL;
}
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int insert_interval = superTblInfo?superTblInfo->insertInterval:
g_args.insert_interval;
uint64_t st = 0;
uint64_t et = 0;
uint64_t et = 0xffffffff;
winfo->totalRowsInserted = 0;
winfo->totalInsertRows = 0;
winfo->totalAffectedRows = 0;
int sampleUsePos;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
tID++) {
for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id;
tableSeq ++) {
int64_t start_time = winfo->start_time;
for (int i = 0; i < superTblInfo->insertRows;) {
int64_t tblInserted = i;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
if (i > 0 && superTblInfo->insertInterval
&& (superTblInfo->insertInterval > (et - st) )) {
int sleep_time = superTblInfo->insertInterval - (et -st);
printf("sleep: %d ms insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (superTblInfo->insertInterval) {
st = taosGetTimestampMs();
for (int64_t i = 0; i < insertRows;) {
if (insert_interval) {
st = taosGetTimestampUs();
}
sampleUsePos = samplePos;
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
memset(buffer, 0, superTblInfo->maxSqlLen);
int len = 0;
char *pstr = buffer;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tID % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
goto free_and_statistics_2;
}
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d using %s.%s tags %s values",
winfo->db_name,
superTblInfo->childTblPrefix,
tID,
winfo->db_name,
superTblInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s values",
winfo->db_name,
superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN);
} else {
len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len,
"insert into %s.%s%d values",
winfo->db_name,
superTblInfo->childTblPrefix,
tID);
}
int k;
for (k = 0; k < g_args.num_of_RPR;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
retLen = getRowDataFromSample(
pstr + len,
superTblInfo->maxSqlLen - len,
start_time + superTblInfo->timeStampStep * i,
superTblInfo,
&sampleUsePos,
fp,
sampleDataBuf);
if (retLen < 0) {
goto free_and_statistics_2;
}
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = start_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len,
d,
superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
} else {
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len,
start_time + superTblInfo->timeStampStep * i,
superTblInfo);
}
if (retLen < 0) {
goto free_and_statistics_2;
}
}
len += retLen;
verbosePrint("%s() LN%d retLen=%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, retLen, len, k, buffer);
int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows,
i, start_time, &sampleUsePos);
if (generated > 0)
i += generated;
else
goto free_and_statistics_2;
tblInserted++;
k++;
i++;
if (tblInserted >= superTblInfo->insertRows)
break;
}
winfo->totalRowsInserted += k;
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int affectedRows;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows){
goto free_and_statistics_2;
}
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
if (0 != retCode) {
printf("========restful return fail, threadID[%d]\n", winfo->threadID);
goto free_and_statistics_2;
}
int affectedRows = execInsert(winfo, buffer, generated);
if (affectedRows < 0)
goto free_and_statistics_2;
affectedRows = k;
}
winfo->totalInsertRows += generated;
winfo->totalAffectedRows += affectedRows;
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
......@@ -4505,53 +4609,57 @@ static void* syncWriteWithStb(void *sarg) {
winfo->cntDelay++;
winfo->totalDelay += delay;
winfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID,
winfo->totalRowsInserted,
winfo->totalInsertRows,
winfo->totalAffectedRows);
lastPrintTime = currentPrintTime;
}
if (superTblInfo->insertInterval) {
et = taosGetTimestampMs();
}
if (tblInserted >= superTblInfo->insertRows)
if (i >= insertRows)
break;
if (insert_interval) {
et = taosGetTimestampUs();
if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000;
verbosePrint("%s() LN%d sleep: %d ms for insert interval\n", __func__, __LINE__, sleep_time);
taosMsleep(sleep_time); // ms
}
}
} // num_of_DPT
if (tID == winfo->end_table_id) {
if (0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample"))) {
if ((tableSeq == winfo->end_table_id) && superTblInfo &&
(0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample")))) {
samplePos = sampleUsePos;
}
}
} // tID
} // tableSeq
free_and_statistics_2:
tmfree(buffer);
tmfree(sampleDataBuf);
tmfclose(fp);
if (superTblInfo)
tmfree(superTblInfo->sampleDataBuf);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID,
winfo->totalRowsInserted,
winfo->totalInsertRows,
winfo->totalAffectedRows);
return NULL;
}
void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* winfo = (threadInfo*)param;
SSuperTable* superTblInfo = winfo->superTblInfo;
if (g_args.insert_interval) {
winfo->et = taosGetTimestampMs();
if (winfo->et - winfo->st < 1000) {
taosMsleep(1000 - (winfo->et - winfo->st)); // ms
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
winfo->et = taosGetTimestampUs();
if (((winfo->et - winfo->st)/1000) < insert_interval) {
taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms
}
}
......@@ -4580,7 +4688,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
//generateData(data, datatype, ncols_per_record, d, len_of_binary);
(void)generateRowData(data, MAX_DATA_SIZE, d, winfo->superTblInfo);
} else {
//generateData(data, datatype, ncols_per_record, tmp_time += 1000, len_of_binary);
//generateData(data, datatype, ncols_per_record, start_time += 1000, len_of_binary);
(void)generateRowData(data, MAX_DATA_SIZE, winfo->lastTs += 1000, winfo->superTblInfo);
}
pstr += sprintf(pstr, "%s", data);
......@@ -4591,8 +4699,8 @@ void callBack(void *param, TAOS_RES *res, int code) {
}
}
if (g_args.insert_interval) {
winfo->st = taosGetTimestampMs();
if (insert_interval) {
winfo->st = taosGetTimestampUs();
}
taos_query_a(winfo->taos, buffer, callBack, winfo);
free(buffer);
......@@ -4603,13 +4711,15 @@ void callBack(void *param, TAOS_RES *res, int code) {
void *asyncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
winfo->st = 0;
winfo->et = 0;
winfo->lastTs = winfo->start_time;
if (g_args.insert_interval) {
winfo->st = taosGetTimestampMs();
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
winfo->st = taosGetTimestampUs();
}
taos_query_a(winfo->taos, "show databases", callBack, winfo);
......@@ -4618,17 +4728,24 @@ void *asyncWrite(void *sarg) {
return NULL;
}
void startMultiThreadInsertData(int threads, char* db_name, char* precision,
SSuperTable* superTblInfo) {
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
assert(pids != NULL);
threadInfo *infos = malloc(threads * sizeof(threadInfo));
assert(infos != NULL);
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
int ntables = 0;
if (superTblInfo)
ntables = superTblInfo->childTblCount;
if (superTblInfo->childTblOffset)
ntables = superTblInfo->childTblLimit;
else
ntables = superTblInfo->childTblCount;
else
ntables = g_args.num_of_tables;
......@@ -4659,7 +4776,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
} else if (0 == strncasecmp(precision, "us", 2)) {
timePrec = TSDB_TIME_PRECISION_MICRO;
} else {
printf("No support precision: %s\n", precision);
fprintf(stderr, "No support precision: %s\n", precision);
exit(-1);
}
}
......@@ -4668,11 +4785,11 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
start_time = taosGetTimestamp(timePrec);
} else {
} else {
if (TSDB_CODE_SUCCESS != taosParseTime(
superTblInfo->startTimestamp,
&start_time,
strlen(superTblInfo->startTimestamp),
superTblInfo->startTimestamp,
&start_time,
strlen(superTblInfo->startTimestamp),
timePrec, 0)) {
printf("ERROR to parse time!\n");
exit(-1);
......@@ -4683,8 +4800,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
}
double start = getCurrentTime();
int last = 0;
int last;
if ((superTblInfo) && (superTblInfo->childTblOffset))
last = superTblInfo->childTblOffset;
else
last = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
......@@ -4698,7 +4821,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
(0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
//t_info->taos = taos;
t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
printf("connect to server fail from insert sub thread, reason: %s\n",
......@@ -4722,16 +4845,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
tsem_init(&(t_info->lock_sem), 0, 0);
if (SYNC == g_Dbs.queryMode) {
if (superTblInfo) {
pthread_create(pids + i, NULL, syncWriteWithStb, t_info);
} else {
pthread_create(pids + i, NULL, syncWrite, t_info);
}
} else {
pthread_create(pids + i, NULL, syncWrite, t_info);
} else {
pthread_create(pids + i, NULL, asyncWrite, t_info);
}
}
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
}
......@@ -4750,13 +4869,13 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
superTblInfo->totalInsertRows += t_info->totalInsertRows;
}
totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
}
cntDelay -= 1;
......@@ -4767,16 +4886,17 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
double t = end - start;
if (superTblInfo) {
printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalRowsInserted,
printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
superTblInfo->totalRowsInserted / t);
fprintf(g_fpOfInsertResult, "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalRowsInserted,
superTblInfo->totalInsertRows / t);
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
superTblInfo->totalRowsInserted / t);
superTblInfo->totalInsertRows/ t);
}
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
......@@ -4787,10 +4907,9 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
//taos_close(taos);
free(pids);
free(infos);
free(infos);
}
void *readTable(void *sarg) {
#if 1
threadInfo *rinfo = (threadInfo *)sarg;
......@@ -4935,7 +5054,7 @@ void *readMetric(void *sarg) {
}
int insertTestProcess() {
static int insertTestProcess() {
setupForAnsiEscape();
int ret = printfInsertMeta();
......@@ -4949,15 +5068,15 @@ int insertTestProcess() {
if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
return -1;
} {
printfInsertMetaToFile(g_fpOfInsertResult);
}
printfInsertMetaToFile(g_fpOfInsertResult);
if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n");
(void)getchar();
}
init_rand_data();
// create database and super tables
......@@ -4967,7 +5086,7 @@ int insertTestProcess() {
}
// pretreatement
prePareSampleData();
prepareSampleData();
double start;
double end;
......@@ -4978,47 +5097,47 @@ int insertTestProcess() {
end = getCurrentTime();
if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
end - start, g_totalChildTables, g_Dbs.threadCount);
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
end - start, g_totalChildTables, g_Dbs.threadCount);
}
taosMsleep(1000);
// create sub threads for inserting data
//start = getCurrentTime();
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
continue;
}
startMultiThreadInsertData(
g_Dbs.threadCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
superTblInfo);
}
} else {
startMultiThreadInsertData(
g_Dbs.threadCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
NULL);
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
continue;
}
startMultiThreadInsertData(
g_Dbs.threadCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
superTblInfo);
}
} else {
startMultiThreadInsertData(
g_Dbs.threadCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
NULL);
}
}
//end = getCurrentTime();
//int64_t totalRowsInserted = 0;
//int64_t totalInsertRows = 0;
//int64_t totalAffectedRows = 0;
//for (int i = 0; i < g_Dbs.dbCount; i++) {
// for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// totalRowsInserted += g_Dbs.db[i].superTbls[j].totalRowsInserted;
// totalInsertRows+= g_Dbs.db[i].superTbls[j].totalInsertRows;
// totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows;
//}
//printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalRowsInserted, totalAffectedRows, g_Dbs.threadCount);
//printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalInsertRows, totalAffectedRows, g_Dbs.threadCount);
postFreeResource();
return 0;
......@@ -5039,7 +5158,7 @@ void *superQueryProcess(void *sarg) {
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
}
st = taosGetTimestampMs();
st = taosGetTimestampUs();
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs();
......@@ -5065,14 +5184,14 @@ void *superQueryProcess(void *sarg) {
}
}
}
et = taosGetTimestampMs();
et = taosGetTimestampUs();
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
taosGetSelfPthreadId(), (double)(et - st)/1000.0);
}
return NULL;
}
void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
char sourceString[32] = "xxxx";
char subTblName[MAX_TB_NAME_SIZE*3];
sprintf(subTblName, "%s.%s",
......@@ -5094,7 +5213,7 @@ void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
//printf("3: %s\n", outSql);
}
void *subQueryProcess(void *sarg) {
static void *subQueryProcess(void *sarg) {
char sqlstr[1024];
threadInfo *winfo = (threadInfo *)sarg;
int64_t st = 0;
......@@ -5105,7 +5224,7 @@ void *subQueryProcess(void *sarg) {
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
}
st = taosGetTimestampMs();
st = taosGetTimestampUs();
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
......@@ -5119,12 +5238,12 @@ void *subQueryProcess(void *sarg) {
selectAndGetResult(winfo->taos, sqlstr, tmpFile);
}
}
et = taosGetTimestampMs();
et = taosGetTimestampUs();
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(),
winfo->start_table_id,
winfo->end_table_id,
(double)(et - st)/1000.0);
(double)(et - st)/1000000.0);
}
return NULL;
}
......@@ -5142,10 +5261,10 @@ static int queryTestProcess() {
}
if (0 != g_queryInfo.subQueryInfo.sqlCount) {
(void)getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount);
}
......@@ -5162,7 +5281,7 @@ static int queryTestProcess() {
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_queryInfo.superQueryInfo.concurrent > 0) {
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
......@@ -5170,14 +5289,14 @@ static int queryTestProcess() {
taos_close(taos);
exit(-1);
}
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
t_info->threadID = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
t_info->taos = taos;
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
......@@ -5186,16 +5305,17 @@ static int queryTestProcess() {
t_info->taos = NULL;
}
pthread_create(pids + i, NULL, superQueryProcess, t_info);
}
pthread_create(pids + i, NULL, superQueryProcess, t_info);
}
}else {
g_queryInfo.superQueryInfo.concurrent = 0;
}
pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
//==== create sub threads for query from all sub table of the super table
if ((g_queryInfo.subQueryInfo.sqlCount > 0) && (g_queryInfo.subQueryInfo.threadCnt > 0)) {
if ((g_queryInfo.subQueryInfo.sqlCount > 0)
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
......@@ -5203,7 +5323,7 @@ static int queryTestProcess() {
taos_close(taos);
exit(-1);
}
int ntables = g_queryInfo.subQueryInfo.childTblCount;
int threads = g_queryInfo.subQueryInfo.threadCnt;
......@@ -5212,12 +5332,12 @@ static int queryTestProcess() {
threads = ntables;
a = 1;
}
int b = 0;
if (threads != 0) {
b = ntables % threads;
}
int last = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
......@@ -5411,7 +5531,7 @@ void *superSubscribeProcess(void *sarg) {
}
}
taos_free_result(res);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
taos_unsubscribe(g_queryInfo.superQueryInfo.tsub[i],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
......@@ -5439,14 +5559,13 @@ static int subscribeTestProcess() {
}
if (0 != g_queryInfo.subQueryInfo.sqlCount) {
(void)getAllChildNameOfSuperTable(taos,
getAllChildNameOfSuperTable(taos,
g_queryInfo.dbName,
g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount);
}
pthread_t *pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query from super table
......@@ -5455,53 +5574,53 @@ static int subscribeTestProcess() {
pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed for create threads\n");
printf("malloc failed for create threads\n");
taos_close(taos);
exit(-1);
}
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
t_info->taos = taos;
pthread_create(pids + i, NULL, superSubscribeProcess, t_info);
}
}
//==== create sub threads for query from sub table
pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
if ((g_queryInfo.subQueryInfo.sqlCount > 0)
if ((g_queryInfo.subQueryInfo.sqlCount > 0)
&& (g_queryInfo.subQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt *
pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt *
sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt *
sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
printf("malloc failed for create threads\n");
printf("malloc failed for create threads\n");
taos_close(taos);
exit(-1);
}
int ntables = g_queryInfo.subQueryInfo.childTblCount;
int threads = g_queryInfo.subQueryInfo.threadCnt;
int a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int b = 0;
if (threads != 0) {
b = ntables % threads;
}
int last = 0;
for (int i = 0; i < threads; i++) {
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
t_info->threadID = i;
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
t_info->taos = taos;
......@@ -5509,20 +5628,20 @@ static int subscribeTestProcess() {
}
g_queryInfo.subQueryInfo.threadCnt = threads;
}
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL);
}
tmfree((char*)pids);
tmfree((char*)infos);
tmfree((char*)infos);
for (int i = 0; i < g_queryInfo.subQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
}
tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub);
tmfree((char*)infosOfSub);
taos_close(taos);
return 0;
}
......@@ -5615,9 +5734,9 @@ void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].insertMode, "taosc", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = 10;
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;
g_Dbs.db[0].superTbls[0].columnCount = 0;
......@@ -5761,14 +5880,7 @@ static void testMetaFile() {
}
}
static void testCmdLine() {
g_args.test_mode = INSERT_MODE;
insertTestProcess();
if (g_Dbs.insert_only)
return;
static void queryResult() {
// select
if (false == g_Dbs.insert_only) {
// query data
......@@ -5814,6 +5926,17 @@ static void testCmdLine() {
}
}
static void testCmdLine() {
g_args.test_mode = INSERT_MODE;
insertTestProcess();
if (g_Dbs.insert_only)
return;
else
queryResult();
}
int main(int argc, char *argv[]) {
parse_args(argc, argv, &g_args);
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import os
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.numberOfTables = 100
self.numberOfRecords = 1000
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def run(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/"
os.system("%staosdemo -y -t %d -n %d -x" %
(binPath, self.numberOfTables, self.numberOfRecords))
tdSql.query("show databases")
for i in range(18):
print(tdSql.getData(0, i) )
tdSql.checkData(0, 2, self.numberOfTables)
tdSql.execute("use test")
tdSql.query(
"select count(*) from test.t%d" % (self.numberOfTables -1))
tdSql.checkData(0, 0, self.numberOfRecords)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -316,6 +316,7 @@ if [ "$2" != "sim" ] && [ "$2" != "python" ] && [ "$2" != "unit" ] && [ "$1" ==
cd debug/
stopTaosd
rm -rf /var/lib/taos/*
nohup build/bin/taosd -c /etc/taos/ > /dev/null 2>&1 &
sleep 30
......@@ -358,6 +359,7 @@ if [ "$2" != "sim" ] && [ "$2" != "python" ] && [ "$2" != "jdbc" ] && [ "$1" ==
pwd
cd debug/build/bin
rm -rf /var/lib/taos/*
nohup ./taosd -c /etc/taos/ > /dev/null 2>&1 &
sleep 30
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册