未验证 提交 95139cf4 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-4130]<fix>: taosdemo subscribe super table. (#6086)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 9c0e8c9d
...@@ -68,12 +68,6 @@ enum TEST_MODE { ...@@ -68,12 +68,6 @@ enum TEST_MODE {
INVAID_TEST INVAID_TEST
}; };
enum QUERY_MODE {
SYNC_QUERY_MODE, // 0
ASYNC_QUERY_MODE, // 1
INVALID_MODE
};
#define MAX_RECORDS_PER_REQ 32766 #define MAX_RECORDS_PER_REQ 32766
#define MAX_SQL_SIZE 65536 #define MAX_SQL_SIZE 65536
...@@ -1107,6 +1101,7 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile) ...@@ -1107,6 +1101,7 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile)
} }
} }
fprintf(fp, "%s", resultBuf); fprintf(fp, "%s", resultBuf);
tmfclose(fp); tmfclose(fp);
} }
...@@ -1142,6 +1137,7 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { ...@@ -1142,6 +1137,7 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
totalLen += len; totalLen += len;
} }
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile);
appendResultBufToFile(databuf, resultFile); appendResultBufToFile(databuf, resultFile);
free(databuf); free(databuf);
} }
...@@ -6517,59 +6513,63 @@ static void *superSubscribe(void *sarg) { ...@@ -6517,59 +6513,63 @@ static void *superSubscribe(void *sarg) {
return NULL; return NULL;
} }
//int64_t st = 0;
//int64_t et = 0;
do {
//if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < g_queryInfo.specifiedQueryInfo.queryInterval) {
// taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval- (et - st)); // ms
// //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
//}
//st = taosGetTimestampMs();
char topic[32] = {0}; char topic[32] = {0};
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (uint64_t i = pThreadInfo->start_table_from;
sprintf(topic, "taosdemo-subscribe-%d", i); i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j);
memset(subSqlstr,0,sizeof(subSqlstr)); memset(subSqlstr,0,sizeof(subSqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[i], subSqlstr, i); replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[i][0] != 0) { if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID);
} }
tsub[i] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile);
if (NULL == tsub[i]) { uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile);
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
//et = taosGetTimestampMs(); }
//printf("========thread[%"PRIu64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} while(0);
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue; continue;
} }
res = taos_consume(tsub[i]); uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taosMsleep(100); // ms
res = taos_consume(tsub[subSeq]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[i][0] != 0) { if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, tmpFile); appendResultToFile(res, tmpFile);
} }
} }
} }
} }
}
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (uint64_t i = pThreadInfo->start_table_from;
taos_unsubscribe(tsub[i], g_queryInfo.superQueryInfo.subscribeKeepProgress); i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taos_unsubscribe(tsub[subSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress);
}
} }
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
...@@ -6607,15 +6607,6 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6607,15 +6607,6 @@ static void *specifiedSubscribe(void *sarg) {
return NULL; return NULL;
} }
//int64_t st = 0;
//int64_t et = 0;
do {
//if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < g_queryInfo.specifiedQueryInfo.queryInterval) {
// taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval- (et - st)); // ms
// //printf("========sleep duration:%"PRIu64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, pThreadInfo->start_table_from, pThreadInfo->end_table_to);
//}
//st = taosGetTimestampMs();
char topic[32] = {0}; char topic[32] = {0};
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
sprintf(topic, "taosdemo-subscribe-%d", i); sprintf(topic, "taosdemo-subscribe-%d", i);
...@@ -6631,10 +6622,6 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6631,10 +6622,6 @@ static void *specifiedSubscribe(void *sarg) {
return NULL; return NULL;
} }
} }
//et = taosGetTimestampMs();
//printf("========thread[%"PRIu64"] complete all sqls to super table once queries duration:%.4fs\n", taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} while(0);
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
...@@ -6643,6 +6630,7 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6643,6 +6630,7 @@ static void *specifiedSubscribe(void *sarg) {
continue; continue;
} }
taosMsleep(1000); // ms
res = taos_consume(tsub[i]); res = taos_consume(tsub[i]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
...@@ -6699,13 +6687,16 @@ static int subscribeTestProcess() { ...@@ -6699,13 +6687,16 @@ static int subscribeTestProcess() {
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from super table //==== create sub threads for query for specified table
if ((g_queryInfo.specifiedQueryInfo.sqlCount <= 0) || if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
(g_queryInfo.specifiedQueryInfo.concurrent <= 0)) { printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
errorPrint("%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount, g_queryInfo.specifiedQueryInfo.sqlCount);
g_queryInfo.specifiedQueryInfo.concurrent); } else {
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
exit(-1); exit(-1);
} }
...@@ -6722,8 +6713,9 @@ static int subscribeTestProcess() { ...@@ -6722,8 +6713,9 @@ static int subscribeTestProcess() {
t_info->taos = NULL; // TODO: workaround to use separate taos connection; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, specifiedSubscribe, t_info); pthread_create(pids + i, NULL, specifiedSubscribe, t_info);
} }
}
//==== create sub threads for query from sub table //==== create sub threads for super table query
pthread_t *pidsOfSub = NULL; pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL; threadInfo *infosOfSub = NULL;
if ((g_queryInfo.superQueryInfo.sqlCount > 0) if ((g_queryInfo.superQueryInfo.sqlCount > 0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册