未验证 提交 78b8473e 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 4238 taosdemo async subscribe (#6162)

* [TD-4238]<fix>: taosdemo async subscribe.

* [TD-4238]<fix>: taosdemo async subscribe

subsribe sql command do not use aggregation functions.

* [TD-4238]<fix>: taosdemo async subscribe.

interval.

* [TD-4238]<fix>: taosdemo async subscribe.

fix super table sub result file.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 5c03dc9a
...@@ -114,12 +114,18 @@ typedef enum TALBE_EXISTS_EN { ...@@ -114,12 +114,18 @@ typedef enum TALBE_EXISTS_EN {
TBL_EXISTS_BUTT TBL_EXISTS_BUTT
} TALBE_EXISTS_EN; } TALBE_EXISTS_EN;
enum MODE { enum enumSYNC_MODE {
SYNC_MODE, SYNC_MODE,
ASYNC_MODE, ASYNC_MODE,
MODE_BUT MODE_BUT
}; };
typedef enum enumQUERY_CLASS {
SPECIFIED_CLASS,
STABLE_CLASS,
CLASS_BUT
} QUERY_CLASS;
typedef enum enum_INSERT_MODE { typedef enum enum_INSERT_MODE {
PROGRESSIVE_INSERT_MODE, PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE, INTERLACE_INSERT_MODE,
...@@ -6557,18 +6563,21 @@ static void specified_sub_callback( ...@@ -6557,18 +6563,21 @@ static void specified_sub_callback(
} }
static TAOS_SUB* subscribeImpl( static TAOS_SUB* subscribeImpl(
QUERY_CLASS class,
threadInfo *pThreadInfo, threadInfo *pThreadInfo,
char *sql, char* topic, bool restart) char *sql, char* topic, bool restart, uint64_t interval)
{ {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if ((SPECIFIED_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) {
tsub = taos_subscribe( tsub = taos_subscribe(
pThreadInfo->taos, pThreadInfo->taos,
restart, restart,
topic, sql, specified_sub_callback, (void*)pThreadInfo, topic, sql, specified_sub_callback, (void*)pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { } else if ((STABLE_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) {
tsub = taos_subscribe( tsub = taos_subscribe(
pThreadInfo->taos, pThreadInfo->taos,
restart, restart,
...@@ -6578,7 +6587,7 @@ static TAOS_SUB* subscribeImpl( ...@@ -6578,7 +6587,7 @@ static TAOS_SUB* subscribeImpl(
tsub = taos_subscribe( tsub = taos_subscribe(
pThreadInfo->taos, pThreadInfo->taos,
restart, restart,
topic, sql, NULL, NULL, 0); topic, sql, NULL, NULL, interval);
} }
if (tsub == NULL) { if (tsub == NULL) {
...@@ -6629,9 +6638,14 @@ static void *superSubscribe(void *sarg) { ...@@ -6629,9 +6638,14 @@ static void *superSubscribe(void *sarg) {
char topic[32] = {0}; char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
__func__, __LINE__,
pThreadInfo->threadID,
pThreadInfo->start_table_from,
pThreadInfo->end_table_to, i);
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"", sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"",
i, pThreadInfo->querySeq); i, pThreadInfo->querySeq);
memset(subSqlstr,0,sizeof(subSqlstr)); memset(subSqlstr, 0, sizeof(subSqlstr));
replaceChildTblName( replaceChildTblName(
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
subSqlstr, i); subSqlstr, i);
...@@ -6644,8 +6658,10 @@ static void *superSubscribe(void *sarg) { ...@@ -6644,8 +6658,10 @@ static void *superSubscribe(void *sarg) {
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
__func__, __LINE__, pThreadInfo->threadID, subSqlstr); __func__, __LINE__, pThreadInfo->threadID, subSqlstr);
tsub[i] = subscribeImpl( tsub[i] = subscribeImpl(
STABLE_CLASS,
pThreadInfo, subSqlstr, topic, pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart); g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[i]) { if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
...@@ -6687,8 +6703,10 @@ static void *superSubscribe(void *sarg) { ...@@ -6687,8 +6703,10 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[i]= 0; consumed[i]= 0;
tsub[i] = subscribeImpl( tsub[i] = subscribeImpl(
STABLE_CLASS,
pThreadInfo, subSqlstr, topic, pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval
); );
if (NULL == tsub[i]) { if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
...@@ -6744,10 +6762,12 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6744,10 +6762,12 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID); pThreadInfo->threadID);
} }
tsub = subscribeImpl(pThreadInfo, tsub = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic, topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart); g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub) { if (NULL == tsub) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
...@@ -6777,11 +6797,12 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6777,11 +6797,12 @@ static void *specifiedSubscribe(void *sarg) {
taos_unsubscribe(tsub, taos_unsubscribe(tsub,
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub = subscribeImpl( tsub = subscribeImpl(
SPECIFIED_CLASS,
pThreadInfo, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic, topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart g_queryInfo.specifiedQueryInfo.subscribeRestart,
); g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub) { if (NULL == tsub) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
...@@ -6833,7 +6854,7 @@ static int subscribeTestProcess() { ...@@ -6833,7 +6854,7 @@ static int subscribeTestProcess() {
//==== create threads for query for specified table //==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.sqlCount);
} else { } else {
...@@ -6870,10 +6891,10 @@ static int subscribeTestProcess() { ...@@ -6870,10 +6891,10 @@ static int subscribeTestProcess() {
} }
//==== create threads for super table query //==== create threads for super table query
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", printf("%s() LN%d, super table query sqlCount %"PRIu64".\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.superQueryInfo.sqlCount);
} else { } else {
if ((g_queryInfo.superQueryInfo.sqlCount > 0) if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) { && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
...@@ -6906,8 +6927,8 @@ static int subscribeTestProcess() { ...@@ -6906,8 +6927,8 @@ static int subscribeTestProcess() {
b = ntables % threads; b = ntables % threads;
} }
uint64_t startFrom = 0;
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
uint64_t startFrom = 0;
for (int j = 0; j < threads; j++) { for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j; uint64_t seq = i * threads + j;
threadInfo *t_info = infosOfStable + seq; threadInfo *t_info = infosOfStable + seq;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册