提交 5c61c3ad 编写于 作者: D dapan1121

Merge branch 'feature/TD-4034' into feature/TD-3950

...@@ -234,7 +234,7 @@ typedef struct SDataCol { ...@@ -234,7 +234,7 @@ typedef struct SDataCol {
int len; // column data length int len; // column data length
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
void * pData; // Actual data pointer void * pData; // Actual data pointer
TSKEY ts; // only used in last NULL column TKEY ts; // only used in last NULL column
} SDataCol; } SDataCol;
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
......
{
"filetype": "subscribe",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"databases": "test",
"specified_table_query": {
"concurrent": 1,
"mode": "async",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"resubAfterConsume": 10,
"sqls": [
{
"sql": "select col1 from meters where col1 > 1;",
"result": "./subscribe_res0.txt"
},
{
"sql": "select col2 from meters where col2 > 1;",
"result": "./subscribe_res2.txt"
}
]
},
"super_table_query": {
"stblname": "meters",
"threads": 1,
"mode": "sync",
"interval": 1000,
"restart": "yes",
"keepProgress": "yes",
"sqls": [
{
"sql": "select col1 from xxxx where col1 > 10;",
"result": "./subscribe_res1.txt"
}
]
}
}
...@@ -114,7 +114,7 @@ typedef enum TALBE_EXISTS_EN { ...@@ -114,7 +114,7 @@ typedef enum TALBE_EXISTS_EN {
TBL_EXISTS_BUTT TBL_EXISTS_BUTT
} TALBE_EXISTS_EN; } TALBE_EXISTS_EN;
enum MODE { enum enumSYNC_MODE {
SYNC_MODE, SYNC_MODE,
ASYNC_MODE, ASYNC_MODE,
MODE_BUT MODE_BUT
...@@ -127,6 +127,12 @@ enum enum_TAOS_INTERFACE { ...@@ -127,6 +127,12 @@ enum enum_TAOS_INTERFACE {
INTERFACE_BUT INTERFACE_BUT
}; };
typedef enum enumQUERY_CLASS {
SPECIFIED_CLASS,
STABLE_CLASS,
CLASS_BUT
} QUERY_CLASS;
typedef enum enum_PROGRESSIVE_OR_INTERLACE { typedef enum enum_PROGRESSIVE_OR_INTERLACE {
PROGRESSIVE_INSERT_MODE, PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE, INTERLACE_INSERT_MODE,
...@@ -451,8 +457,9 @@ typedef struct SThreadInfo_S { ...@@ -451,8 +457,9 @@ typedef struct SThreadInfo_S {
uint64_t maxDelay; uint64_t maxDelay;
uint64_t minDelay; uint64_t minDelay;
// query // seq of query or subscribe
uint64_t querySeq; // sequence number of sql command uint64_t querySeq; // sequence number of sql command
} threadInfo; } threadInfo;
#ifdef WINDOWS #ifdef WINDOWS
...@@ -1179,7 +1186,8 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { ...@@ -1179,7 +1186,8 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) {
} }
static void selectAndGetResult( static void selectAndGetResult(
threadInfo *pThreadInfo, char *command, char* resultFile) { threadInfo *pThreadInfo, char *command)
{
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) {
TAOS_RES *res = taos_query(pThreadInfo->taos, command); TAOS_RES *res = taos_query(pThreadInfo->taos, command);
if (res == NULL || taos_errno(res) != 0) { if (res == NULL || taos_errno(res) != 0) {
...@@ -1189,8 +1197,8 @@ static void selectAndGetResult( ...@@ -1189,8 +1197,8 @@ static void selectAndGetResult(
return; return;
} }
if ((resultFile) && (strlen(resultFile))) { if ((strlen(pThreadInfo->fp))) {
appendResultToFile(res, resultFile); appendResultToFile(res, pThreadInfo->fp);
} }
taos_free_result(res); taos_free_result(res);
...@@ -1198,7 +1206,7 @@ static void selectAndGetResult( ...@@ -1198,7 +1206,7 @@ static void selectAndGetResult(
int retCode = postProceSql( int retCode = postProceSql(
g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port,
command, command,
resultFile); pThreadInfo->fp);
if (0 != retCode) { if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID); printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID);
} }
...@@ -6255,23 +6263,22 @@ static void *specifiedTableQuery(void *sarg) { ...@@ -6255,23 +6263,22 @@ static void *specifiedTableQuery(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
while(queryTimes --) { while(queryTimes --) {
if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) <
(int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) { (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) {
taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms
} }
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
st = taosGetTimestampMs(); st = taosGetTimestampMs();
selectAndGetResult(pThreadInfo, selectAndGetResult(pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile); g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]);
et = taosGetTimestampMs(); et = taosGetTimestampMs();
printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n", printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n",
...@@ -6357,13 +6364,12 @@ static void *superTableQuery(void *sarg) { ...@@ -6357,13 +6364,12 @@ static void *superTableQuery(void *sarg) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr)); memset(sqlstr,0,sizeof(sqlstr));
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[j][0] != 0) { if (g_queryInfo.superQueryInfo.result[j][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[j], g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID); pThreadInfo->threadID);
} }
selectAndGetResult(pThreadInfo, sqlstr, tmpFile); selectAndGetResult(pThreadInfo, sqlstr);
totalQueried++; totalQueried++;
g_queryInfo.superQueryInfo.totalQueried ++; g_queryInfo.superQueryInfo.totalQueried ++;
...@@ -6435,7 +6441,7 @@ static int queryTestProcess() { ...@@ -6435,7 +6441,7 @@ static int queryTestProcess() {
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from specify table //==== create sub threads for query from specify table
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
...@@ -6449,32 +6455,33 @@ static int queryTestProcess() { ...@@ -6449,32 +6455,33 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
for (int i = 0; i < nConcurrent; i++) { for (uint64_t i = 0; i < nSqlCount; i++) {
for (int j = 0; j < nSqlCount; j++) { for (int j = 0; j < nConcurrent; j++) {
threadInfo *t_info = infos + i * nSqlCount + j; uint64_t seq = i * nConcurrent + j;
t_info->threadID = i * nSqlCount + j; threadInfo *t_info = infos + seq;
t_info->querySeq = j; t_info->threadID = seq;
t_info->querySeq = i;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
}
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { t_info->taos = NULL;// TODO: workaround to use separate taos connection;
char sqlStr[MAX_TB_NAME_SIZE*2]; pthread_create(pids + seq, NULL, specifiedTableQuery,
sprintf(sqlStr, "use %s", g_queryInfo.dbName); t_info);
verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) {
taos_close(taos);
free(infos);
free(pids);
errorPrint( "use database %s failed!\n\n",
g_queryInfo.dbName);
return -1;
}
} }
t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + i * nSqlCount + j, NULL, specifiedTableQuery,
t_info);
}
} }
} else { } else {
g_queryInfo.specifiedQueryInfo.concurrent = 0; g_queryInfo.specifiedQueryInfo.concurrent = 0;
...@@ -6559,7 +6566,21 @@ static int queryTestProcess() { ...@@ -6559,7 +6566,21 @@ static int queryTestProcess() {
return 0; return 0;
} }
static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { static void stable_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res));
return;
}
if (param)
appendResultToFile(res, ((threadInfo *)param)->fp);
// tao_unscribe() will free result.
}
static void specified_sub_callback(
TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
if (res == NULL || taos_errno(res) != 0) { if (res == NULL || taos_errno(res) != 0) {
errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n", errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n",
__func__, __LINE__, code, taos_errstr(res)); __func__, __LINE__, code, taos_errstr(res));
...@@ -6567,24 +6588,36 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c ...@@ -6567,24 +6588,36 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
} }
if (param) if (param)
appendResultToFile(res, (char*)param); appendResultToFile(res, ((threadInfo *)param)->fp);
// tao_unscribe() will free result. // tao_unscribe() will free result.
} }
static TAOS_SUB* subscribeImpl( static TAOS_SUB* subscribeImpl(
TAOS *taos, char *sql, char* topic, bool restart, QUERY_CLASS class,
char* resultFileName) { threadInfo *pThreadInfo,
char *sql, char* topic, bool restart, uint64_t interval)
{
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if ((SPECIFIED_CLASS == class)
tsub = taos_subscribe(taos, && (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart, restart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, specified_sub_callback, (void*)pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else if ((STABLE_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, stable_sub_callback, (void*)pThreadInfo,
g_queryInfo.superQueryInfo.subscribeInterval);
} else { } else {
tsub = taos_subscribe(taos, tsub = taos_subscribe(
pThreadInfo->taos,
restart, restart,
topic, sql, NULL, NULL, 0); topic, sql, NULL, NULL, interval);
} }
if (tsub == NULL) { if (tsub == NULL) {
...@@ -6600,13 +6633,8 @@ static void *superSubscribe(void *sarg) { ...@@ -6600,13 +6633,8 @@ static void *superSubscribe(void *sarg) {
char subSqlstr[MAX_QUERY_SQL_LENGTH]; char subSqlstr[MAX_QUERY_SQL_LENGTH];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
if (g_queryInfo.superQueryInfo.sqlCount == 0) if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
return NULL; errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
g_queryInfo.superQueryInfo.sqlCount,
pThreadInfo->ntables, pThreadInfo->ntables,
MAX_QUERY_SQL_COUNT); MAX_QUERY_SQL_COUNT);
exit(-1); exit(-1);
...@@ -6637,91 +6665,92 @@ static void *superSubscribe(void *sarg) { ...@@ -6637,91 +6665,92 @@ static void *superSubscribe(void *sarg) {
return NULL; return NULL;
} }
uint64_t subSeq;
char topic[32] = {0}; char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j); __func__, __LINE__,
memset(subSqlstr,0,sizeof(subSqlstr)); pThreadInfo->threadID,
replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i); pThreadInfo->start_table_from,
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; pThreadInfo->end_table_to, i);
if (g_queryInfo.superQueryInfo.result[j][0] != 0) { sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"",
sprintf(tmpFile, "%s-%d", i, pThreadInfo->querySeq);
g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); memset(subSqlstr, 0, sizeof(subSqlstr));
} replaceChildTblName(
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; subSqlstr, i);
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
__func__, __LINE__, subSeq, subSqlstr); sprintf(pThreadInfo->fp, "%s-%d",
tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
g_queryInfo.superQueryInfo.subscribeRestart, pThreadInfo->threadID);
tmpFile); }
if (NULL == tsub[subSeq]) {
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
__func__, __LINE__, pThreadInfo->threadID, subSqlstr);
tsub[i] = subscribeImpl(
STABLE_CLASS,
pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
}
} }
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
// start loop to consume result // start loop to consume result
int consumed[MAX_QUERY_SQL_COUNT];
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) {
consumed[i] = 0;
}
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { for (uint64_t i = pThreadInfo->start_table_from;
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { i <= pThreadInfo->end_table_to; i++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
continue; continue;
} }
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms
taosMsleep(100); // ms res = taos_consume(tsub[i]);
res = taos_consume(tsub[subSeq]);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
if (g_queryInfo.superQueryInfo.result[j][0] != 0) { sprintf(pThreadInfo->fp, "%s-%d",
sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
g_queryInfo.superQueryInfo.result[j],
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, tmpFile); appendResultToFile(res, pThreadInfo->fp);
} }
consumed[j] ++; consumed[i] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[j] >= && (consumed[i] >=
g_queryInfo.superQueryInfo.resubAfterConsume[j])) { g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub super table query: %d\n", printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, j); g_queryInfo.superQueryInfo.subscribeKeepProgress,
taos_unsubscribe(tsub[subSeq], pThreadInfo->querySeq);
taos_unsubscribe(tsub,
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[j]= 0; consumed[i]= 0;
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; tsub[i] = subscribeImpl(
debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", STABLE_CLASS,
__func__, __LINE__, subSeq, subSqlstr); pThreadInfo, subSqlstr, topic,
tsub[subSeq] = subscribeImpl( g_queryInfo.superQueryInfo.subscribeRestart,
pThreadInfo->taos, subSqlstr, topic, g_queryInfo.superQueryInfo.subscribeInterval
g_queryInfo.superQueryInfo.subscribeRestart, );
tmpFile); if (NULL == tsub[i]) {
if (NULL == tsub[subSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
} }
}
} }
} }
taos_free_result(res); taos_free_result(res);
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { taos_unsubscribe(tsub[i], 0);
subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j;
taos_unsubscribe(tsub[subSeq], 0);
}
} }
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
...@@ -6730,10 +6759,7 @@ static void *superSubscribe(void *sarg) { ...@@ -6730,10 +6759,7 @@ static void *superSubscribe(void *sarg) {
static void *specifiedSubscribe(void *sarg) { static void *specifiedSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub = NULL;
if (g_queryInfo.specifiedQueryInfo.sqlCount == 0)
return NULL;
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
...@@ -6760,74 +6786,64 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6760,74 +6786,64 @@ static void *specifiedSubscribe(void *sarg) {
} }
char topic[32] = {0}; char topic[32] = {0};
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq);
sprintf(topic, "taosdemo-subscribe-%d", i); if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; sprintf(pThreadInfo->fp, "%s-%d",
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
sprintf(tmpFile, "%s-%d", pThreadInfo->threadID);
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); }
} tsub = subscribeImpl(
tsub[i] = subscribeImpl(pThreadInfo->taos, SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[i], topic, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile); g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub[i]) { if (NULL == tsub) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
}
} }
// start loop to consume result // start loop to consume result
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
int consumed[MAX_QUERY_SQL_COUNT]; int consumed;
for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++)
consumed[i] = 0;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
continue; continue;
} }
taosMsleep(1000); // ms taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms
res = taos_consume(tsub[i]); res = taos_consume(tsub);
if (res) { if (res) {
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; consumed ++;
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress)
sprintf(tmpFile, "%s-%d", && (consumed >=
g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
appendResultToFile(res, tmpFile); printf("keepProgress:%d, resub specified query: %"PRIu64"\n",
} g_queryInfo.specifiedQueryInfo.subscribeKeepProgress,
consumed[i] ++; pThreadInfo->querySeq);
consumed = 0;
if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) taos_unsubscribe(tsub,
&& (consumed[i] >= g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) { tsub = subscribeImpl(
printf("keepProgress:%d, resub specified query: %d\n", SPECIFIED_CLASS,
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i); pThreadInfo,
consumed[i] = 0; g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
taos_unsubscribe(tsub[i], topic,
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeRestart,
tsub[i] = subscribeImpl(pThreadInfo->taos, g_queryInfo.specifiedQueryInfo.subscribeInterval);
g_queryInfo.specifiedQueryInfo.sql[i], topic, if (NULL == tsub) {
g_queryInfo.specifiedQueryInfo.subscribeRestart,
tmpFile);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
} }
}
} }
taos_free_result(res); taos_free_result(res);
taos_unsubscribe(tsub, 0);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], 0);
}
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
...@@ -6865,9 +6881,13 @@ static int subscribeTestProcess() { ...@@ -6865,9 +6881,13 @@ static int subscribeTestProcess() {
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query for specified table
pthread_t *pidsOfStable = NULL;
threadInfo *infosOfStable = NULL;
//==== create threads for query for specified table
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
__func__, __LINE__, __func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.sqlCount);
} else { } else {
...@@ -6878,80 +6898,109 @@ static int subscribeTestProcess() { ...@@ -6878,80 +6898,109 @@ static int subscribeTestProcess() {
exit(-1); exit(-1);
} }
pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); pids = malloc(
infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(pthread_t));
infos = malloc(
g_queryInfo.specifiedQueryInfo.sqlCount *
g_queryInfo.specifiedQueryInfo.concurrent *
sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) { if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1); exit(-1);
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
threadInfo *t_info = infos + i; for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
t_info->threadID = i; uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
t_info->taos = NULL; // TODO: workaround to use separate taos connection; threadInfo *t_info = infos + seq;
pthread_create(pids + i, NULL, specifiedSubscribe, t_info); t_info->threadID = seq;
t_info->querySeq = i;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pids + seq, NULL, specifiedSubscribe, t_info);
}
} }
} }
//==== create sub threads for super table query //==== create threads for super table query
pthread_t *pidsOfSub = NULL; if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
threadInfo *infosOfSub = NULL; printf("%s() LN%d, super table query sqlCount %"PRIu64".\n",
if ((g_queryInfo.superQueryInfo.sqlCount > 0) __func__, __LINE__,
g_queryInfo.superQueryInfo.sqlCount);
} else {
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) { && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * pidsOfStable = malloc(
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(pthread_t)); sizeof(pthread_t));
infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * infosOfStable = malloc(
g_queryInfo.superQueryInfo.sqlCount *
g_queryInfo.superQueryInfo.threadCnt *
sizeof(threadInfo)); sizeof(threadInfo));
if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { if ((NULL == pidsOfStable) || (NULL == infosOfStable)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", errorPrint("%s() LN%d, malloc failed for create threads\n",
__func__, __LINE__); __func__, __LINE__);
// taos_close(taos); // taos_close(taos);
exit(-1); exit(-1);
} }
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt;
int64_t a = ntables / threads; int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
if (a < 1) { int threads = g_queryInfo.superQueryInfo.threadCnt;
threads = ntables;
a = 1;
}
int64_t b = 0; int64_t a = ntables / threads;
if (threads != 0) { if (a < 1) {
b = ntables % threads; threads = ntables;
} a = 1;
}
uint64_t startFrom = 0; int64_t b = 0;
for (int i = 0; i < threads; i++) { if (threads != 0) {
threadInfo *t_info = infosOfSub + i; b = ntables % threads;
t_info->threadID = i; }
t_info->start_table_from = startFrom; for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
t_info->ntables = i<b?a+1:a; uint64_t startFrom = 0;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; for (int j = 0; j < threads; j++) {
startFrom = t_info->end_table_to + 1; uint64_t seq = i * threads + j;
t_info->taos = NULL; // TODO: workaround to use separate taos connection; threadInfo *t_info = infosOfStable + seq;
pthread_create(pidsOfSub + i, NULL, superSubscribe, t_info); t_info->threadID = seq;
} t_info->querySeq = i;
t_info->start_table_from = startFrom;
t_info->ntables = j<b?a+1:a;
t_info->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, t_info);
}
}
g_queryInfo.superQueryInfo.threadCnt = threads; g_queryInfo.superQueryInfo.threadCnt = threads;
for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
pthread_join(pidsOfSub[i], NULL); for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
pthread_join(pidsOfStable[seq], NULL);
}
}
} }
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
pthread_join(pids[i], NULL); for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) {
uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j;
pthread_join(pids[seq], NULL);
}
} }
tmfree((char*)pids); tmfree((char*)pids);
tmfree((char*)infos); tmfree((char*)infos);
tmfree((char*)pidsOfSub); tmfree((char*)pidsOfStable);
tmfree((char*)infosOfSub); tmfree((char*)infosOfStable);
// taos_close(taos); // taos_close(taos);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册