未验证 提交 5c03dc9a 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 4238 taosdemo async subscribe (#6161)

* [TD-4238]<fix>: taosdemo async subscribe.

* [TD-4238]<fix>: taosdemo async subscribe

subsribe sql command do not use aggregation functions.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 48f49de9
{
"filetype": "subscribe",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"databases": "test",
"specified_table_query": {
"concurrent": 1,
"mode": "async",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"resubAfterConsume": 10,
"sqls": [
{
"sql": "select col1 from meters where col1 > 1;",
"result": "./subscribe_res0.txt"
},
{
"sql": "select col2 from meters where col2 > 1;",
"result": "./subscribe_res2.txt"
}
]
},
"super_table_query": {
"stblname": "meters",
"threads": 1,
"mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"sqls": [
{
"sql": "select col1 from xxxx where col1 > 10;",
"result": "./subscribe_res1.txt"
}
]
}
}
......@@ -441,8 +441,9 @@ typedef struct SThreadInfo_S {
uint64_t maxDelay;
uint64_t minDelay;
// query
// seq of query or subscribe
uint64_t querySeq; // sequence number of sql command
} threadInfo;
#ifdef WINDOWS
......@@ -1107,7 +1108,6 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile)
}
}
fprintf(fp, "%s", resultBuf);
tmfclose(fp);
}
......@@ -1150,7 +1150,7 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
}
static void selectAndGetResult(
threadInfo *pThreadInfo, char *command, char* resultFile)
threadInfo *pThreadInfo, char *command)
{
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command);
......@@ -1161,8 +1161,8 @@ static void selectAndGetResult(
return;
}
if ((resultFile) && (strlen(resultFile))) {
appendResultToFile(res, resultFile);
if ((strlen(pThreadInfo->fp))) {
appendResultToFile(res, pThreadInfo->fp);
}
taos_free_result(res);
......@@ -1170,7 +1170,7 @@ static void selectAndGetResult(
int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
command,
resultFile);
pThreadInfo->fp);
if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
}
......@@ -6230,23 +6230,22 @@ static void *specifiedTableQuery(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
while(queryTimes --) {
if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) <
(int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) {
taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms
}
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
st = taosGetTimestampMs();
selectAndGetResult(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile);
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
et = taosGetTimestampMs();
printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n",
......@@ -6332,13 +6331,12 @@ static void *superTableQuery(void *sarg) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d",
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID);
}
selectAndGetResult(pThreadInfo, sqlstr, tmpFile);
selectAndGetResult(pThreadInfo, sqlstr);
totalQueried++;
g_queryInfo.superQueryInfo.totalQueried ++;
......@@ -6407,7 +6405,7 @@ static int queryTestProcess() {
threadInfo *infos = NULL;
//==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
uint64_t startTs = taosGetTimestampMs();
......@@ -6421,32 +6419,33 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n");
}
for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
threadInfo *t_info = infos + i * nSqlCount + j;
t_info->threadID = i * nSqlCount + j;
t_info->querySeq = j;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
}
for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nConcurrent; j++) {
uint64_t seq = i * nConcurrent + j;
threadInfo *t_info = infos + seq;
t_info->threadID = seq;
t_info->querySeq = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
}
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + i * nSqlCount + j, NULL, specifiedTableQuery,
t_info);
}
pthread_create(pids + seq, NULL, specifiedTableQuery,
t_info);
}
}
} else {
g_queryInfo.specifiedQueryInfo.concurrent = 0;
......@@ -6531,7 +6530,21 @@ static int queryTestProcess() {
return 0;
}
static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
static void stable_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
return;
}
if (param)
appendResultToFile(res, ((threadInfo *)param)->fp);
// tao_unscribe() will free result.
}
static void specified_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
......@@ -6539,22 +6552,31 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
}
if (param)
appendResultToFile(res, (char*)param);
appendResultToFile(res, ((threadInfo *)param)->fp);
// tao_unscribe() will free result.
}
static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, bool restart,
char* resultFileName) {
threadInfo *pThreadInfo,
char *sql, char* topic, bool restart)
{
TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
tsub = taos_subscribe(taos,
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, subscribe_callback, (void*)resultFileName,
topic, sql, specified_sub_callback, (void*)pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, stable_sub_callback, (void*)pThreadInfo,
g_queryInfo.superQueryInfo.subscribeInterval);
} else {
tsub = taos_subscribe(taos,
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, NULL, NULL, 0);
}
......@@ -6572,13 +6594,8 @@ static void *superSubscribe(void *sarg) {
char subSqlstr[MAX_QUERY_SQL_LENGTH];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (g_queryInfo.superQueryInfo.sqlCount == 0)
return NULL;
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
g_queryInfo.superQueryInfo.sqlCount,
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables,
MAX_QUERY_SQL_COUNT);
exit(-1);
......@@ -6612,88 +6629,80 @@ static void *superSubscribe(void *sarg) {
char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j);
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"",
i, pThreadInfo->querySeq);
memset(subSqlstr,0,sizeof(subSqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID);
}
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) {
replaceChildTblName(
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
subSqlstr, i);
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
__func__, __LINE__, pThreadInfo->threadID, subSqlstr);
tsub[i] = subscribeImpl(
pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
// start loop to consume result
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) {
consumed[i] = 0;
}
TAOS_RES* res = NULL;
while(1) {
for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue;
}
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[subSeq]);
res = taos_consume(tsub[i]);
if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[j],
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
appendResultToFile(res, tmpFile);
appendResultToFile(res, pThreadInfo->fp);
}
consumed[j] ++;
consumed[i] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[j] >=
g_queryInfo.superQueryInfo.resubAfterConsume[j])) {
printf("keepProgress:%d, resub super table query: %d\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, j);
taos_unsubscribe(tsub[subSeq],
&& (consumed[i] >=
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
taos_unsubscribe(tsub,
g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[j]= 0;
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n",
__func__, __LINE__, subSeq, subSqlstr);
tsub[subSeq] = subscribeImpl(
pThreadInfo->taos, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[subSeq]) {
consumed[i]= 0;
tsub[i] = subscribeImpl(
pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart
);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
}
}
taos_free_result(res);
for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taos_unsubscribe(tsub[subSeq], 0);
}
taos_unsubscribe(tsub[i], 0);
}
taos_close(pThreadInfo->taos);
......@@ -6702,10 +6711,7 @@ static void *superSubscribe(void *sarg) {
static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (g_queryInfo.specifiedQueryInfo.sqlCount == 0)
return NULL;
TAOS_SUB* tsub = NULL;
if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL;
......@@ -6732,76 +6738,61 @@ static void *specifiedSubscribe(void *sarg) {
}
char topic[32] = {0};
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
sprintf(topic, "taosdemo-subscribe-%d", i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i],
sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq);
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
tsub = subscribeImpl(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart);
if (NULL == tsub) {
taos_close(pThreadInfo->taos);
return NULL;
}
// start loop to consume result
TAOS_RES* res = NULL;
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
int consumed;
while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue;
}
taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[i]);
res = taos_consume(tsub);
if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i],
pThreadInfo->threadID);
appendResultToFile(res, tmpFile);
}
consumed[i] ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed[i] >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) {
printf("keepProgress:%d, resub specified query: %d\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i);
consumed[i] = 0;
taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub[i] = subscribeImpl(pThreadInfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) {
consumed ++;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
&& (consumed >=
g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq);
consumed = 0;
taos_unsubscribe(tsub,
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub = subscribeImpl(
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart
);
if (NULL == tsub) {
taos_close(pThreadInfo->taos);
return NULL;
}
}
}
}
}
}
}
taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], 0);
}
taos_unsubscribe(tsub, 0);
taos_close(pThreadInfo->taos);
return NULL;
}
......@@ -6836,7 +6827,11 @@ static int subscribeTestProcess() {
pthread_t *pids = NULL;
threadInfo *infos = NULL;
//==== create sub threads for query for specified table
pthread_t *pidsOfStable = NULL;
threadInfo *infosOfStable = NULL;
//==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__,
......@@ -6850,81 +6845,108 @@ static int subscribeTestProcess() {
}
pids = malloc(
g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t));
g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(pthread_t));
infos = malloc(
g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1);
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
threadInfo *t_info = infos + i;
t_info->threadID = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, specifiedSubscribe, t_info);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
threadInfo *t_info = infos + seq;
t_info->threadID = seq;
t_info->querySeq = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, t_info);
}
}
}
//==== create sub threads for super table query
pthread_t *pidsOfSub = NULL;
threadInfo *infosOfSub = NULL;
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
//==== create threads for super table query
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
} else {
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt *
pidsOfStable = malloc(
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt *
infosOfStable = malloc(
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
errorPrint("%s() LN%d, malloc failed for create threads\n",
if ((NULL == pidsOfStable) || (NULL == infosOfStable)) {
errorPrint("%s() LN%d, malloc failed for create threads\n",
__func__, __LINE__);
// taos_close(taos);
exit(-1);
}
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
// taos_close(taos);
exit(-1);
}
int64_t a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
int64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
int64_t a = ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
}
uint64_t startFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i;
t_info->threadID = i;
int64_t b = 0;
if (threads != 0) {
b = ntables % threads;
}
t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superSubscribe, t_info);
}
uint64_t startFrom = 0;
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *t_info = infosOfStable + seq;
t_info->threadID = seq;
t_info->querySeq = i;
t_info->start_table_from = startFrom;
t_info->ntables = j<b?a+1:a;
t_info->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, t_info);
}
}
g_queryInfo.superQueryInfo.threadCnt = threads;
g_queryInfo.superQueryInfo.threadCnt = threads;
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) {
pthread_join(pidsOfSub[i], NULL);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
pthread_join(pidsOfStable[seq], NULL);
}
}
}
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
pthread_join(pids[seq], NULL);
}
}
tmfree((char*)pids);
tmfree((char*)infos);
tmfree((char*)pidsOfSub);
tmfree((char*)infosOfSub);
tmfree((char*)pidsOfStable);
tmfree((char*)infosOfStable);
// taos_close(taos);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册