未验证 提交 ea1777d5 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-4918]<fix>: taosdemo subscribe endAfterResume. (#6654)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 159d3203
...@@ -4408,14 +4408,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4408,14 +4408,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* resubAfterConsume = cJSON* resubAfterConsume =
cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume");
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
if ((resubAfterConsume) if ((resubAfterConsume)
&& (resubAfterConsume->type == cJSON_Number) && (resubAfterConsume->type == cJSON_Number)
&& (resubAfterConsume->valueint >= 0)) { && (resubAfterConsume->valueint >= 0)) {
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] g_queryInfo.specifiedQueryInfo.resubAfterConsume[j]
= resubAfterConsume->valueint; = resubAfterConsume->valueint;
} else if (!resubAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = -1;
} }
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
...@@ -4572,14 +4570,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { ...@@ -4572,14 +4570,12 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
cJSON* superResubAfterConsume = cJSON* superResubAfterConsume =
cJSON_GetObjectItem(superQuery, "resubAfterConsume"); cJSON_GetObjectItem(superQuery, "resubAfterConsume");
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
if ((superResubAfterConsume) if ((superResubAfterConsume)
&& (superResubAfterConsume->type == cJSON_Number) && (superResubAfterConsume->type == cJSON_Number)
&& (superResubAfterConsume->valueint >= 0)) { && (superResubAfterConsume->valueint >= 0)) {
g_queryInfo.superQueryInfo.resubAfterConsume = g_queryInfo.superQueryInfo.resubAfterConsume =
superResubAfterConsume->valueint; superResubAfterConsume->valueint;
} else if (!superResubAfterConsume) {
// default value is -1, which mean do not resub
g_queryInfo.superQueryInfo.resubAfterConsume = -1;
} }
// supert table sqls // supert table sqls
...@@ -7222,151 +7218,159 @@ static TAOS_SUB* subscribeImpl( ...@@ -7222,151 +7218,159 @@ static TAOS_SUB* subscribeImpl(
} }
if (tsub == NULL) { if (tsub == NULL) {
printf("failed to create subscription. topic:%s, sql:%s\n", topic, sql); errorPrint("failed to create subscription. topic:%s, sql:%s\n", topic, sql);
return NULL; return NULL;
} }
return tsub; return tsub;
} }
static void *superSubscribe(void *sarg) { static void *superSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
char subSqlstr[MAX_QUERY_SQL_LENGTH]; char subSqlstr[MAX_QUERY_SQL_LENGTH];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq; uint64_t tsubSeq;
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables, pThreadInfo->ntables, MAX_QUERY_SQL_COUNT);
MAX_QUERY_SQL_COUNT); exit(-1);
exit(-1); }
}
if (pThreadInfo->taos == NULL) {
pThreadInfo->taos = taos_connect(g_queryInfo.host,
g_queryInfo.user,
g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n", pThreadInfo->taos = taos_connect(g_queryInfo.host,
pThreadInfo->threadID, taos_errstr(NULL)); g_queryInfo.user,
return NULL; g_queryInfo.password,
g_queryInfo.dbName,
g_queryInfo.port);
if (pThreadInfo->taos == NULL) {
errorPrint("[%d] Failed to connect to TDengine, reason:%s\n",
pThreadInfo->threadID, taos_errstr(NULL));
return NULL;
}
} }
}
char sqlStr[MAX_TB_NAME_SIZE*2]; char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName); sprintf(sqlStr, "use %s", g_queryInfo.dbName);
if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) { if (0 != queryDbExec(pThreadInfo->taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
errorPrint( "use database %s failed!\n\n", errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName); g_queryInfo.dbName);
return NULL; return NULL;
} }
char topic[32] = {0}; char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from;
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
__func__, __LINE__,
pThreadInfo->threadID,
pThreadInfo->start_table_from,
pThreadInfo->end_table_to, i);
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"",
i, pThreadInfo->querySeq);
memset(subSqlstr, 0, sizeof(subSqlstr));
replaceChildTblName(
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
subSqlstr, i);
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
tsubSeq = i - pThreadInfo->start_table_from; debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", __func__, __LINE__, pThreadInfo->threadID, subSqlstr);
__func__, __LINE__, tsub[tsubSeq] = subscribeImpl(
pThreadInfo->threadID, STABLE_CLASS,
pThreadInfo->start_table_from, pThreadInfo, subSqlstr, topic,
pThreadInfo->end_table_to, i); g_queryInfo.superQueryInfo.subscribeRestart,
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"", g_queryInfo.superQueryInfo.subscribeInterval);
i, pThreadInfo->querySeq); if (NULL == tsub[tsubSeq]) {
memset(subSqlstr, 0, sizeof(subSqlstr)); taos_close(pThreadInfo->taos);
replaceChildTblName( return NULL;
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], }
subSqlstr, i); }
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", // start loop to consume result
__func__, __LINE__, pThreadInfo->threadID, subSqlstr); int consumed[MAX_QUERY_SQL_COUNT];
tsub[tsubSeq] = subscribeImpl( for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) {
STABLE_CLASS, consumed[i] = 0;
pThreadInfo, subSqlstr, topic, }
g_queryInfo.superQueryInfo.subscribeRestart, TAOS_RES* res = NULL;
g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
// start loop to consume result uint64_t st = 0, et = 0;
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) {
consumed[i] = 0;
}
TAOS_RES* res = NULL;
uint64_t st = 0, et = 0; while ((g_queryInfo.superQueryInfo.endAfterConsume == -1)
|| (g_queryInfo.superQueryInfo.endAfterConsume >
consumed[pThreadInfo->end_table_to
- pThreadInfo->start_table_from])) {
while ((g_queryInfo.superQueryInfo.endAfterConsume == -1) verbosePrint("super endAfterConsume: %d, consumed: %d\n",
|| (g_queryInfo.superQueryInfo.endAfterConsume < g_queryInfo.superQueryInfo.endAfterConsume,
consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from])) { consumed[pThreadInfo->end_table_to
- pThreadInfo->start_table_from]);
for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from;
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue;
}
st = taosGetTimestampMs();
performancePrint("st: %"PRIu64" et: %"PRIu64" st-et: %"PRIu64"\n", st, et, (st - et));
res = taos_consume(tsub[tsubSeq]);
et = taosGetTimestampMs();
performancePrint("st: %"PRIu64" et: %"PRIu64" delta: %"PRIu64"\n", st, et, (et - st));
if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(res, pThreadInfo);
}
consumed[tsubSeq] ++;
if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1)
&& (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume)) {
verbosePrint("%s() LN%d, keepProgress:%d, resub super table query: %"PRIu64"\n",
__func__, __LINE__,
g_queryInfo.superQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
taos_unsubscribe(tsub[tsubSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[tsubSeq]= 0;
tsub[tsubSeq] = subscribeImpl(
STABLE_CLASS,
pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval
);
if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
}
verbosePrint("%s() LN%d, super endAfterConsume: %d, consumed: %d\n",
__func__, __LINE__,
g_queryInfo.superQueryInfo.endAfterConsume,
consumed[pThreadInfo->end_table_to - pThreadInfo->start_table_from]);
taos_free_result(res);
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from; tsubSeq = i - pThreadInfo->start_table_from;
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { taos_unsubscribe(tsub[tsubSeq], 0);
continue;
}
st = taosGetTimestampMs();
performancePrint("st: %"PRIu64" et: %"PRIu64" st-et: %"PRIu64"\n", st, et, (st - et));
res = taos_consume(tsub[tsubSeq]);
et = taosGetTimestampMs();
performancePrint("st: %"PRIu64" et: %"PRIu64" delta: %"PRIu64"\n", st, et, (et - st));
if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
fetchResult(res, pThreadInfo);
}
consumed[tsubSeq] ++;
if ((g_queryInfo.superQueryInfo.resubAfterConsume != -1)
&& (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume)) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
taos_unsubscribe(tsub[tsubSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[tsubSeq]= 0;
tsub[tsubSeq] = subscribeImpl(
STABLE_CLASS,
pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval
);
if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
} }
}
taos_free_result(res);
for (uint64_t i = pThreadInfo->start_table_from; taos_close(pThreadInfo->taos);
i <= pThreadInfo->end_table_to; i++) { return NULL;
tsubSeq = i - pThreadInfo->start_table_from;
taos_unsubscribe(tsub[tsubSeq], 0);
}
taos_close(pThreadInfo->taos);
return NULL;
} }
static void *specifiedSubscribe(void *sarg) { static void *specifiedSubscribe(void *sarg) {
...@@ -7420,8 +7424,13 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -7420,8 +7424,13 @@ static void *specifiedSubscribe(void *sarg) {
|| (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] < || (g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] <
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) { g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq])) {
printf("consumed[%d]: %d, endAfterConsum[%"PRId64"]: %d\n",
pThreadInfo->threadID,
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID],
pThreadInfo->querySeq,
g_queryInfo.specifiedQueryInfo.endAfterConsume[pThreadInfo->querySeq]);
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue; continue;
} }
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume( g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册