未验证 提交 db03be64 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-4296]<fix>: taosdemo sub keepProgress. (#6200)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 696c7843
...@@ -391,7 +391,7 @@ typedef struct SuperQueryInfo_S { ...@@ -391,7 +391,7 @@ typedef struct SuperQueryInfo_S {
uint64_t sqlCount; uint64_t sqlCount;
char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1];
char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1];
int resubAfterConsume[MAX_QUERY_SQL_COUNT]; int resubAfterConsume;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT];
char* childTblName; char* childTblName;
...@@ -4447,6 +4447,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4447,6 +4447,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; g_queryInfo.superQueryInfo.subscribeKeepProgress = 0;
} }
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume");
if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
////goto PARSE_OVER;
g_queryInfo.superQueryInfo.resubAfterConsume = 1;
}
// sqls // sqls
cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls"); cJSON* subsqls = cJSON_GetObjectItem(superQuery, "sqls");
if (!subsqls) { if (!subsqls) {
...@@ -4478,18 +4490,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4478,18 +4490,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH); MAX_QUERY_SQL_LENGTH);
cJSON* superResubAfterConsume =
cJSON_GetObjectItem(sql, "resubAfterConsume");
if (superResubAfterConsume
&& superResubAfterConsume->type == cJSON_Number) {
g_queryInfo.superQueryInfo.resubAfterConsume[j] =
superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
//printf("failed to read json, subscribe interval no found\n");
//goto PARSE_OVER;
g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1;
}
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String if (result != NULL && result->type == cJSON_String
&& result->valuestring != NULL){ && result->valuestring != NULL){
...@@ -6688,6 +6688,8 @@ static void *superSubscribe(void *sarg) { ...@@ -6688,6 +6688,8 @@ static void *superSubscribe(void *sarg) {
} }
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
uint64_t st = 0, et = 0;
while(1) { while(1) {
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
...@@ -6696,7 +6698,12 @@ static void *superSubscribe(void *sarg) { ...@@ -6696,7 +6698,12 @@ static void *superSubscribe(void *sarg) {
continue; continue;
} }
st = taosGetTimestampMs();
performancePrint("st: %"PRIu64" et: %"PRIu64" st-et: %"PRIu64"\n", st, et, (st - et));
res = taos_consume(tsub[tsubSeq]); res = taos_consume(tsub[tsubSeq]);
et = taosGetTimestampMs();
performancePrint("st: %"PRIu64" et: %"PRIu64" delta: %"PRIu64"\n", st, et, (et - st));
if (res) { if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d", sprintf(pThreadInfo->fp, "%s-%d",
...@@ -6714,7 +6721,7 @@ static void *superSubscribe(void *sarg) { ...@@ -6714,7 +6721,7 @@ static void *superSubscribe(void *sarg) {
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[tsubSeq] >= && (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { g_queryInfo.superQueryInfo.resubAfterConsume)) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n", printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, g_queryInfo.superQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq); pThreadInfo->querySeq);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册