未验证 提交 44621d8a 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 4892 taosdemo sub fetch (#6614)

* [TD-4892]<fix>: taosdemo subscribe fetch result.

* fix stbname length.

* restrict prefix length.

* submit empty

* fix minor code.
上级 d02ce1e2
......@@ -79,10 +79,9 @@ enum TEST_MODE {
#define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2)
#define COND_BUF_LEN BUFFER_SIZE - 30
#define COND_BUF_LEN (BUFFER_SIZE - 30)
#define MAX_USERNAME_SIZE 64
#define MAX_PASSWORD_SIZE 64
#define MAX_DB_NAME_SIZE 64
#define MAX_HOSTNAME_SIZE 64
#define MAX_TB_NAME_SIZE 64
#define MAX_DATA_SIZE (16*1024)+20 // max record len: 16*1024, timestamp string and ,('') need extra space
......@@ -90,7 +89,7 @@ enum TEST_MODE {
#define OPT_ABORT 1 /* –abort */
#define STRING_LEN 60000
#define MAX_PREPARED_RAND 1000000
#define MAX_FILE_NAME_LEN 256
#define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255.
#define MAX_SAMPLES_ONCE_FROM_FILE 10000
#define MAX_NUM_DATATYPE 10
......@@ -195,13 +194,6 @@ enum _describe_table_index {
TSDB_MAX_DESCRIBE_METRIC
};
typedef struct {
char field[TSDB_COL_NAME_LEN + 1];
char type[16];
int length;
char note[128];
} SColDes;
/* Used by main to communicate with parse_opt. */
static char *g_dupstr = NULL;
......@@ -247,16 +239,16 @@ typedef struct SArguments_S {
} SArguments;
typedef struct SColumn_S {
char field[TSDB_COL_NAME_LEN + 1];
char dataType[MAX_TB_NAME_SIZE];
char field[TSDB_COL_NAME_LEN];
char dataType[16];
uint32_t dataLen;
char note[128];
} StrColumn;
typedef struct SSuperTable_S {
char sTblName[MAX_TB_NAME_SIZE+1];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char childTblPrefix[MAX_TB_NAME_SIZE];
char sTblName[TSDB_TABLE_NAME_LEN];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
char childTblPrefix[TSDB_TABLE_NAME_LEN - 20]; // 20 characters reserved for seq
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
uint16_t childTblExists;
int64_t childTblCount;
......@@ -277,8 +269,8 @@ typedef struct SSuperTable_S {
int64_t timeStampStep;
char startTimestamp[MAX_TB_NAME_SIZE];
char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json
char sampleFile[MAX_FILE_NAME_LEN+1];
char tagsFile[MAX_FILE_NAME_LEN+1];
char sampleFile[MAX_FILE_NAME_LEN];
char tagsFile[MAX_FILE_NAME_LEN];
uint32_t columnCount;
StrColumn columns[MAX_COLUMN_COUNT];
......@@ -305,7 +297,7 @@ typedef struct SSuperTable_S {
} SSuperTable;
typedef struct {
char name[TSDB_DB_NAME_LEN + 1];
char name[TSDB_DB_NAME_LEN];
char create_time[32];
int64_t ntables;
int32_t vgroups;
......@@ -341,11 +333,11 @@ typedef struct SDbCfg_S {
int cache;
int blocks;
int quorum;
char precision[MAX_TB_NAME_SIZE];
char precision[8];
} SDbCfg;
typedef struct SDataBase_S {
char dbName[MAX_DB_NAME_SIZE];
char dbName[TSDB_DB_NAME_LEN];
bool drop; // 0: use exists, 1: if exists, drop then new create
SDbCfg dbCfg;
uint64_t superTblCount;
......@@ -353,14 +345,14 @@ typedef struct SDataBase_S {
} SDataBase;
typedef struct SDbs_S {
char cfgDir[MAX_FILE_NAME_LEN+1];
char cfgDir[MAX_FILE_NAME_LEN];
char host[MAX_HOSTNAME_SIZE];
struct sockaddr_in serv_addr;
uint16_t port;
char user[MAX_USERNAME_SIZE];
char password[MAX_PASSWORD_SIZE];
char resultFile[MAX_FILE_NAME_LEN+1];
char resultFile[MAX_FILE_NAME_LEN];
bool use_metric;
bool insert_only;
bool do_aggreFunc;
......@@ -387,7 +379,7 @@ typedef struct SpecifiedQueryInfo_S {
bool subscribeRestart;
int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
int endAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
......@@ -398,7 +390,7 @@ typedef struct SpecifiedQueryInfo_S {
} SpecifiedQueryInfo;
typedef struct SuperQueryInfo_S {
char sTblName[MAX_TB_NAME_SIZE+1];
char sTblName[TSDB_TABLE_NAME_LEN];
uint64_t queryInterval; // 0: unlimit > 0 loop/s
uint32_t threadCnt;
uint32_t asyncMode; // 0: sync, 1: async
......@@ -407,10 +399,10 @@ typedef struct SuperQueryInfo_S {
int subscribeKeepProgress;
uint64_t queryTimes;
int64_t childTblCount;
char childTblPrefix[MAX_TB_NAME_SIZE];
char childTblPrefix[TSDB_TABLE_NAME_LEN - 20]; // 20 characters reserved for seq
int sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN];
int resubAfterConsume;
int endAfterConsume;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
......@@ -420,13 +412,13 @@ typedef struct SuperQueryInfo_S {
} SuperQueryInfo;
typedef struct SQueryMetaInfo_S {
char cfgDir[MAX_FILE_NAME_LEN+1];
char cfgDir[MAX_FILE_NAME_LEN];
char host[MAX_HOSTNAME_SIZE];
uint16_t port;
struct sockaddr_in serv_addr;
char user[MAX_USERNAME_SIZE];
char password[MAX_PASSWORD_SIZE];
char dbName[MAX_DB_NAME_SIZE+1];
char dbName[TSDB_DB_NAME_LEN];
char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest
SpecifiedQueryInfo specifiedQueryInfo;
......@@ -438,11 +430,11 @@ typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int threadID;
char db_name[MAX_DB_NAME_SIZE+1];
char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision;
char filePath[4096];
FILE *fp;
char tb_prefix[MAX_TB_NAME_SIZE];
char tb_prefix[TSDB_TABLE_NAME_LEN];
uint64_t start_table_from;
uint64_t end_table_to;
int64_t ntables;
......@@ -608,7 +600,7 @@ SArguments g_args = {
1, // query_times
0, // interlace_rows;
30000, // num_of_RPR
(1024*1024), // max_sql_len
(1024*1024), // max_sql_len
10000, // num_of_tables
10000, // num_of_DPT
0, // abort
......@@ -3038,7 +3030,7 @@ static int startMultiThreadCreateChildTable(
for (int64_t i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE);
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->superTblInfo = superTblInfo;
verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
pThreadInfo->taos = taos_connect(
......@@ -3329,7 +3321,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
goto PARSE_OVER;
}
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
tstrncpy(columnCase.dataType, dataType->valuestring, strlen(dataType->valuestring) + 1);
cJSON* dataLen = cJSON_GetObjectItem(column, "len");
if (dataLen && dataLen->type == cJSON_Number) {
......@@ -3344,7 +3336,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->columns[index].dataType,
columnCase.dataType, MAX_TB_NAME_SIZE);
columnCase.dataType, strlen(columnCase.dataType) + 1);
superTbls->columns[index].dataLen = columnCase.dataLen;
index++;
}
......@@ -3400,7 +3392,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
__func__, __LINE__);
goto PARSE_OVER;
}
tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
tstrncpy(columnCase.dataType, dataType->valuestring, strlen(dataType->valuestring) + 1);
cJSON* dataLen = cJSON_GetObjectItem(tag, "len");
if (dataLen && dataLen->type == cJSON_Number) {
......@@ -3415,7 +3407,7 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType,
MAX_TB_NAME_SIZE);
strlen(columnCase.dataType) + 1);
superTbls->tags[index].dataLen = columnCase.dataLen;
index++;
}
......@@ -3638,7 +3630,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, db name not found\n");
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].dbName, dbName->valuestring, MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[i].dbName, dbName->valuestring, TSDB_DB_NAME_LEN);
cJSON *drop = cJSON_GetObjectItem(dbinfo, "drop");
if (drop && drop->type == cJSON_String && drop->valuestring != NULL) {
......@@ -3659,10 +3651,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (precision && precision->type == cJSON_String
&& precision->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].dbCfg.precision, precision->valuestring,
MAX_DB_NAME_SIZE);
8);
} else if (!precision) {
//tstrncpy(g_Dbs.db[i].dbCfg.precision, "ms", MAX_DB_NAME_SIZE);
memset(g_Dbs.db[i].dbCfg.precision, 0, MAX_DB_NAME_SIZE);
memset(g_Dbs.db[i].dbCfg.precision, 0, 8);
} else {
printf("ERROR: failed to read json, precision not found\n");
goto PARSE_OVER;
......@@ -3839,7 +3830,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring,
MAX_TB_NAME_SIZE);
TSDB_TABLE_NAME_LEN);
cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
......@@ -3847,7 +3838,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring,
MAX_DB_NAME_SIZE);
TSDB_TABLE_NAME_LEN - 20);
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table");
if (autoCreateTbl
......@@ -3915,9 +3906,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (dataSource && dataSource->type == cJSON_String
&& dataSource->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource,
dataSource->valuestring, MAX_DB_NAME_SIZE);
dataSource->valuestring, TSDB_DB_NAME_LEN);
} else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", TSDB_DB_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, data_source not found\n",
__func__, __LINE__);
......@@ -3975,10 +3966,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON *ts = cJSON_GetObjectItem(stbInfo, "start_timestamp");
if (ts && ts->type == cJSON_String && ts->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
ts->valuestring, MAX_DB_NAME_SIZE);
ts->valuestring, TSDB_DB_NAME_LEN);
} else if (!ts) {
tstrncpy(g_Dbs.db[i].superTbls[j].startTimestamp,
"now", MAX_DB_NAME_SIZE);
"now", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, start_timestamp not found\n");
goto PARSE_OVER;
......@@ -3998,9 +3989,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (sampleFormat && sampleFormat->type
== cJSON_String && sampleFormat->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat,
sampleFormat->valuestring, MAX_DB_NAME_SIZE);
sampleFormat->valuestring, TSDB_DB_NAME_LEN);
} else if (!sampleFormat) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFormat, "csv", TSDB_DB_NAME_LEN);
} else {
printf("ERROR: failed to read json, sample_format not found\n");
goto PARSE_OVER;
......@@ -4245,7 +4236,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (dbs && dbs->type == cJSON_String && dbs->valuestring != NULL) {
tstrncpy(g_queryInfo.dbName, dbs->valuestring, MAX_DB_NAME_SIZE);
tstrncpy(g_queryInfo.dbName, dbs->valuestring, TSDB_DB_NAME_LEN);
} else if (!dbs) {
printf("ERROR: failed to read json, databases not found\n");
goto PARSE_OVER;
......@@ -4495,7 +4486,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (stblname && stblname->type == cJSON_String
&& stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
MAX_TB_NAME_SIZE);
TSDB_TABLE_NAME_LEN);
} else {
errorPrint("%s() LN%d, failed to read json, super table name input error\n",
__func__, __LINE__);
......@@ -6408,7 +6399,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, MAX_DB_NAME_SIZE);
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->time_precision = timePrec;
pThreadInfo->superTblInfo = superTblInfo;
......@@ -6852,7 +6843,7 @@ static void *specifiedTableQuery(void *sarg) {
}
}
char sqlStr[MAX_DB_NAME_SIZE + 5];
char sqlStr[TSDB_DB_NAME_LEN + 5];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos);
......@@ -7328,12 +7319,6 @@ static void *superSubscribe(void *sarg) {
performancePrint("st: %"PRIu64" et: %"PRIu64" delta: %"PRIu64"\n", st, et, (et - st));
if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(res, pThreadInfo);
}
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
......@@ -7440,10 +7425,10 @@ static void *specifiedSubscribe(void *sarg) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
pThreadInfo);
}
fetchResult(
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
pThreadInfo);
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
if ((g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq] != -1)
......@@ -7680,9 +7665,9 @@ static void setParaFromArg(){
g_Dbs.dbCount = 1;
g_Dbs.db[0].drop = true;
tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].dbName, g_args.database, TSDB_DB_NAME_LEN);
g_Dbs.db[0].dbCfg.replica = g_args.replica;
tstrncpy(g_Dbs.db[0].dbCfg.precision, "ms", MAX_DB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].dbCfg.precision, "ms", 8);
tstrncpy(g_Dbs.resultFile, g_args.output_file, MAX_FILE_NAME_LEN);
......@@ -7704,7 +7689,7 @@ static void setParaFromArg(){
if (g_args.use_metric) {
g_Dbs.db[0].superTblCount = 1;
tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].sTblName, "meters", TSDB_TABLE_NAME_LEN);
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
......@@ -7715,7 +7700,7 @@ static void setParaFromArg(){
g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio;
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
g_args.tb_prefix, MAX_TB_NAME_SIZE);
g_args.tb_prefix, TSDB_TABLE_NAME_LEN - 20);
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].iface = g_args.iface;
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
......@@ -7732,7 +7717,7 @@ static void setParaFromArg(){
}
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
data_type[i], MAX_TB_NAME_SIZE);
data_type[i], strlen(data_type[i]) + 1);
g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.len_of_binary;
g_Dbs.db[0].superTbls[0].columnCount++;
}
......@@ -7743,18 +7728,18 @@ static void setParaFromArg(){
for (int i = g_Dbs.db[0].superTbls[0].columnCount;
i < g_args.num_of_CPR; i++) {
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
"INT", MAX_TB_NAME_SIZE);
"INT", strlen("INT") + 1);
g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
g_Dbs.db[0].superTbls[0].columnCount++;
}
}
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType,
"INT", MAX_TB_NAME_SIZE);
"INT", strlen("INT") + 1);
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType,
"BINARY", MAX_TB_NAME_SIZE);
"BINARY", strlen("BINARY") + 1);
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
g_Dbs.db[0].superTbls[0].tagCount = 2;
} else {
......@@ -7890,11 +7875,11 @@ static void queryResult() {
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].childTblPrefix, TSDB_TABLE_NAME_LEN - 20);
} else {
pThreadInfo->ntables = g_args.num_of_tables;
pThreadInfo->end_table_to = g_args.num_of_tables -1;
tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
tstrncpy(pThreadInfo->tb_prefix, g_args.tb_prefix, TSDB_TABLE_NAME_LEN);
}
pThreadInfo->taos = taos_connect(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册