未验证 提交 4412c52f 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 4187 taosdemo keepprogress (#6149)

* [TD-4187]<fix>: taosdemo keepProgress.

* add consumed.

* [TD-4187]<fix>: taosdemo keepProgress.

add resubAfterConsume process.

* print resub msg for test.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 6d448dca
{
"filetype":"subscribe",
"filetype": "subscribe",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"databases": "dbx",
"specified_table_query":
{"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes",
"sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}]
},
"super_table_query":
{"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes",
"sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}]
}
"databases": "test",
"specified_table_query": {
"concurrent": 1,
"mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"resubAfterConsume": 10,
"sqls": [
{
"sql": "select avg(col1) from meters where col1 > 1;",
"result": "./subscribe_res0.txt"
}
]
},
"super_table_query": {
"stblname": "meters",
"threads": 1,
"mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"sqls": [
{
"sql": "select col1 from xxxx where col1 > 10;",
"result": "./subscribe_res1.txt"
}
]
}
}
......@@ -227,7 +227,7 @@ typedef struct SColumn_S {
char field[TSDB_COL_NAME_LEN + 1];
char dataType[MAX_TB_NAME_SIZE];
uint32_t dataLen;
char note[128];
char note[128];
} StrColumn;
typedef struct SSuperTable_S {
......@@ -360,10 +360,11 @@ typedef struct SpecifiedQueryInfo_S {
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
uint64_t queryTimes;
int subscribeRestart;
bool subscribeRestart;
int subscribeKeepProgress;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
uint64_t totalQueried;
} SpecifiedQueryInfo;
......@@ -374,7 +375,7 @@ typedef struct SuperQueryInfo_S {
uint32_t threadCnt;
uint32_t asyncMode; // 0: sync, 1: async
uint64_t subscribeInterval; // ms
int subscribeRestart;
bool subscribeRestart;
int subscribeKeepProgress;
uint64_t queryTimes;
int64_t childTblCount;
......@@ -382,6 +383,7 @@ typedef struct SuperQueryInfo_S {
uint64_t sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName;
......@@ -399,7 +401,7 @@ typedef struct SQueryMetaInfo_S {
char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest
SpecifiedQueryInfo specifiedQueryInfo;
SuperQueryInfo superQueryInfo;
SuperQueryInfo superQueryInfo;
uint64_t totalQueried;
} SQueryMetaInfo;
......@@ -521,8 +523,8 @@ static void prompt2();
static int createDatabasesAndStables();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet);
static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port,
char* sqlstr, char *resultFile);
static int postProceSql(char *host, struct sockaddr_in *pServAddr,
uint16_t port, char* sqlstr, char *resultFile);
/* ************ Global variables ************ */
......@@ -1141,12 +1143,14 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
totalLen += len;
}
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile);
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
__func__, __LINE__, databuf, resultFile);
appendResultBufToFile(databuf, resultFile);
free(databuf);
}
static void selectAndGetResult(threadInfo *pThreadInfo, char *command, char* resultFile)
static void selectAndGetResult(
threadInfo *pThreadInfo, char *command, char* resultFile)
{
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command);
......@@ -1291,13 +1295,15 @@ static void init_rand_data() {
static int printfInsertMeta() {
SHOW_PARSE_RESULT_START();
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
printf("host: \033[33m%s:%u\033[0m\n",
g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
printf("configDir: \033[33m%s\033[0m\n", configDir);
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
printf("thread num of create table: \033[33m%d\033[0m\n",
g_Dbs.threadCountByCreateTbl);
printf("top insert interval: \033[33m%"PRIu64"\033[0m\n",
g_args.insert_interval);
printf("number of records per req: \033[33m%"PRIu64"\033[0m\n",
......@@ -1309,7 +1315,8 @@ static int printfInsertMeta() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database[%d] name: \033[33m%s\033[0m\n", i, g_Dbs.db[i].dbName);
printf(" database[%d] name: \033[33m%s\033[0m\n",
i, g_Dbs.db[i].dbName);
if (0 == g_Dbs.db[i].drop) {
printf(" drop: \033[33mno\033[0m\n");
} else {
......@@ -1317,40 +1324,51 @@ static int printfInsertMeta() {
}
if (g_Dbs.db[i].dbCfg.blocks > 0) {
printf(" blocks: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.blocks);
printf(" blocks: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.blocks);
}
if (g_Dbs.db[i].dbCfg.cache > 0) {
printf(" cache: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.cache);
printf(" cache: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.cache);
}
if (g_Dbs.db[i].dbCfg.days > 0) {
printf(" days: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.days);
printf(" days: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.days);
}
if (g_Dbs.db[i].dbCfg.keep > 0) {
printf(" keep: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.keep);
printf(" keep: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.keep);
}
if (g_Dbs.db[i].dbCfg.replica > 0) {
printf(" replica: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.replica);
printf(" replica: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.replica);
}
if (g_Dbs.db[i].dbCfg.update > 0) {
printf(" update: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.update);
printf(" update: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.update);
}
if (g_Dbs.db[i].dbCfg.minRows > 0) {
printf(" minRows: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.minRows);
printf(" minRows: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.minRows);
}
if (g_Dbs.db[i].dbCfg.maxRows > 0) {
printf(" maxRows: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.maxRows);
printf(" maxRows: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.maxRows);
}
if (g_Dbs.db[i].dbCfg.comp > 0) {
printf(" comp: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.comp);
}
if (g_Dbs.db[i].dbCfg.walLevel > 0) {
printf(" walLevel: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.walLevel);
printf(" walLevel: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.walLevel);
}
if (g_Dbs.db[i].dbCfg.fsync > 0) {
printf(" fsync: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.fsync);
printf(" fsync: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.fsync);
}
if (g_Dbs.db[i].dbCfg.quorum > 0) {
printf(" quorum: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.quorum);
printf(" quorum: \033[33m%d\033[0m\n",
g_Dbs.db[i].dbCfg.quorum);
}
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
......@@ -1543,21 +1561,26 @@ static void printfInsertMetaToFile(FILE* fp) {
if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
fprintf(fp, " precision: %s\n", g_Dbs.db[i].dbCfg.precision);
fprintf(fp, " precision: %s\n",
g_Dbs.db[i].dbCfg.precision);
} else {
fprintf(fp, " precision error: %s\n", g_Dbs.db[i].dbCfg.precision);
fprintf(fp, " precision error: %s\n",
g_Dbs.db[i].dbCfg.precision);
}
}
fprintf(fp, " super table count: %"PRIu64"\n", g_Dbs.db[i].superTblCount);
fprintf(fp, " super table count: %"PRIu64"\n",
g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
fprintf(fp, " super table[%d]:\n", j);
fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName);
fprintf(fp, " stbName: %s\n",
g_Dbs.db[i].superTbls[j].sTblName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
fprintf(fp, " autoCreateTable: %s\n", "no");
} else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
} else if (AUTO_CREATE_SUBTBL
== g_Dbs.db[i].superTbls[j].autoCreateTable) {
fprintf(fp, " autoCreateTable: %s\n", "yes");
} else {
fprintf(fp, " autoCreateTable: %s\n", "error");
......@@ -1565,7 +1588,8 @@ static void printfInsertMetaToFile(FILE* fp) {
if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
fprintf(fp, " childTblExists: %s\n", "no");
} else if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
} else if (TBL_ALREADY_EXISTS
== g_Dbs.db[i].superTbls[j].childTblExists) {
fprintf(fp, " childTblExists: %s\n", "yes");
} else {
fprintf(fp, " childTblExists: %s\n", "error");
......@@ -1596,8 +1620,10 @@ static void printfInsertMetaToFile(FILE* fp) {
*/
fprintf(fp, " interlaceRows: %"PRIu64"\n",
g_Dbs.db[i].superTbls[j].interlaceRows);
fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio);
fprintf(fp, " disorderRange: %d\n",
g_Dbs.db[i].superTbls[j].disorderRange);
fprintf(fp, " disorderRatio: %d\n",
g_Dbs.db[i].superTbls[j].disorderRatio);
fprintf(fp, " maxSqlLen: %"PRIu64"\n",
g_Dbs.db[i].superTbls[j].maxSqlLen);
......@@ -1605,23 +1631,29 @@ static void printfInsertMetaToFile(FILE* fp) {
g_Dbs.db[i].superTbls[j].timeStampStep);
fprintf(fp, " startTimestamp: %s\n",
g_Dbs.db[i].superTbls[j].startTimestamp);
fprintf(fp, " sampleFormat: %s\n", g_Dbs.db[i].superTbls[j].sampleFormat);
fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile);
fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile);
fprintf(fp, " sampleFormat: %s\n",
g_Dbs.db[i].superTbls[j].sampleFormat);
fprintf(fp, " sampleFile: %s\n",
g_Dbs.db[i].superTbls[j].sampleFile);
fprintf(fp, " tagsFile: %s\n",
g_Dbs.db[i].superTbls[j].tagsFile);
fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount);
fprintf(fp, " columnCount: %d\n ",
g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
if ((0 == strncasecmp(
g_Dbs.db[i].superTbls[j].columns[k].dataType,
"binary", strlen("binary")))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType,
|| (0 == strncasecmp(
g_Dbs.db[i].superTbls[j].columns[k].dataType,
"nchar", strlen("nchar")))) {
fprintf(fp, "column[%d]:%s(%d) ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType,
g_Dbs.db[i].superTbls[j].columns[k].dataLen);
} else {
fprintf(fp, "column[%d]:%s ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType);
fprintf(fp, "column[%d]:%s ",
k, g_Dbs.db[i].superTbls[j].columns[k].dataType);
}
}
fprintf(fp, "\n");
......@@ -1634,7 +1666,8 @@ static void printfInsertMetaToFile(FILE* fp) {
"binary", strlen("binary")))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType,
"nchar", strlen("nchar")))) {
fprintf(fp, "tag[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType,
fprintf(fp, "tag[%d]:%s(%d) ",
k, g_Dbs.db[i].superTbls[j].tags[k].dataType,
g_Dbs.db[i].superTbls[j].tags[k].dataLen);
} else {
fprintf(fp, "tag[%d]:%s ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType);
......@@ -4128,7 +4161,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
"query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
if (specifiedQueryTimes->valueint <= 0) {
errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
errorPrint(
"%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n",
__func__, __LINE__, specifiedQueryTimes->valueint);
goto PARSE_OVER;
......@@ -4145,7 +4179,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) {
if (concurrent->valueint <= 0) {
errorPrint("%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n",
errorPrint(
"%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
......@@ -4184,15 +4219,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart");
if (restart && restart->type == cJSON_String && restart->valuestring != NULL) {
if (0 == strcmp("yes", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = 1;
g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", restart->valuestring)) {
g_queryInfo.specifiedQueryInfo.subscribeRestart = 0;
g_queryInfo.specifiedQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.specifiedQueryInfo.subscribeRestart = 1;
g_queryInfo.specifiedQueryInfo.subscribeRestart = true;
}
cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress");
......@@ -4237,13 +4272,29 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
printf("ERROR: failed to read json, sql not found\n");
goto PARSE_OVER;
}
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j],
sqlStr->valuestring, MAX_QUERY_SQL_LENGTH);
cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
if (resubAfterConsume
&& resubAfterConsume->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint;
} else if (!resubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1;
}
cJSON *result = cJSON_GetObjectItem(sql, "result");
if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN);
if ((NULL != result) && (result->type == cJSON_String)
&& (result->valuestring != NULL)) {
tstrncpy(g_queryInfo.specifiedQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) {
memset(g_queryInfo.specifiedQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
memset(g_queryInfo.specifiedQueryInfo.result[j],
0, MAX_FILE_NAME_LEN);
} else {
printf("ERROR: failed to read json, super query result file not found\n");
goto PARSE_OVER;
......@@ -4350,27 +4401,27 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 1;
g_queryInfo.superQueryInfo.subscribeRestart = true;
} else if (0 == strcmp("no", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 0;
g_queryInfo.superQueryInfo.subscribeRestart = false;
} else {
printf("ERROR: failed to read json, subscribe restart error\n");
goto PARSE_OVER;
}
} else {
g_queryInfo.superQueryInfo.subscribeRestart = 1;
g_queryInfo.superQueryInfo.subscribeRestart = true;
}
cJSON* subkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (subkeepProgress &&
subkeepProgress->type == cJSON_String
&& subkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", subkeepProgress->valuestring)) {
cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress");
if (superkeepProgress &&
superkeepProgress->type == cJSON_String
&& superkeepProgress->valuestring != NULL) {
if (0 == strcmp("yes", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 1;
} else if (0 == strcmp("no", subkeepProgress->valuestring)) {
} else if (0 == strcmp("no", superkeepProgress->valuestring)) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} else {
printf("ERROR: failed to read json, subscribe keepProgress error\n");
printf("ERROR: failed to read json, subscribe super table keepProgress error\n");
goto PARSE_OVER;
}
} else {
......@@ -4408,6 +4459,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH);
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(sql, "resubAfterConsume");
if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.resubAfterConsume[j] =
superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1;
}
cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){
......@@ -5506,7 +5569,8 @@ static void callBack(void *param, TAOS_RES *res, int code) {
int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, pThreadInfo->superTblInfo);
} else {
generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo);
......@@ -6480,17 +6544,18 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
}
static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, char* resultFileName) {
TAOS *taos, char *sql, char* topic, bool restart,
char* resultFileName) {
TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
restart,
topic, sql, subscribe_callback, (void*)resultFileName,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else {
tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
restart,
topic, sql, NULL, NULL, 0);
}
......@@ -6551,7 +6616,9 @@ static void *superSubscribe(void *sarg) {
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile);
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
......@@ -6559,17 +6626,22 @@ static void *superSubscribe(void *sarg) {
}
}
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
// start loop to consume result
TAOS_RES* res = NULL;
while(1) {
for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue;
}
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taosMsleep(100); // ms
taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[subSeq]);
if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
......@@ -6579,6 +6651,28 @@ static void *superSubscribe(void *sarg) {
pThreadInfo->threadID);
appendResultToFile(res, tmpFile);
}
consumed[j] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[j] >=
g_queryInfo.superQueryInfo.resubAfterConsume[j])) {
printf("keepProgress:%d, resub super table query: %d\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, j);
taos_unsubscribe(tsub[subSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[j]= 0;
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(
pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
}
......@@ -6589,8 +6683,7 @@ static void *superSubscribe(void *sarg) {
i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taos_unsubscribe(tsub[subSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
taos_unsubscribe(tsub[subSeq], 0);
}
}
......@@ -6635,40 +6728,68 @@ static void *specifiedSubscribe(void *sarg) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
g_queryInfo.specifiedQueryInfo.result[i],
pThreadInfo->threadID);
}
tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
// start loop to consume result
TAOS_RES* res = NULL;
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue;
}
taosMsleep(1000); // ms
taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[i]);
if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID);
g_queryInfo.specifiedQueryInfo.result[i],
pThreadInfo->threadID);
appendResultToFile(res, tmpFile);
}
consumed[i] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed[i] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) {
printf("keepProgress:%d, resub specified query: %d\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i);
consumed[i] = 0;
taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
}
taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
taos_unsubscribe(tsub[i], 0);
}
taos_close(pThreadInfo->taos);
......@@ -6719,8 +6840,10 @@ static int subscribeTestProcess() {
exit(-1);
}
pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
pids = malloc(
g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
infos = malloc(
g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1);
......@@ -6905,17 +7028,21 @@ static void setParaFromArg(){
if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) {
g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR;
} else {
for (int i = g_Dbs.db[0].superTbls[0].columnCount; i < g_args.num_of_CPR; i++) {
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, "INT", MAX_TB_NAME_SIZE);
for (int i = g_Dbs.db[0].superTbls[0].columnCount;
i < g_args.num_of_CPR; i++) {
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
"INT", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0;
g_Dbs.db[0].superTbls[0].columnCount++;
}
}
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType,
"INT", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0;
tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, "BINARY", MAX_TB_NAME_SIZE);
tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType,
"BINARY", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
g_Dbs.db[0].superTbls[0].tagCount = 2;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册