未验证 提交 fc394200 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 4240 taosdemo subscribe toomuch for develop (#6193)

* [TD-4240]<fix>: taosdemo subscribe more than max query sql count.

* [TD-4240]<fix>: taosdemo subscribe more than 100.

fix tsub sequence bug.

* [TD-4240]<fix>: taosdemo subscribe more than 100.

fix auto create table.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 c74651a6
...@@ -4890,10 +4890,12 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k) ...@@ -4890,10 +4890,12 @@ static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
return affectedRows; return affectedRows;
} }
static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq) static void getTableName(char *pTblName,
threadInfo* pThreadInfo, uint64_t tableSeq)
{ {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
if (superTblInfo) { if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) {
if (superTblInfo->childTblLimit > 0) { if (superTblInfo->childTblLimit > 0) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + superTblInfo->childTblName +
...@@ -6768,6 +6770,7 @@ static void *superSubscribe(void *sarg) { ...@@ -6768,6 +6770,7 @@ static void *superSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
char subSqlstr[MAX_QUERY_SQL_LENGTH]; char subSqlstr[MAX_QUERY_SQL_LENGTH];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq;
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
...@@ -6776,6 +6779,15 @@ static void *superSubscribe(void *sarg) { ...@@ -6776,6 +6779,15 @@ static void *superSubscribe(void *sarg) {
exit(-1); exit(-1);
} }
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
g_queryInfo.superQueryInfo.sqlCount,
pThreadInfo->ntables,
MAX_QUERY_SQL_COUNT);
exit(-1);
}
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
...@@ -6804,6 +6816,8 @@ static void *superSubscribe(void *sarg) { ...@@ -6804,6 +6816,8 @@ static void *superSubscribe(void *sarg) {
char topic[32] = {0}; char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from;
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
__func__, __LINE__, __func__, __LINE__,
pThreadInfo->threadID, pThreadInfo->threadID,
...@@ -6823,12 +6837,12 @@ static void *superSubscribe(void *sarg) { ...@@ -6823,12 +6837,12 @@ static void *superSubscribe(void *sarg) {
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
__func__, __LINE__, pThreadInfo->threadID, subSqlstr); __func__, __LINE__, pThreadInfo->threadID, subSqlstr);
tsub[i] = subscribeImpl( tsub[tsubSeq] = subscribeImpl(
STABLE_CLASS, STABLE_CLASS,
pThreadInfo, subSqlstr, topic, pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[i]) { if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
...@@ -6844,54 +6858,56 @@ static void *superSubscribe(void *sarg) { ...@@ -6844,54 +6858,56 @@ static void *superSubscribe(void *sarg) {
while(1) { while(1) {
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { tsubSeq = i - pThreadInfo->start_table_from;
continue; if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
} continue;
}
res = taos_consume(tsub[i]);
if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
}
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
}
consumed[i] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) res = taos_consume(tsub[tsubSeq]);
&& (consumed[i] >= if (res) {
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n", sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.subscribeKeepProgress, g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->querySeq); pThreadInfo->threadID);
taos_unsubscribe(tsub, appendResultToFile(res, pThreadInfo->fp);
}
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp);
}
consumed[tsubSeq] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
taos_unsubscribe(tsub[tsubSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[i]= 0; consumed[tsubSeq]= 0;
tsub[i] = subscribeImpl( tsub[tsubSeq] = subscribeImpl(
STABLE_CLASS, STABLE_CLASS,
pThreadInfo, subSqlstr, topic, pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval g_queryInfo.superQueryInfo.subscribeInterval
); );
if (NULL == tsub[i]) { if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
} }
} }
} }
taos_free_result(res); taos_free_result(res);
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
taos_unsubscribe(tsub[i], 0); tsubSeq = i - pThreadInfo->start_table_from;
taos_unsubscribe(tsub[tsubSeq], 0);
} }
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册