提交 1404adcc 编写于 作者: S slzhou

Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fix/leave

......@@ -11,7 +11,7 @@ import TabItem from "@theme/TabItem";
:::
TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。
TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。也支持通过 `apt-get` 工具从线上进行安装。
## 安装
......@@ -293,4 +293,4 @@ taos> select avg(current), max(voltage), min(phase) from test.meters where group
```sql
taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s);
```
\ No newline at end of file
```
......@@ -25,6 +25,7 @@ TDengine 采用 SQL 作为查询语言。应用程序可以通过 REST API 或
- 单列、多列数据查询
- 标签和数值的多种过滤条件:>, <, =, <\>, like 等
- 聚合结果的分组(Group by)、排序(Order by)、约束输出(Limit/Offset)
- 时间窗口(Interval)、会话窗口(Session)和状态窗口(State_window)等窗口切分聚合查询
- 数值列及聚合结果的四则运算
- 时间戳对齐的连接查询(Join Query: 隐式连接)操作
- 多种聚合/计算函数: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff 等
......@@ -40,7 +41,7 @@ taos> select * from d1001 where voltage > 215 order by ts desc limit 2;
Query OK, 2 row(s) in set (0.001100s)
```
为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。TDengine 还支持连续查询。
为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。
具体的查询语法请看 [TAOS SQL 的数据查询](/taos-sql/select) 章节。
......@@ -73,7 +74,7 @@ taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now -
Query OK, 1 row(s) in set (0.002136s)
```
TDengine 仅容许对属于同一个超级表的表之间进行聚合查询,不同超级表之间的聚合查询不支持。在 [TAOS SQL 的数据查询](/taos-sql/select) 一章,查询类操作都会注明是否支持超级表。
在 [TAOS SQL 的数据查询](/taos-sql/select) 一章,查询类操作都会注明是否支持超级表。
## 降采样查询、插值
......
......@@ -6,11 +6,11 @@ title: 数据订阅
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once 消费。
为了实现上述功能,TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。
为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
......
......@@ -104,6 +104,7 @@ typedef struct SDataBlockInfo {
uint32_t capacity;
// TODO: optimize and remove following
int64_t version; // used for stream, and need serialization
int64_t ts; // used for stream, and need serialization
int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize
......
......@@ -3103,7 +3103,7 @@ typedef struct {
void* msg;
} SBatchRsp;
static FORCE_INLINE void tFreeSBatchRsp(void *p) {
static FORCE_INLINE void tFreeSBatchRsp(void* p) {
if (NULL == p) {
return;
}
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "query.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
......@@ -85,6 +86,12 @@ enum {
TASK_OUTPUT__FETCH,
};
enum {
STREAM_QUEUE__SUCESS = 1,
STREAM_QUEUE__FAILED,
STREAM_QUEUE__PROCESSING,
};
typedef struct {
int8_t type;
} SStreamQueueItem;
......@@ -123,12 +130,6 @@ typedef struct {
SSDataBlock* pBlock;
} SStreamTrigger;
enum {
STREAM_QUEUE__SUCESS = 1,
STREAM_QUEUE__FAILED,
STREAM_QUEUE__PROCESSING,
};
typedef struct {
STaosQueue* queue;
STaosQall* qall;
......@@ -233,6 +234,7 @@ typedef struct {
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
......@@ -458,9 +460,20 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
typedef struct SStreamMeta SStreamMeta;
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pStateDb;
SHashObj* pTasks;
void* ahandle;
TXN txn;
FTaskExpand* expandFunc;
} SStreamMeta;
SStreamMeta* streamMetaOpen();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
......
......@@ -286,6 +286,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (pInst == NULL || NULL == *pInst) {
taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key);
taosMemoryFree(pMsg->pData);
tFreeClientHbBatchRsp(&pRsp);
return -1;
}
......
......@@ -1007,7 +1007,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pParam);
if (code != 0) {
tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (pMsg->pData) taosMemoryFreeClear(pMsg->pData);
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
......@@ -1699,7 +1699,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
tmq_list_destroy(lst);
return rsp;
/*return rsp;*/
return 0;
}
// TODO: free resources
return 0;
......
......@@ -604,11 +604,11 @@ typedef struct {
int64_t createTime;
int64_t updateTime;
int32_t version;
int32_t totalLevel;
int64_t smaId; // 0 for unused
// info
int64_t uid;
int8_t status;
int8_t isDistributed;
// config
int8_t igExpired;
int8_t trigger;
......@@ -647,7 +647,6 @@ typedef struct {
typedef struct {
int64_t uid;
int64_t streamId;
int8_t isDistributed;
int8_t status;
int8_t stage;
} SStreamRecoverObj;
......
......@@ -23,11 +23,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->isDistributed) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
......@@ -69,11 +69,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->isDistributed) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
......
......@@ -307,10 +307,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
pStream->isDistributed = totLevel == 2;
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
ASSERT(planTotLevel <= 2);
pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
......@@ -320,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
if (totLevel == 2 || externalTargetDB || multiTarget) {
if (planTotLevel == 2 || externalTargetDB || multiTarget) {
/*if (true) {*/
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
......@@ -338,8 +337,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
}
}
pStream->totalLevel = planTotLevel + hasExtraSink;
if (totLevel > 1) {
if (planTotLevel > 1) {
SStreamTask* pInnerTask;
// inner level
{
......@@ -371,13 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
#if 0
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pSourceDb);
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
#endif
if (tsSchedStreamToSnode) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) {
......@@ -464,7 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
}
if (totLevel == 1) {
if (planTotLevel == 1) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
......
......@@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);
/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
......@@ -55,7 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);
/*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
......@@ -540,6 +540,7 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
#if 0
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (pStream->isDistributed) {
int32_t lv = taosArrayGetSize(pStream->tasks);
......@@ -573,6 +574,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
}
return 0;
}
#endif
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t lv = taosArrayGetSize(pStream->tasks);
......@@ -755,6 +757,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
#if 0
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
......@@ -817,6 +820,7 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
#endif
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
......
......@@ -115,12 +115,19 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow);
int64_t version = TSDBROW_VERSION(&row);
tsdbTrace("vgId:%d, vnode snapshot tsdb read for %s, %" PRId64 "(%" PRId64 " , %" PRId64 ")",
TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path, version, pReader->sver, pReader->ever);
if (version < pReader->sver || version > pReader->ever) continue;
code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL);
if (code) goto _err;
}
if (pReader->nBlockData.nRow <= 0) {
continue;
}
// org data
// compress data (todo)
int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);
......@@ -808,7 +815,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
if (code) goto _err;
_exit:
tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path);
tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path);
return code;
_err:
......
......@@ -722,10 +722,12 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready", pVnode->config.vgId);
return false;
}
if (!pVnode->restored) {
vDebug("vgId:%d, vnode not restored", pVnode->config.vgId);
terrno = TSDB_CODE_APP_NOT_READY;
return false;
}
......
......@@ -467,6 +467,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(newBatch.pMsgs, &req)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg = NULL;
if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -517,6 +518,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg = NULL;
if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -545,7 +547,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
}
tNameGetFullDbName(pName, newBatch.dbFName);
tNameGetFullDbName(pName, pBatch->dbFName);
}
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
......
......@@ -438,6 +438,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
}
}
void ctgFreeTbMetasMsgCtx(SCtgMsgCtx* pCtx) {
ctgFreeMsgCtx(pCtx);
if (pCtx->lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pCtx->lastOut);
pCtx->lastOut = NULL;
}
}
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput) {
if (NULL == pOutput) {
return;
......@@ -641,7 +649,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeTbMetasMsgCtx);
if (pTask->msgCtx.lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
......
......@@ -8,7 +8,8 @@ target_include_directories(
target_link_libraries(
stream
PRIVATE os util transport qcom executor tdb
PUBLIC tdb
PRIVATE os util transport qcom executor
)
if(${BUILD_TEST})
......
......@@ -14,22 +14,8 @@
*/
#include "executor.h"
#include "tdbInt.h"
#include "tstream.h"
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pStateDb;
SHashObj* pTasks;
void* ahandle;
TXN txn;
FTaskExpand* expandFunc;
} SStreamMeta;
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) {
......@@ -150,7 +136,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
return 0;
}
int32_t streamRestoreTask(SStreamMeta* pMeta) {
int32_t streamLoadTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
ASSERT(0);
......
......@@ -87,53 +87,80 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
return 0;
}
int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
#if 0
if (pTask->taskStatus != TASK_STATUS__FAIL) {
return 0;
typedef struct {
int32_t vgId;
int32_t childId;
int64_t ver;
} SStreamVgVerCheckpoint;
int32_t tEncodeSStreamVgVerCheckpoint(SEncoder* pEncoder, const SStreamVgVerCheckpoint* pCheckpoint) {
if (tEncodeI32(pEncoder, pCheckpoint->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->ver) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamVgVerCheckpoint(SDecoder* pDecoder, SStreamVgVerCheckpoint* pCheckpoint) {
if (tDecodeI32(pDecoder, &pCheckpoint->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->ver) < 0) return -1;
return 0;
}
typedef struct {
int64_t streamId;
int64_t checkTs;
int64_t checkpointId;
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamVgCheckpointVer>
} SStreamAggVerCheckpoint;
int32_t tEncodeSStreamAggVerCheckpoint(SEncoder* pEncoder, const SStreamAggVerCheckpoint* pCheckpoint) {
if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1;
int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamVgVerCheckpoint* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i);
if (tEncodeSStreamVgVerCheckpoint(pEncoder, pOneVgCkpoint) < 0) return -1;
}
return 0;
}
if (pTask->isStreamDistributed) {
if (pTask->taskType == TASK_TYPE__SOURCE) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
} else if (pTask->taskType != TASK_TYPE__SINK) {
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER;
bool hasCheckpoint = false;
int32_t childSz = taosArrayGetSize(pTask->childEpInfo);
for (int32_t i = 0; i < childSz; i++) {
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
if (pEpInfo->checkpointVer == -1) {
hasCheckpoint = true;
break;
}
}
if (hasCheckpoint) {
// load from checkpoint
} else {
// recover child
}
}
} else {
if (pTask->taskType == TASK_TYPE__SOURCE) {
if (pTask->checkpointVer != -1) {
// load from checkpoint
} else {
// reset stream query task info
// TODO get snapshot ver
pTask->recoverSnapVer = -1;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
pTask->taskStatus = TASK_STATUS__RECOVERING;
}
}
int32_t tDecodeSStreamAggVerCheckpoint(SDecoder* pDecoder, SStreamAggVerCheckpoint* pCheckpoint) {
if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamVgVerCheckpoint oneVgCheckpoint;
if (tDecodeSStreamVgVerCheckpoint(pDecoder, &oneVgCheckpoint) < 0) return -1;
taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint);
}
return 0;
}
if (pTask->taskStatus == TASK_STATUS__RECOVERING) {
if (streamPipelineExec(pTask, 100) < 0) {
// set fail
return -1;
}
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
// load status
void* pVal = NULL;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, pVal, vLen);
SStreamAggVerCheckpoint aggCheckpoint;
tDecodeSStreamAggVerCheckpoint(&decoder, &aggCheckpoint);
/*pTask->*/
return 0;
}
#endif
int32_t streamRecoverTask(SStreamTask* pTask) {
//
return 0;
}
......@@ -52,6 +52,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
......@@ -62,7 +63,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
/*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
......@@ -101,6 +101,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
......@@ -111,7 +112,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
/*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/
int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
......
......@@ -2668,7 +2668,22 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
syncNodeEventLog(ths, "I am not follower, can not do leader transfer");
return 0;
}
syncNodeEventLog(ths, "do leader transfer");
if (!ths->restoreFinish) {
syncNodeEventLog(ths, "restore not finish, can not do leader transfer");
return 0;
}
if (ths->vgId > 1) {
syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
return 0;
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index);
syncNodeEventLog(ths, logBuf);
} while (0);
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
......
......@@ -482,6 +482,7 @@ void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb
SCliConn* conn = handle->data;
tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
uv_read_stop(conn->stream);
cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
......@@ -993,6 +994,8 @@ static void cliAsyncCb(uv_async_t* handle) {
if (count >= 2) {
tTrace("cli process batch size:%d", count);
}
// if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb);
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
}
static void cliPrepareCb(uv_prepare_t* handle) {
......@@ -1088,7 +1091,7 @@ static SCliThrd* createThrdObj() {
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb);
// uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t timerSize = 512;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
......@@ -1125,7 +1128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(timer);
}
taosArrayDestroy(pThrd->timerList);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册