提交 f4571ce0 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0

...@@ -108,7 +108,7 @@ if (async) { ...@@ -108,7 +108,7 @@ if (async) {
} }
``` ```
In the above sample code in the else condition, there is an infinite loop. Each time carriage return is entered `taos_consume` is invoked. The return value of `taos_consume` is the selected result set. In the above sample, `print_result` is used to simplify the printing of the result set. Below is the implementation of `print_result`. In the above sample code in the else condition, there is an infinite loop. Each time carriage return is entered `taos_consume` is invoked. The return value of `taos_consume` is the selected result set. In the above sample, `print_result` is used to simplify the printing of the result set. It is similar to `taos_use_result`. Below is the implementation of `print_result`.
```c ```c
void print_result(TAOS_RES* res, int blockFetch) { void print_result(TAOS_RES* res, int blockFetch) {
......
...@@ -4,15 +4,15 @@ title: Cache ...@@ -4,15 +4,15 @@ title: Cache
description: "The latest row of each table is kept in cache to provide high performance query of latest state." description: "The latest row of each table is kept in cache to provide high performance query of latest state."
--- ---
The cache management policy in TDengine is First-In-First-Out (FIFO), which is also known as insert driven cache management policy and different from read driven cache management, i.e. Least-Recent-Used (LRU). It simply stores the latest data in cache and flushes the oldest data in cache to disk when the cache usage reaches a threshold. In IoT use cases, the most cared about data is the latest data, i.e. current state. The cache policy in TDengine is based the nature of IoT data. The cache management policy in TDengine is First-In-First-Out (FIFO). FIFO is also known as insert driven cache management policy and it is different from read driven cache management, which is more commonly known as Least-Recently-Used (LRU). FIFO simply stores the latest data in cache and flushes the oldest data in cache to disk, when the cache usage reaches a threshold. In IoT use cases, it is the current state i.e. the latest or most recent data that is important. The cache policy in TDengine, like much of the design and architecture of TDengine, is based on the nature of IoT data.
Caching the latest data provides the capability of retrieving data in milliseconds. With this capability, TDengine can be configured properly to be used as caching system without deploying another separate caching system to simplify the system architecture and minimize the operation cost. The cache will be emptied after TDengine is restarted, TDengine doesn't reload data from disk into cache like a real key-value caching system. Caching the latest data provides the capability of retrieving data in milliseconds. With this capability, TDengine can be configured properly to be used as a caching system without deploying another separate caching system. This simplifies the system architecture and minimizes operational costs. The cache is emptied after TDengine is restarted. TDengine does not reload data from disk into cache, like a key-value caching system.
The memory space used by TDengine cache is fixed in size, according to the configuration based on application requirement and system resources. Independent memory pool is allocated for and managed by each vnode (virtual node) in TDengine, there is no sharing of memory pools between vnodes. All the tables belonging to a vnode share all the cache memory of the vnode. The memory space used by the TDengine cache is fixed in size and configurable. It should be allocated based on application requirements and system resources. An independent memory pool is allocated for and managed by each vnode (virtual node) in TDengine. There is no sharing of memory pools between vnodes. All the tables belonging to a vnode share all the cache memory of the vnode.
Memory pool is divided into blocks and data is stored in row format in memory and each block follows FIFO policy. The size of each block is determined by configuration parameter `cache`, the number of blocks for each vnode is determined by `blocks`. For each vnode, the total cache size is `cache * blocks`. A cache block needs to ensure that each table can store at least dozens of records to be efficient. The memory pool is divided into blocks and data is stored in row format in memory and each block follows FIFO policy. The size of each block is determined by configuration parameter `cache` and the number of blocks for each vnode is determined by the parameter `blocks`. For each vnode, the total cache size is `cache * blocks`. A cache block needs to ensure that each table can store at least dozens of records, to be efficient.
`last_row` function can be used to retrieve the last row of a table or a STable to quickly show the current state of devices on monitoring screen. For example the below SQL statement retrieves the latest voltage of all meters in San Francisco of California. `last_row` function can be used to retrieve the last row of a table or a STable to quickly show the current state of devices on monitoring screen. For example the below SQL statement retrieves the latest voltage of all meters in San Francisco, California.
```sql ```sql
select last_row(voltage) from meters where location='California.SanFrancisco'; select last_row(voltage) from meters where location='California.SanFrancisco';
......
...@@ -1664,6 +1664,10 @@ typedef struct { ...@@ -1664,6 +1664,10 @@ typedef struct {
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
typedef struct {
int8_t reserved;
} SMDropCgroupRsp;
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType; int8_t alterType;
......
...@@ -151,6 +151,7 @@ enum { ...@@ -151,6 +151,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMqConsumerLostMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mnode-mq-consumer-recover", SMqConsumerRecoverMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mnode-mq-consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "mnode-mq-drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
......
...@@ -268,6 +268,7 @@ int32_t* taosGetErrno(); ...@@ -268,6 +268,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) #define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA) #define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB) #define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC)
// mnode-stream // mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0) #define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
......
...@@ -94,6 +94,7 @@ typedef enum { ...@@ -94,6 +94,7 @@ typedef enum {
TRN_TYPE_ALTER_STREAM = 1027, TRN_TYPE_ALTER_STREAM = 1027,
TRN_TYPE_CONSUMER_LOST = 1028, TRN_TYPE_CONSUMER_LOST = 1028,
TRN_TYPE_CONSUMER_RECOVER = 1029, TRN_TYPE_CONSUMER_RECOVER = 1029,
TRN_TYPE_DROP_CGROUP = 1030,
TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_GLOBAL_SCOPE = 2000,
......
...@@ -39,6 +39,7 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c ...@@ -39,6 +39,7 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey);
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic); bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic);
......
...@@ -33,6 +33,7 @@ int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName ...@@ -33,6 +33,7 @@ int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -58,6 +58,12 @@ bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) { ...@@ -58,6 +58,12 @@ bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) {
return false; return false;
} }
bool mndOffsetFromSubKey(SMqOffsetObj *pOffset, const char *subKey) {
int32_t i = 0;
while (pOffset->key[i] != ':') i++;
if (strcmp(&pOffset->key[i + 1], subKey) == 0) return true;
return false;
}
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) { SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL; void *buf = NULL;
...@@ -303,7 +309,35 @@ int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) ...@@ -303,7 +309,35 @@ int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic)
continue; continue;
} }
if (mndSetDropOffsetRedoLogs(pMnode, pTrans, pOffset) < 0) { if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
END:
return code;
}
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (!mndOffsetFromSubKey(pOffset, subKey)) {
sdbRelease(pSdb, pOffset);
continue;
}
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset); sdbRelease(pSdb, pOffset);
goto END; goto END;
} }
......
...@@ -42,6 +42,7 @@ static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); ...@@ -42,6 +42,7 @@ static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pMsg);
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
...@@ -75,6 +76,8 @@ int32_t mndInitSubscribe(SMnode *pMnode) { ...@@ -75,6 +76,8 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
...@@ -581,6 +584,57 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -581,6 +584,57 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
/*SSdb *pSdb = pMnode->pSdb;*/
SMDropCgroupReq dropReq = {0};
if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
if (pSub == NULL) {
if (dropReq.igNotExists) {
mDebug("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
return 0;
} else {
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
return -1;
}
}
if (taosHashGetSize(pSub->consumerHash) == 0) {
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_CGROUP, pReq);
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
}
mDebug("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
if (mndDropOffsetBySubKey(pMnode, pTrans, pSub->key) < 0) {
ASSERT(0);
return -1;
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
}
mndReleaseSubscribe(pMnode, pSub);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
void mndCleanupSubscribe(SMnode *pMnode) {} void mndCleanupSubscribe(SMnode *pMnode) {}
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
...@@ -735,7 +789,7 @@ static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscrib ...@@ -735,7 +789,7 @@ static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscrib
return 0; return 0;
} }
static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册