提交 1b067603 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

--- ---
sidebar_label: Subscription sidebar_label: Subscription
description: "Lightweight service for data subscription and pushing, the time series data inserted into TDengine continuously can be pushed automatically to the subscribing clients." description: "Lightweight service for data subscription and publishing. Time series data inserted into TDengine continuously can be pushed automatically to subscribing clients."
title: Data Subscription title: Data Subscription
--- ---
...@@ -16,9 +16,9 @@ import CDemo from "./_sub_c.mdx"; ...@@ -16,9 +16,9 @@ import CDemo from "./_sub_c.mdx";
## Introduction ## Introduction
Due to the nature of time series data, data inserting in TDengine is similar to data publishing in message queues. Data is stored in ascending order of timestamp inside TDengine, so each table in TDengine can essentially be considered as a message queue. Due to the nature of time series data, data insertion into TDengine is similar to data publishing in message queues. Data is stored in ascending order of timestamp inside TDengine, and so each table in TDengine can essentially be considered as a message queue.
A lightweight service for data subscription and pushing is built in TDengine. With the API provided by TDengine, client programs can use `select` statements to subscribe to data from one or more tables. The subscription and state maintenance is performed on the client side, the client programs poll the server to check whether there is new data, and if so the new data will be pushed back to the client side. If the client program is restarted, where to start for retrieving new data is up to the client side. A lightweight service for data subscription and publishing is built into TDengine. With the API provided by TDengine, client programs can use `select` statements to subscribe to data from one or more tables. The subscription and state maintenance is performed on the client side. The client programs poll the server to check whether there is new data, and if so the new data will be pushed back to the client side. If the client program is restarted, where to start retrieving new data is up to the client side.
There are 3 major APIs related to subscription provided in the TDengine client driver. There are 3 major APIs related to subscription provided in the TDengine client driver.
...@@ -32,7 +32,7 @@ For more details about these APIs please refer to [C/C++ Connector](/reference/c ...@@ -32,7 +32,7 @@ For more details about these APIs please refer to [C/C++ Connector](/reference/c
If we want to get a notification and take some actions if the current exceeds a threshold, like 10A, from some meters, there are two ways: If we want to get a notification and take some actions if the current exceeds a threshold, like 10A, from some meters, there are two ways:
The first way is to query on each sub table and record the last timestamp matching the criteria, then after some time query on the data later than recorded timestamp and repeat this process. The SQL statements for this way are as below. The first way is to query each sub table and record the last timestamp matching the criteria. Then after some time, query the data later than the recorded timestamp, and repeat this process. The SQL statements for this way are as below.
```sql ```sql
select * from D1001 where ts > {last_timestamp1} and current > 10; select * from D1001 where ts > {last_timestamp1} and current > 10;
...@@ -50,7 +50,7 @@ select * from meters where ts > {last_timestamp} and current > 10; ...@@ -50,7 +50,7 @@ select * from meters where ts > {last_timestamp} and current > 10;
However, this presents a new problem in how to choose `last_timestamp`. First, the timestamp when the data is generated is different from the timestamp when the data is inserted into the database, sometimes the difference between them may be very big. Second, the time when the data from different meters arrives at the database may be different too. If the timestamp of the "slowest" meter is used as `last_timestamp` in the query, the data from other meters may be selected repeatedly; but if the timestamp of the "fastest" meter is used as `last_timestamp`, some data from other meters may be missed. However, this presents a new problem in how to choose `last_timestamp`. First, the timestamp when the data is generated is different from the timestamp when the data is inserted into the database, sometimes the difference between them may be very big. Second, the time when the data from different meters arrives at the database may be different too. If the timestamp of the "slowest" meter is used as `last_timestamp` in the query, the data from other meters may be selected repeatedly; but if the timestamp of the "fastest" meter is used as `last_timestamp`, some data from other meters may be missed.
All the problems mentioned above can be resolved thoroughly using subscription provided by TDengine. All the problems mentioned above can be resolved easily using the subscription functionality provided by TDengine.
The first step is to create subscription using `taos_subscribe`. The first step is to create subscription using `taos_subscribe`.
...@@ -65,31 +65,33 @@ if (async) { ...@@ -65,31 +65,33 @@ if (async) {
} }
``` ```
The subscription in TDengine can be either synchronous or asynchronous. In the above sample code, the value of variable `async` is determined from the CLI input, then it's used to create either an async or sync subscription. Sync subscription means the client program needs to invoke `taos_consume` to retrieve data, and async subscription means another thread created by `taos_subscribe` internally invokes `taos_consume` to retrieve data and pass the data to `subscribe_callback` for processing, `subscribe_callback` is a call back function provided by the client program and it's suggested not to do time consuming operation in the call back function. The subscription in TDengine can be either synchronous or asynchronous. In the above sample code, the value of variable `async` is determined from the CLI input, then it's used to create either an async or sync subscription. Sync subscription means the client program needs to invoke `taos_consume` to retrieve data, and async subscription means another thread created by `taos_subscribe` internally invokes `taos_consume` to retrieve data and pass the data to `subscribe_callback` for processing. `subscribe_callback` is a callback function provided by the client program. You should not perform time consuming operations in the callback function.
The parameter `taos` is an established connection. There is nothing special in sync subscription mode. In async subscription, it should be exclusively by current thread, otherwise unpredictable error may occur. The parameter `taos` is an established connection. Nothing special needs to be done for thread safety for synchronous subscription. For asynchronous subscription, the taos_subscribe function should be called exclusively by the current thread, to avoid unpredictable errors.
The parameter `sql` is a `select` statement in which `where` clause can be used to specify filter conditions. In our example, the data whose current exceeds 10A needs to be subscribed like below SQL statement: The parameter `sql` is a `select` statement in which the `where` clause can be used to specify filter conditions. In our example, we can subscribe to the records in which the current exceeds 10A, with the following SQL statement:
```sql ```sql
select * from meters where current > 10; select * from meters where current > 10;
``` ```
Please note that, all the data will be processed because no start time is specified. If only the data from one day ago needs to be processed, a time related condition can be added: Please note that, all the data will be processed because no start time is specified. If we only want to process data for the past day, a time related condition can be added:
```sql ```sql
select * from meters where ts > now - 1d and current > 10; select * from meters where ts > now - 1d and current > 10;
``` ```
The parameter `topic` is the name of the subscription, it needs to be guaranteed unique in the client program, but it's not necessary to be globally unique because subscription is implemented in the APIs on the client side. The parameter `topic` is the name of the subscription. The client application must guarantee that the name is unique. However, it doesn't have to be globally unique because subscription is implemented in the APIs on the client side.
If the subscription named as `topic` doesn't exist, the parameter `restart` will be ignored. If the subscription named as `topic` has been created before by the client program, when the client program is restarted with the subscription named `topic`, parameter `restart` is used to determine whether to retrieve data from the beginning or from the last point where the subscription was broken. If the value of `restart` is **true** (i.e. a non-zero value), the data will be retrieved from beginning, or if it is **false** (i.e. zero), the data already consumed before will not be processed again. If the subscription named as `topic` doesn't exist, the parameter `restart` will be ignored. If the subscription named as `topic` has been created before by the client program, when the client program is restarted with the subscription named `topic`, parameter `restart` is used to determine whether to retrieve data from the beginning or from the last point where the subscription was broken.
The last parameter of `taos_subscribe` is the polling interval in unit of millisecond. In sync mode, if the time difference between two continuous invocations to `taos_consume` is smaller than the interval specified by `taos_subscribe`, `taos_consume` will be blocked until the interval is reached. In async mode, this interval is the minimum interval between two invocations to the call back function. If the value of `restart` is **true** (i.e. a non-zero value), data will be retrieved from the beginning. If it is **false** (i.e. zero), the data already consumed before will not be processed again.
The last parameter of `taos_subscribe` is the polling interval in units of millisecond. In sync mode, if the time difference between two continuous invocations to `taos_consume` is smaller than the interval specified by `taos_subscribe`, `taos_consume` will be blocked until the interval is reached. In async mode, this interval is the minimum interval between two invocations to the call back function.
The second to last parameter of `taos_subscribe` is used to pass arguments to the call back function. `taos_subscribe` doesn't process this parameter and simply passes it to the call back function. This parameter is simply ignored in sync mode. The second to last parameter of `taos_subscribe` is used to pass arguments to the call back function. `taos_subscribe` doesn't process this parameter and simply passes it to the call back function. This parameter is simply ignored in sync mode.
After a subscription is created, its data can be consumed and processed, below is the sample code of how to consume data in sync mode, in the else part if `if (async)`. After a subscription is created, its data can be consumed and processed. Shown below is the sample code to consume data in sync mode, in the else condition of `if (async)`.
```c ```c
if (async) { if (async) {
...@@ -106,7 +108,7 @@ if (async) { ...@@ -106,7 +108,7 @@ if (async) {
} }
``` ```
In the above sample code, there is an infinite loop, each time carriage return is entered `taos_consume` is invoked, the return value of `taos_consume` is the selected result set, exactly as the input of `taos_use_result`, in the above sample `print_result` is used instead to simplify the sample. Below is the implementation of `print_result`. In the above sample code in the else condition, there is an infinite loop. Each time carriage return is entered `taos_consume` is invoked. The return value of `taos_consume` is the selected result set. In the above sample, `print_result` is used to simplify the printing of the result set. Below is the implementation of `print_result`.
```c ```c
void print_result(TAOS_RES* res, int blockFetch) { void print_result(TAOS_RES* res, int blockFetch) {
...@@ -133,9 +135,9 @@ void print_result(TAOS_RES* res, int blockFetch) { ...@@ -133,9 +135,9 @@ void print_result(TAOS_RES* res, int blockFetch) {
} }
``` ```
In the above code `taos_print_row` is used to process the data consumed. All the matching rows will be printed. In the above code `taos_print_row` is used to process the data consumed. All matching rows are printed.
In async mode, the data consuming is simpler as below. In async mode, consuming data is simpler as shown below.
```c ```c
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
...@@ -175,7 +177,7 @@ Then, this row of data will be shown by the example program on the first termina ...@@ -175,7 +177,7 @@ Then, this row of data will be shown by the example program on the first termina
## Examples ## Examples
Below example program demonstrates how to subscribe the data rows whose current exceeds 10A using connectors. The example program below demonstrates how to subscribe, using connectors, to data rows in which current exceeds 10A.
### Prepare Data ### Prepare Data
...@@ -250,7 +252,7 @@ taos> use power; ...@@ -250,7 +252,7 @@ taos> use power;
taos> insert into d1001 values(now, 12.4, 220, 1); taos> insert into d1001 values(now, 12.4, 220, 1);
``` ```
Because the current in inserted row exceeds 10A, it will be consumed by the example program. Because the current in the inserted row exceeds 10A, it will be consumed by the example program.
``` ```
ts: 1651146662805 current: 12.4 voltage: 220 phase: 1 location: California.SanFrancisco groupid: 2 ts: 1651146662805 current: 12.4 voltage: 220 phase: 1 location: California.SanFrancisco groupid: 2
......
...@@ -101,6 +101,7 @@ typedef struct SDbVgVersion { ...@@ -101,6 +101,7 @@ typedef struct SDbVgVersion {
typedef struct STbSVersion { typedef struct STbSVersion {
char* tbFName; char* tbFName;
int32_t sver; int32_t sver;
int32_t tver;
} STbSVersion; } STbSVersion;
typedef struct SUserAuthVersion { typedef struct SUserAuthVersion {
......
...@@ -413,7 +413,7 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) { ...@@ -413,7 +413,7 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STbVerInfo* tbInfo = taosArrayGet(pTbArray, i); STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion}; STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
} }
...@@ -745,12 +745,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -745,12 +745,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pRequest->metric.rsp = taosGetTimestampUs(); pRequest->metric.rsp = taosGetTimestampUs();
STscObj* pTscObj = pRequest->pTscObj; //STscObj* pTscObj = pRequest->pTscObj;
if (pEpSet) { //if (pEpSet) {
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { // if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); // updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
} // }
} //}
/* /*
* There is not response callback function for submit response. * There is not response callback function for submit response.
......
...@@ -1503,6 +1503,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa ...@@ -1503,6 +1503,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
pRsp->precision = pDb->cfg.precision; pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE; pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->sversion = pStb->colVer; pRsp->sversion = pStb->colVer;
pRsp->tversion = pStb->tagVer;
pRsp->suid = pStb->uid; pRsp->suid = pStb->uid;
pRsp->tuid = pStb->uid; pRsp->tuid = pStb->uid;
...@@ -1629,7 +1630,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int ...@@ -1629,7 +1630,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
metaRsp.suid = pStbVersion->suid; metaRsp.suid = pStbVersion->suid;
} }
if (pStbVersion->sversion != metaRsp.sversion) { if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp); taosArrayPush(batchMetaRsp.pArray, &metaRsp);
} else { } else {
tFreeSTableMetaRsp(&metaRsp); tFreeSTableMetaRsp(&metaRsp);
......
...@@ -344,7 +344,7 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -344,7 +344,7 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
EXPECT_EQ(metaRsp.precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(metaRsp.precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(metaRsp.tableType, TSDB_SUPER_TABLE); EXPECT_EQ(metaRsp.tableType, TSDB_SUPER_TABLE);
EXPECT_EQ(metaRsp.sversion, 1); EXPECT_EQ(metaRsp.sversion, 1);
EXPECT_EQ(metaRsp.tversion, 0); EXPECT_EQ(metaRsp.tversion, 1);
EXPECT_GT(metaRsp.suid, 0); EXPECT_GT(metaRsp.suid, 0);
EXPECT_GT(metaRsp.tuid, 0); EXPECT_GT(metaRsp.tuid, 0);
EXPECT_EQ(metaRsp.vgId, 0); EXPECT_EQ(metaRsp.vgId, 0);
......
...@@ -447,7 +447,7 @@ void ctgReleaseVgInfo(SCtgDBCache *dbCache); ...@@ -447,7 +447,7 @@ void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache); int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist); int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, char *stbName); int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass); int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId); int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq); int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
......
...@@ -812,6 +812,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm ...@@ -812,6 +812,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
SName name; SName name;
int32_t sver = 0; int32_t sver = 0;
int32_t tver = 0;
int32_t tbNum = taosArrayGetSize(pTables); int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i); STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
...@@ -828,8 +829,8 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm ...@@ -828,8 +829,8 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
int32_t tbType = 0; int32_t tbType = 0;
uint64_t suid = 0; uint64_t suid = 0;
char stbName[TSDB_TABLE_FNAME_LEN]; char stbName[TSDB_TABLE_FNAME_LEN];
ctgReadTbSverFromCache(pCtg, &name, &sver, &tbType, &suid, stbName); ctgReadTbVerFromCache(pCtg, &name, &sver, &tver, &tbType, &suid, stbName);
if (sver >= 0 && sver < pTb->sver) { if ((sver >= 0 && sver < pTb->sver) || (tver >= 0 && tver < pTb->tver)) {
switch (tbType) { switch (tbType) {
case TSDB_CHILD_TABLE: { case TSDB_CHILD_TABLE: {
SName stb = name; SName stb = name;
......
...@@ -322,9 +322,10 @@ _return: ...@@ -322,9 +322,10 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid,
char *stbName) { char *stbName) {
*sver = -1; *sver = -1;
*tver = -1;
if (NULL == pCtg->dbCache) { if (NULL == pCtg->dbCache) {
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname); ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
...@@ -348,6 +349,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t ...@@ -348,6 +349,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
*suid = tbMeta->suid; *suid = tbMeta->suid;
if (*tbType != TSDB_CHILD_TABLE) { if (*tbType != TSDB_CHILD_TABLE) {
*sver = tbMeta->sversion; *sver = tbMeta->sversion;
*tver = tbMeta->tversion;
} }
} }
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
...@@ -359,7 +361,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t ...@@ -359,7 +361,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
if (*tbType != TSDB_CHILD_TABLE) { if (*tbType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -391,12 +393,13 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t ...@@ -391,12 +393,13 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
stbName[nameLen] = 0; stbName[nameLen] = 0;
*sver = (*stbMeta)->sversion; *sver = (*stbMeta)->sversion;
*tver = (*stbMeta)->tversion;
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -227,6 +227,7 @@ typedef struct SQWorkerMgmt { ...@@ -227,6 +227,7 @@ typedef struct SQWorkerMgmt {
#define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__) #define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__) #define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_TLOG(_param, ...) qTrace("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_DUMP(_param, ...) \ #define QW_DUMP(_param, ...) \
do { \ do { \
......
...@@ -1409,7 +1409,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -1409,7 +1409,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch = (SQWSchStatus *)pIter;
if (NULL == sch->hbConnInfo.handle) { if (NULL == sch->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL); uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
pIter = taosHashIterate(mgmt->schHash, pIter); pIter = taosHashIterate(mgmt->schHash, pIter);
continue; continue;
} }
......
...@@ -557,7 +557,9 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { ...@@ -557,7 +557,9 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
param->nodeEpId.nodeId = addr->nodeId; param->nodeEpId.nodeId = addr->nodeId;
memcpy(&param->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(param->nodeEpId.ep.fqdn, pEp->fqdn);
param->nodeEpId.ep.port = pEp->port;
param->pTrans = pJob->pTrans; param->pTrans = pJob->pTrans;
*pParam = param; *pParam = param;
...@@ -788,7 +790,10 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { ...@@ -788,7 +790,10 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeEpId epId = {0}; SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId; epId.nodeId = addr->nodeId;
memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) { if (NULL == hb) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册