提交 96328779 编写于 作者: L Liu Jicong

refactor(mnode): drop stream task

上级 853e6e29
......@@ -216,7 +216,6 @@ typedef struct {
void* vnode;
FTbSink* tbSinkFunc;
STSchema* pTSchema;
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
......
......@@ -2044,7 +2044,12 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if (pStream->smaId == 0 && pStream->targetStbUid == suid) {
if (pStream->smaId != 0) {
sdbRelease(pSdb, pStream);
continue;
}
if (pStream->targetStbUid == suid) {
sdbRelease(pSdb, pStream);
return -1;
}
......
......@@ -231,34 +231,35 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq);
SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
int32_t code;
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
}
SEncoder encoder;
void* buf = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
int32_t code;
int32_t len;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
((SMsgHead*)buf)->vgId = pVnode->config.vgId;
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
SRpcMsg msg = {
.msgType = TDMT_VND_BATCH_DEL,
.pCont = buf,
.pCont = serializedDeleteReq,
.contLen = len + sizeof(SMsgHead),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
......@@ -268,11 +269,12 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
// build write msg
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = pReq,
.contLen = ntohl(pReq->length),
.pCont = submitReq,
.contLen = ntohl(submitReq->length),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(submitReq);
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
......@@ -15,6 +15,7 @@
#include "executor.h"
#include "tstream.h"
#include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
......@@ -166,13 +167,20 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
/*return -1;*/
}
if (pTask->triggerParam != 0) {
taosTmrStop(pTask->timer);
}
while (1) {
int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
tFreeSStreamTask(pTask);
break;
} else if (schedStatus == TASK_SCHED_STATUS__DROPPING) {
break;
}
taosMsleep(10);
}
}
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstream.h"
#include "streamInc.h"
SStreamQueue* streamQueueOpen() {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
......@@ -36,11 +36,12 @@ void streamQueueClose(SStreamQueue* queue) {
while (1) {
void* qItem = streamQueueNextItem(queue);
if (qItem) {
taosFreeQitem(qItem);
streamFreeQitem(qItem);
} else {
break;
}
}
taosFreeQall(queue->qall);
taosCloseQueue(queue->queue);
taosMemoryFree(queue);
}
......@@ -156,5 +156,13 @@ void tFreeSStreamTask(SStreamTask* pTask) {
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg);
if (pTask->exec.executor) qDestroyTask(pTask->exec.executor);
taosArrayDestroy(pTask->childEpInfo);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema);
}
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
}
taosMemoryFree(pTask);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册