提交 8548e72d 编写于 作者: G gccgdb1234

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -18,7 +18,7 @@ stream_options: { ...@@ -18,7 +18,7 @@ stream_options: {
其中 subquery 是 select 普通查询语法的子集: 其中 subquery 是 select 普通查询语法的子集:
```sql ```sql
subquery: SELECT [DISTINCT] select_list subquery: SELECT select_list
from_clause from_clause
[WHERE condition] [WHERE condition]
[PARTITION BY tag_list] [PARTITION BY tag_list]
...@@ -37,7 +37,7 @@ window_clause: { ...@@ -37,7 +37,7 @@ window_clause: {
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
窗口的定义与时序数据特色查询中的定义完全相同 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
......
...@@ -9,7 +9,7 @@ import Release from "/components/ReleaseV3"; ...@@ -9,7 +9,7 @@ import Release from "/components/ReleaseV3";
<Release type="tdengine" version="3.0.0.1" /> <Release type="tdengine" version="3.0.0.1" />
## 3.0.0.0 <!-- ## 3.0.0.0
<Release type="tdengine" version="3.0.0.0" /> <Release type="tdengine" version="3.0.0.0" /> -->
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// clang-format off
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
...@@ -94,13 +95,8 @@ int32_t create_stream() { ...@@ -94,13 +95,8 @@ int32_t create_stream() {
} }
taos_free_result(pRes); taos_free_result(pRes);
/*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn, pRes = taos_query(pConn,
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");
"count(k) from st1 partition by tbname interval(20s) ");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -44,6 +44,30 @@ enum { ...@@ -44,6 +44,30 @@ enum {
) )
// clang-format on // clang-format on
typedef struct {
TSKEY ts;
uint64_t groupId;
} SWinKey;
static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
return -1;
}
if (pWin1->ts > pWin2->ts) {
return 1;
} else if (pWin1->ts < pWin2->ts) {
return -1;
}
return 0;
}
enum { enum {
TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_RSP,
......
...@@ -29,7 +29,7 @@ typedef void* DataSinkHandle; ...@@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
struct SRpcMsg; struct SRpcMsg;
struct SSubplan; struct SSubplan;
typedef struct SReadHandle { typedef struct {
void* tqReader; void* tqReader;
void* meta; void* meta;
void* config; void* config;
...@@ -41,6 +41,7 @@ typedef struct SReadHandle { ...@@ -41,6 +41,7 @@ typedef struct SReadHandle {
bool initTableReader; bool initTableReader;
bool initTqReader; bool initTqReader;
int32_t numOfVgroups; int32_t numOfVgroups;
void* pStateBackend;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg
...@@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
/** /**
* @brief Cleanup SSDataBlock for StreamScanInfo * @brief Cleanup SSDataBlock for StreamScanInfo
* *
* @param tinfo * @param tinfo
*/ */
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
...@@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
......
...@@ -263,6 +263,14 @@ typedef struct { ...@@ -263,6 +263,14 @@ typedef struct {
SArray* checkpointVer; SArray* checkpointVer;
} SStreamRecoveringState; } SStreamRecoveringState;
// incremental state storage
typedef struct {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TXN txn;
} SStreamState;
typedef struct SStreamTask { typedef struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
...@@ -312,6 +320,10 @@ typedef struct SStreamTask { ...@@ -312,6 +320,10 @@ typedef struct SStreamTask {
// msg handle // msg handle
SMsgCb* pMsgCb; SMsgCb* pMsgCb;
// state backend
SStreamState* pState;
} SStreamTask; } SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
...@@ -507,7 +519,7 @@ typedef struct SStreamMeta { ...@@ -507,7 +519,7 @@ typedef struct SStreamMeta {
char* path; char* path;
TDB* db; TDB* db;
TTB* pTaskDb; TTB* pTaskDb;
TTB* pStateDb; TTB* pCheckpointDb;
SHashObj* pTasks; SHashObj* pTasks;
SHashObj* pRecoverStatus; SHashObj* pRecoverStatus;
void* ahandle; void* ahandle;
...@@ -528,6 +540,37 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); ...@@ -528,6 +540,37 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta);
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState);
typedef struct {
TBC* pCur;
} SStreamStateCur;
#if 1
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
void streamFreeVal(void* val);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key);
void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -76,7 +76,7 @@ bool tsMonitorComp = false; ...@@ -76,7 +76,7 @@ bool tsMonitorComp = false;
// telem // telem
bool tsEnableTelem = true; bool tsEnableTelem = true;
int32_t tsTelemInterval = 86400; int32_t tsTelemInterval = 43200;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
......
...@@ -131,7 +131,9 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { ...@@ -131,7 +131,9 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
char* pCont = mndBuildTelemetryReport(pMnode); char* pCont = mndBuildTelemetryReport(pMnode);
if (pCont != NULL) { if (pCont != NULL) {
if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
mError("failed to send telemetry msg"); mError("failed to send telemetry report");
} else {
mTrace("succeed to send telemetry report");
} }
taosMemoryFree(pCont); taosMemoryFree(pCont);
} }
......
...@@ -1308,7 +1308,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1308,7 +1308,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->lastAction != 0) { if (pTrans->lastAction != 0) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction); STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction);
if (pAction->retryCode != 0 && pAction->retryCode != pAction->errCode) { if (pAction->retryCode != 0 && pAction->retryCode == pAction->errCode) {
if (pTrans->failedTimes < 6) { if (pTrans->failedTimes < 6) {
mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id, mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id,
pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes); pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes);
......
...@@ -5,7 +5,9 @@ target_link_libraries( ...@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut PUBLIC sut
) )
add_test( if(NOT ${TD_WINDOWS})
NAME smaTest add_test(
COMMAND smaTest NAME smaTest
) COMMAND smaTest
)
endif(NOT ${TD_WINDOWS})
...@@ -5,7 +5,9 @@ target_link_libraries( ...@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut PUBLIC sut
) )
add_test( if(NOT ${TD_WINDOWS})
NAME stbTest add_test(
COMMAND stbTest NAME stbTest
) COMMAND stbTest
\ No newline at end of file )
endif(NOT ${TD_WINDOWS})
\ No newline at end of file
...@@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0); ASSERT(0);
} }
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
ASSERT(0);
}
return pTq; return pTq;
} }
...@@ -648,17 +652,28 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ...@@ -648,17 +652,28 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
// expand executor // expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
if (pTask->pState == NULL) {
return -1;
}
SReadHandle handle = { SReadHandle handle = {
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pTq->pVnode,
.initTqReader = 1, .initTqReader = 1,
.pStateBackend = pTask->pState,
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) { } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
if (pTask->pState == NULL) {
return -1;
}
SReadHandle mgHandle = { SReadHandle mgHandle = {
.vnode = NULL, .vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState,
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
......
...@@ -150,6 +150,7 @@ typedef struct { ...@@ -150,6 +150,7 @@ typedef struct {
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t recoverStartVer; int64_t recoverStartVer;
int64_t recoverEndVer; int64_t recoverEndVer;
SStreamState* pState;
} SStreamTaskInfo; } SStreamTaskInfo;
typedef struct { typedef struct {
......
...@@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { ...@@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataAggIsSet = pStatus->hasAgg; pCtx->input.colDataAggIsSet = pStatus->hasAgg;
pCtx->input.numOfRows = pStatus->numOfRows; pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset; pCtx->input.startRowIndex = pStatus->startOffset;
} }
...@@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t ...@@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order); w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
...@@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool assignUid = groupbyTbname(group); bool assignUid = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if(assignUid){ if (assignUid) {
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid; info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
} }
}else{ } else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete; goto _complete;
} }
if (pHandle && pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
}
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
sql = NULL; sql = NULL;
(*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pSubplan = pPlan;
......
...@@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
#if 0
SStreamState* pState = pTaskInfo->streamInfo.pState;
if (pState) {
printf(">>>>>>>> stream write backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char tmp[100] = "abcdefg1";
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
ASSERT(0);
}
key.ts = 2;
char tmp2[100] = "abcdefg2";
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
ASSERT(0);
}
key.groupId = 5;
key.ts = 1;
char tmp3[100] = "abcdefg3";
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
ASSERT(0);
}
char* val2 = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val2, sz);
streamFreeVal(val2);
}
#endif
qDebug("stream scan called"); qDebug("stream scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
......
...@@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat ...@@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
FAIL_SHUFFLE_DISPATCH: FAIL_SHUFFLE_DISPATCH:
if (pReqs) { if (pReqs) {
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < vgSz; i++) {
taosArrayDestroy(pReqs[i].data); taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen); taosArrayDestroy(pReqs[i].dataLen);
} }
taosMemoryFree(pReqs); taosMemoryFree(pReqs);
......
...@@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) ...@@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0; return 0;
} }
// TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) { int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
int32_t batchCnt = 1; int32_t batchCnt = 1;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "executor.h" #include "executor.h"
#include "tstream.h" #include "streamInc.h"
#include "ttimer.h" #include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
...@@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pMeta->path = strdup(path); int32_t len = strlen(path) + 20;
char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
goto _err; goto _err;
} }
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
mkdir(streamPath, 0755);
taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
goto _err; goto _err;
} }
// open state storage backend if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
goto _err; goto _err;
} }
...@@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
if (streamLoadTasks(pMeta) < 0) {
goto _err;
}
return pMeta; return pMeta;
_err: _err:
if (pMeta->path) taosMemoryFree(pMeta->path); if (pMeta->path) taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db); if (pMeta->db) tdbClose(pMeta->db);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
return NULL; return NULL;
...@@ -67,7 +70,7 @@ _err: ...@@ -67,7 +70,7 @@ _err:
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, &pMeta->txn); tdbCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pStateDb); tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db); tdbClose(pMeta->db);
void* pIter = NULL; void* pIter = NULL;
......
...@@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea ...@@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
} }
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
#if 0
void* buf = NULL; void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
...@@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
FAIL: FAIL:
if (buf) taosMemoryFree(buf); if (buf) taosMemoryFree(buf);
return -1; return -1;
#endif
return 0; return 0;
} }
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
#if 0
void* pVal = NULL; void* pVal = NULL;
int32_t vLen = 0; int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
...@@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
pTask->nextCheckId = aggCheckpoint.checkpointId + 1; pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->checkpointInfo = aggCheckpoint.checkpointVer; pTask->checkpointInfo = aggCheckpoint.checkpointVer;
#endif
return 0; return 0;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInc.h"
#include "tcommon.h"
#include "ttimer.h"
SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
char statePath[300];
sprintf(statePath, "%s/%d", path, pTask->taskId);
if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
goto _err;
}
// open state storage backend
if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) {
goto _err;
}
if (streamStateBegin(pState) < 0) {
goto _err;
}
pState->pOwner = pTask;
return pState;
_err:
if (pState->pStateDb) tdbTbClose(pState->pStateDb);
if (pState->db) tdbClose(pState->db);
taosMemoryFree(pState);
return NULL;
}
void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn);
tdbTbClose(pState->pStateDb);
tdbClose(pState->db);
taosMemoryFree(pState);
}
int32_t streamStateBegin(SStreamState* pState) {
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
tdbTxnClose(&pState->txn);
return -1;
}
return 0;
}
int32_t streamStateCommit(SStreamState* pState) {
if (tdbCommit(pState->db, &pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStateAbort(SStreamState* pState) {
if (tdbAbort(pState->db, &pState->txn) < 0) {
return -1;
}
memset(&pState->txn, 0, sizeof(TXN));
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbBegin(pState->db, &pState->txn) < 0) {
return -1;
}
return 0;
}
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
}
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen);
}
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn);
}
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
int32_t c;
tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
if (c != 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
const SWinKey* pKTmp = NULL;
int32_t kLen;
if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
return -1;
}
*pKey = *pKTmp;
return 0;
}
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToFirst(pCur->pCur);
}
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToLast(pCur->pCur);
}
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
taosMemoryFree(pCur);
return NULL;
}
if (c > 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
int32_t c;
if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
taosMemoryFree(pCur);
return NULL;
}
if (c < 0) return pCur;
if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
taosMemoryFree(pCur);
return NULL;
}
return pCur;
}
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToNext(pCur->pCur);
}
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
//
return tdbTbcMoveToPrev(pCur->pCur);
}
void streamStateFreeCur(SStreamStateCur* pCur) {
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur);
}
void streamFreeVal(void* val) { tdbFree(val); }
...@@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { ...@@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
} }
if (pTask->pState) streamStateClose(pTask->pState);
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
...@@ -276,7 +276,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -276,7 +276,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
while (transReadComplete(pBuf)) { while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
if (true == pBuf->invalid || false == uvHandleReq(conn)) { if (true == pBuf->invalid || false == uvHandleReq(conn)) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), conn); tError("%s conn %p read invalid packet, dst: %s, srv: %s", transLabel(pTransInst), conn, conn->dst, conn->src);
destroyConn(conn, true); destroyConn(conn, true);
return; return;
} }
......
...@@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ...@@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
if (found == NULL) { if (found == NULL) {
// file corrupted, no complete log // file corrupted, no complete log
// TODO delete and search in previous files // TODO delete and search in previous files
ASSERT(0); /*ASSERT(0);*/
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal); int code = walSaveMeta(pWal);
if (code < 0) { if (code < 0) {
taosArrayDestroy(actualLog);
return -1; return -1;
} }
} }
......
...@@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start ...@@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sql connect sql connect
print =============== conflict stb print =============== conflict stb
sql create database db vgroups 1; sql create database db vgroups 4;
sql use db; sql use db;
sql create table stb (ts timestamp, i int) tags (j int); sql create table stb (ts timestamp, i int) tags (j int);
sql_error create table stb using stb tags (1); sql_error create table stb using stb tags (1);
...@@ -16,6 +16,9 @@ sql_error create table ctb (ts timestamp, i int) tags (j int); ...@@ -16,6 +16,9 @@ sql_error create table ctb (ts timestamp, i int) tags (j int);
sql create table ntb (ts timestamp, i int); sql create table ntb (ts timestamp, i int);
sql_error create table ntb (ts timestamp, i int) tags (j int); sql_error create table ntb (ts timestamp, i int) tags (j int);
sql drop table ntb
sql create table ntb (ts timestamp, i int) tags (j int);
sql drop database db sql drop database db
print =============== create database d1 print =============== create database d1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册