提交 9fb8927c 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

......@@ -43,6 +43,7 @@ def pre_test(){
cd ${WKC}
git reset --hard
git clean -fxd
rm -rf examples/rust/
git remote prune origin
git fetch
'''
......
......@@ -98,10 +98,9 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes =
taos_query(pConn,
"create stream stream1 trigger max_delay 10s into outstb as select _wstart, sum(k) from st1 partition "
"by tbname session(ts, 10s) ");
pRes = taos_query(pConn,
"create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
"count(k) from st1 partition by tbname interval(20s) ");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -60,6 +60,7 @@ enum {
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DESTROY,
};
typedef enum EStreamType {
......
......@@ -53,6 +53,7 @@ enum {
TASK_SCHED_STATUS__WAITING,
TASK_SCHED_STATUS__ACTIVE,
TASK_SCHED_STATUS__FAILED,
TASK_SCHED_STATUS__DROPPING,
};
enum {
......@@ -127,6 +128,10 @@ typedef struct {
int8_t type;
} SStreamCheckpoint;
typedef struct {
int8_t type;
} SStreamTaskDestroy;
typedef struct {
int8_t type;
SSDataBlock* pBlock;
......@@ -211,7 +216,6 @@ typedef struct {
void* vnode;
FTbSink* tbSinkFunc;
STSchema* pTSchema;
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
......
......@@ -291,6 +291,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
......
......@@ -636,6 +636,7 @@ typedef struct {
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
void tFreeStreamObj(SStreamObj* pObj);
typedef struct {
char streamName[TSDB_STREAM_FNAME_LEN];
......
......@@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
void mndFreeStb(SStbObj *pStb);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
......
......@@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
return 0;
}
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema);
int32_t sz = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
tFreeSStreamTask(pTask);
}
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
}
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL;
......
......@@ -424,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->triggerParam = 0;
// source
pTask->taskLevel = TASK_LEVEL__SOURCE;
......
......@@ -266,6 +266,15 @@ _OVER:
return pRow;
}
void mndFreeStb(SStbObj *pStb) {
taosArrayDestroy(pStb->pFuncs);
taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment);
taosMemoryFreeClear(pStb->pAst1);
taosMemoryFreeClear(pStb->pAst2);
}
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
return 0;
......@@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
taosArrayDestroy(pStb->pFuncs);
taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment);
taosMemoryFreeClear(pStb->pAst1);
taosMemoryFreeClear(pStb->pAst2);
mndFreeStb(pStb);
return 0;
}
......@@ -2021,8 +2025,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) {
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
if (pCol->tableId == suid) {
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
return -1;
......@@ -2045,6 +2048,16 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->smaId != 0) {
sdbRelease(pSdb, pStream);
continue;
}
if (pStream->targetStbUid == suid) {
sdbRelease(pSdb, pStream);
return -1;
}
SNode *pAst = NULL;
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
ASSERT(0);
......@@ -2057,8 +2070,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid) {
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
if (pCol->tableId == suid) {
sdbRelease(pSdb, pStream);
nodesDestroyNode(pAst);
return -1;
......
......@@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
mTrace("stream:%s, perform delete action", pStream->name);
taosWLockLatch(&pStream->lock);
tFreeStreamObj(pStream);
taosWUnLockLatch(&pStream->lock);
return 0;
}
......@@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
stbObj.uid = pStream->targetStbUid;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
mndFreeStb(&stbObj);
goto _OVER;
}
tFreeSMCreateStbReq(&createReq);
mndFreeStb(&stbObj);
return 0;
_OVER:
tFreeSMCreateStbReq(&createReq);
mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb);
return -1;
......@@ -715,6 +725,7 @@ _OVER:
mndReleaseDb(pMnode, pDb);
tFreeSCMCreateStreamReq(&createStreamReq);
tFreeStreamObj(&streamObj);
return code;
}
......
......@@ -628,8 +628,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
int32_t code = 0;
if (pTask->taskLevel == TASK_LEVEL__AGG) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
}
......@@ -640,8 +638,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->outputQueue = streamQueueOpen();
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
code = -1;
goto FAIL;
return -1;
}
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
......@@ -686,14 +683,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
streamSetupTrigger(pTask);
tqInfo("deploy stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
tqInfo("expand stream task on vg %d, task id %d, child id %d", TD_VID(pTq->pVnode), pTask->taskId,
pTask->selfChildId);
FAIL:
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
// TODO free executor
return code;
return 0;
}
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
......
......@@ -231,34 +231,35 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq);
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
int32_t code;
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
}
SEncoder encoder;
void* buf = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
int32_t code;
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
((SMsgHead*)buf)->vgId = pVnode->config.vgId;
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
SRpcMsg msg = {
.msgType = TDMT_VND_BATCH_DEL,
.pCont = buf,
.pCont = serializedDeleteReq,
.contLen = len + sizeof(SMsgHead),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
......@@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
// build write msg
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = pReq,
.contLen = ntohl(pReq->length),
.pCont = submitReq,
.contLen = ntohl(submitReq->length),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(submitReq);
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
......@@ -32,7 +32,6 @@ typedef struct {
static SStreamGlobalEnv streamEnv;
int32_t streamExec(SStreamTask* pTask);
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
int32_t streamDispatch(SStreamTask* pTask);
......
......@@ -185,7 +185,9 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
tFreeStreamDispatchReq(pReq);
if (exec) {
streamTryExec(pTask);
if (streamTryExec(pTask) < 0) {
return -1;
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
......@@ -221,7 +223,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
}
int32_t streamProcessRunReq(SStreamTask* pTask) {
streamTryExec(pTask);
if (streamTryExec(pTask) < 0) {
return -1;
}
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
......
......@@ -15,6 +15,7 @@
#include "executor.h"
#include "tstream.h"
#include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
......@@ -99,16 +100,19 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char*
goto FAIL;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
goto FAIL;
}
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
ASSERT(0);
return -1;
goto FAIL;
}
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask);
if (pTask) tFreeSStreamTask(pTask);
return -1;
}
......@@ -158,11 +162,28 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
}
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
/*return -1;*/
if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) {
/*return -1;*/
}
if (pTask->triggerParam != 0) {
taosTmrStop(pTask->timer);
}
while (1) {
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
tFreeSStreamTask(pTask);
break;
} else if (schedStatus == TASK_SCHED_STATUS__DROPPING) {
break;
}
taosMsleep(10);
}
}
return 0;
}
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
#include "streamInc.h"
SStreamQueue* streamQueueOpen() {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
......@@ -36,9 +36,12 @@ void streamQueueClose(SStreamQueue* queue) {
while (1) {
void* qItem = streamQueueNextItem(queue);
if (qItem) {
taosFreeQitem(qItem);
streamFreeQitem(qItem);
} else {
return;
break;
}
}
taosFreeQall(queue->qall);
taosCloseQueue(queue->queue);
taosMemoryFree(queue);
}
......@@ -152,9 +152,17 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
void tFreeSStreamTask(SStreamTask* pTask) {
streamQueueClose(pTask->inputQueue);
streamQueueClose(pTask->outputQueue);
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
taosArrayDestroy(pTask->childEpInfo);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
}
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
}
taosMemoryFree(pTask);
}
......@@ -293,6 +293,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册