diff --git a/cmake/cmake.define b/cmake/cmake.define index 0de3fba0c15361094fb5b55b874b07c4f568788a..0ae4f56f71db237ca08a7a0a10bcbfe99e58b2d6 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -71,8 +71,8 @@ ELSE () ENDIF () IF (${SANITIZER} MATCHES "true") - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3") MESSAGE(STATUS "Will compile with Address Sanitizer!") ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3") diff --git a/example/src/tstream.c b/example/src/tstream.c index 537bfebededab2d807b2df1a73a6c53ed98a96dd..97ff2886fcb95fcfaca19ba4baf70b64160855ca 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -82,9 +82,7 @@ int32_t create_stream() { /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query( - pConn, - "create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k " - "from tu1 interval(10m)"); + pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1604749af8cb8f3073bd4b1ef46e30d45a37ddff..d18f609d543e375eee495f0516aa93a25c649653 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -114,17 +114,12 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) int32_t streamDataBlockEncode(void** buf, const SStreamDataBlock* pOutput); void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput); -typedef struct { - void* inputHandle; - void* executor; -} SStreamRunner; - typedef struct { int8_t parallelizable; char* qmsg; // followings are not applicable to encoder and decoder - int8_t numOfRunners; - SStreamRunner* runners; + void* inputHandle; + void* executor; } STaskExec; typedef struct { @@ -320,17 +315,15 @@ int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); int32_t streamDequeueOutput(SStreamTask* pTask, void** output); -int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId); - int32_t streamTaskRun(SStreamTask* pTask); int32_t streamTaskHandleInput(SStreamTask* pTask, void* data); int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb); -int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg); -int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp); -int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); -int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); +int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp); +int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg); +int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7d7c01a870a3b2a07e1fe27ffb56bbc079dfb73b..ec75ffcae1b2b00bf2881b1fbf6e32c5d4f8a481 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -57,9 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) { } int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { - for (int i = 0; i < pTask->exec.numOfRunners; i++) { - pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); - } + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a8a3e4f601ed40bc3d7b2e618da08d1145c84a53..3017523aa9879d94dd8d12cc05dcebc2f8661185 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -173,8 +173,7 @@ typedef struct { } STqExec; struct STQ { - char* path; - // STqMetaStore* tqMeta; + char* path; SHashObj* pushMgr; // consumerId -> STqExec* SHashObj* execs; // subKey -> STqExec SHashObj* pStreamTasks; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1c3819c3d31a652e96068439c6ccbb661875cfcc..24b3f458b1962089e05d8eadcf2b23b177956109 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -123,11 +123,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); -#if 0 -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); -#endif -int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data); +int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 25fa716d4e386b596d2777b996d2f3cc09cc20e1..bc9a053a43a352a620de9884be641a37c99842b0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -36,15 +36,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { /*ASSERT(0);*/ /*}*/ -#if 0 - pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, - (FTqDelete)taosMemoryFree, 0); - if (pTq->tqMeta == NULL) { - taosMemoryFree(pTq); - return NULL; - } -#endif - pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -65,48 +56,6 @@ void tqClose(STQ* pTq) { // TODO } -static void tdSRowDemo() { -#define DEMO_N_COLS 3 - - int16_t schemaVersion = 0; - int32_t numOfCols = DEMO_N_COLS; // ts + int - SRowBuilder rb = {0}; - - SSchema schema[DEMO_N_COLS] = { - {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = COL_SMA_ON}, - {.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = COL_SMA_ON}, - {.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = COL_SMA_ON}}; - - SSchema* pSchema = schema; - STSchema* pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols); - - tdSRowInit(&rb, schemaVersion); - tdSRowSetTpInfo(&rb, numOfCols, pTSChema->flen); - int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSChema); - void* row = taosMemoryCalloc(1, maxLen); // make sure the buffer is enough - - // set row buf - tdSRowResetBuf(&rb, row); - - for (int32_t idx = 0; idx < pTSChema->numOfCols; ++idx) { - STColumn* pColumn = pTSChema->columns + idx; - if (idx == 0) { - int64_t tsKey = 1651234567; - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &tsKey, true, pColumn->offset, idx); - } else if (idx == 1) { - int32_t val1 = 10; - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, &val1, true, pColumn->offset, idx); - } else { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, true, pColumn->offset, idx); - } - } - - // print - tdSRowPrint(row, pTSChema, __func__); - - taosMemoryFree(pTSChema); -} - int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; while (1) { @@ -261,32 +210,20 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ } int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - if (msgType != TDMT_VND_SUBMIT) return 0; + if (msgType == TDMT_VND_SUBMIT) { + if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; - // make sure msgType == TDMT_VND_SUBMIT - if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) { - return -1; - } - - if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; + if (tdUpdateExpireWindow(pTq->pVnode->pSma, msg, ver) != 0) { + // TODO error handle + } + void* data = taosMemoryMalloc(msgLen); + if (data == NULL) { + return -1; + } + memcpy(data, msg, msgLen); - void* data = taosMemoryMalloc(msgLen); - if (data == NULL) { - return -1; + tqProcessStreamTrigger(pTq, data); } - memcpy(data, msg, msgLen); - - tqProcessStreamTriggerNew(pTq, data); - -#if 0 - SRpcMsg req = { - .msgType = TDMT_VND_STREAM_TRIGGER, - .pCont = data, - .contLen = msgLen, - }; - - tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req); -#endif return 0; } @@ -685,213 +622,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } -#if 0 -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { - SMqPollReq* pReq = pMsg->pCont; - int64_t consumerId = pReq->consumerId; - int64_t fetchOffset; - int64_t blockingTime = pReq->blockingTime; - int32_t reqEpoch = pReq->epoch; - - if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { - fetchOffset = walGetFirstVer(pTq->pWal); - } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { - fetchOffset = walGetLastVer(pTq->pWal); - } else { - fetchOffset = pReq->currentOffset + 1; - } - - tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); - - SMqPollRspV2 rspV2 = {0}; - rspV2.dataLen = 0; - - STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); - if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = -1; - tmsgSendRsp(pMsg); - return 0; - } - - int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch); - while (consumerEpoch < reqEpoch) { - consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch); - } - - STqTopic* pTopic = NULL; - int32_t topicSz = taosArrayGetSize(pConsumer->topics); - for (int32_t i = 0; i < topicSz; i++) { - STqTopic* topic = taosArrayGet(pConsumer->topics, i); - // TODO race condition - ASSERT(pConsumer->consumerId == consumerId); - if (strcmp(topic->topicName, pReq->topic) == 0) { - pTopic = topic; - break; - } - } - if (pTopic == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - TD_VID(pTq->pVnode)); - pMsg->pCont = NULL; - pMsg->contLen = 0; - pMsg->code = -1; - tmsgSendRsp(pMsg); - return 0; - } - - tqDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - TD_VID(pTq->pVnode)); - - rspV2.reqOffset = pReq->currentOffset; - rspV2.skipLogNum = 0; - - while (1) { - /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ - // TODO - consumerEpoch = atomic_load_32(&pConsumer->epoch); - if (consumerEpoch > reqEpoch) { - tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); - break; - } - SWalReadHead* pHead; - if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) { - // TODO: no more log, set timer to wait blocking time - // if data inserted during waiting, launch query and - // response to user - tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), fetchOffset); - break; - } - tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); - /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ - /*pHead = pTopic->pReadhandle->pHead;*/ - if (pHead->msgType == TDMT_VND_SUBMIT) { - SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - qTaskInfo_t task = pTopic->buffer.output[workerId].task; - ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(false); - } - if (pDataBlock == NULL) { - /*pos = fetchOffset % TQ_BUFFER_SIZE;*/ - break; - } - - taosArrayPush(pRes, pDataBlock); - } - - if (taosArrayGetSize(pRes) == 0) { - tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); - fetchOffset++; - rspV2.skipLogNum++; - taosArrayDestroy(pRes); - continue; - } - rspV2.rspOffset = fetchOffset; - - int32_t blockSz = taosArrayGetSize(pRes); - int32_t dataBlockStrLen = 0; - for (int32_t i = 0; i < blockSz; i++) { - SSDataBlock* pBlock = taosArrayGet(pRes, i); - dataBlockStrLen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); - } - - void* dataBlockBuf = taosMemoryMalloc(dataBlockStrLen); - if (dataBlockBuf == NULL) { - pMsg->code = -1; - taosMemoryFree(pHead); - } - - rspV2.blockData = dataBlockBuf; - - int32_t pos; - rspV2.blockPos = taosArrayInit(blockSz, sizeof(int32_t)); - for (int32_t i = 0; i < blockSz; i++) { - pos = 0; - SSDataBlock* pBlock = taosArrayGet(pRes, i); - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)dataBlockBuf; - pRetrieve->useconds = 0; - pRetrieve->precision = 0; - pRetrieve->compressed = 0; - pRetrieve->completed = 1; - pRetrieve->numOfRows = htonl(pBlock->info.rows); - blockCompressEncode(pBlock, pRetrieve->data, &pos, pBlock->info.numOfCols, false); - taosArrayPush(rspV2.blockPos, &rspV2.dataLen); - - int32_t totLen = sizeof(SRetrieveTableRsp) + pos; - pRetrieve->compLen = htonl(totLen); - rspV2.dataLen += totLen; - dataBlockBuf = POINTER_SHIFT(dataBlockBuf, totLen); - } - ASSERT(POINTER_DISTANCE(dataBlockBuf, rspV2.blockData) <= dataBlockStrLen); - - int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2); - void* buf = rpcMallocCont(msgLen); - - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pReq->epoch; - ((SMqRspHead*)buf)->consumerId = consumerId; - - void* msgBodyBuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqPollRspV2(&msgBodyBuf, &rspV2); - - /*rsp.pBlockData = pRes;*/ - - /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/ - SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0}; - tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, - pHead->msgType, consumerId, pReq->epoch); - tmsgSendRsp(&resp); - taosMemoryFree(pHead); - return 0; - } else { - taosMemoryFree(pHead); - fetchOffset++; - rspV2.skipLogNum++; - } - } - - /*if (blockingTime != 0) {*/ - /*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/ - /*} else {*/ - - rspV2.rspOffset = fetchOffset - 1; - - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - pMsg->code = -1; - return -1; - } - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pReq->epoch; - ((SMqRspHead*)buf)->consumerId = consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqPollRspV2(&abuf, &rspV2); - - SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0}; - tmsgSendRsp(&resp); - tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, - pReq->epoch); - /*}*/ - - return 0; -} -#endif - int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; @@ -981,7 +711,18 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0); } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + if (tDecodeSStreamTask(&decoder, pTask) < 0) { + ASSERT(0); + } + tDecoderClear(&decoder); + pTask->status = TASK_STATUS__IDLE; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; @@ -994,57 +735,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL) goto FAIL; + // exec if (pTask->execType != TASK_EXEC__NONE) { // expand runners - pTask->exec.numOfRunners = parallel; - pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner)); - if (pTask->exec.runners == NULL) { - goto FAIL; - } - for (int32_t i = 0; i < parallel; i++) { - STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - .pMsgCb = &pTq->pVnode->msgCb, - .vnode = pTq->pVnode, - }; - pTask->exec.runners[i].inputHandle = pStreamReader; - pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - ASSERT(pTask->exec.runners[i].executor); - } - } - - if (pTask->sinkType == TASK_SINK__TABLE) { - pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqTableSink; - } - - return 0; -FAIL: - if (pTask->inputQ) taosCloseQueue(pTask->inputQ); - if (pTask->outputQ) taosCloseQueue(pTask->outputQ); - if (pTask->inputQAll) taosFreeQall(pTask->inputQAll); - if (pTask->outputQAll) taosFreeQall(pTask->outputQAll); - if (pTask) taosMemoryFree(pTask); - return -1; -} - -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeSStreamTask(&decoder, pTask) < 0) { - ASSERT(0); - } - tDecoderClear(&decoder); - - // exec - if (tqExpandTask(pTq, pTask, 4) < 0) { - ASSERT(0); + STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, + .pMsgCb = &pTq->pVnode->msgCb, + .vnode = pTq->pVnode, + }; + pTask->exec.inputHandle = pStreamReader; + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.executor); } // sink @@ -1052,8 +755,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->sinkType == TASK_SINK__TABLE) { + pTask->tbSink.vnode = pTq->pVnode; + pTask->tbSink.tbSinkFunc = tqTableSink; + ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); + pTask->tbSink.pTSchema = tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols); ASSERT(pTask->tbSink.pTSchema); @@ -1061,94 +768,17 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); - return 0; -} - -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId) { - void* pIter = NULL; - - while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); - if (pIter == NULL) break; - SStreamTask* pTask = (SStreamTask*)pIter; - - if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, workerId) < 0) { - // TODO - } - } - return 0; -} - -#if 0 -int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) { - SStreamDataSubmit* pSubmit = NULL; - - // build data - pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); - if (pSubmit == NULL) return -1; - pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t)); - if (pSubmit->dataRef == NULL) goto FAIL; - *pSubmit->dataRef = 1; - pSubmit->data = data; - pSubmit->type = STREAM_INPUT__DATA_BLOCK; - - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); - if (pIter == NULL) break; - SStreamTask* pTask = (SStreamTask*)pIter; - if (pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK) { - streamEnqueueDataSubmit(pTask, pSubmit); - // TODO cal back pressure - } - // check run - int8_t execStatus = atomic_load_8(&pTask->status); - if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { - SStreamTaskRunReq* pReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq)); - if (pReq == NULL) continue; - // TODO: do we need htonl? - pReq->head.vgId = pTq->pVnode->config.vgId; - pReq->streamId = pTask->streamId; - pReq->taskId = pTask->taskId; - SRpcMsg msg = { - .msgType = 0, - .pCont = pReq, - .contLen = sizeof(SStreamTaskRunReq), - }; - tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg); - } - } - streamDataSubmitRefDec(pSubmit); - return 0; FAIL: - if (pSubmit) { - if (pSubmit->dataRef) { - taosMemoryFree(pSubmit->dataRef); - } - taosFreeQitem(pSubmit); - } + if (pTask->inputQ) taosCloseQueue(pTask->inputQ); + if (pTask->outputQ) taosCloseQueue(pTask->outputQ); + if (pTask->inputQAll) taosFreeQall(pTask->inputQAll); + if (pTask->outputQAll) taosFreeQall(pTask->outputQAll); + if (pTask) taosMemoryFree(pTask); return -1; } -#endif - -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId) { - SStreamTaskExecReq req; - tDecodeSStreamTaskExecReq(msg, &req); - - int32_t taskId = req.taskId; - ASSERT(taskId); - - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - ASSERT(pTask); - - if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, workerId) < 0) { - // TODO - } - return 0; -} -int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* pReq) { +int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { void* pIter = NULL; bool failed = false; @@ -1234,7 +864,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamTaskProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); return 0; } @@ -1242,7 +872,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamTaskProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + streamProcessRecoverReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); return 0; } @@ -1250,7 +880,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamTaskProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); + streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); return 0; } @@ -1258,6 +888,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->taskId; SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamTaskProcessRecoverRsp(pTask, pRsp); + streamProcessRecoverRsp(pTask, pRsp); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 390073025b5998411b334340fa14b8bb0f182161..6d593f7619ef732fe212e42ddfad5149b9379ae2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -142,13 +142,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg pMsg->contLen - sizeof(SMsgHead)) < 0) { } } break; -#if 0 - case TDMT_VND_TASK_WRITE_EXEC: { - if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), - 0) < 0) { - } - } break; -#endif case TDMT_VND_ALTER_VNODE: break; default: @@ -231,17 +224,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_VND_TASK_RECOVER_RSP: return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg); -#if 0 - case TDMT_VND_TASK_PIPE_EXEC: - case TDMT_VND_TASK_MERGE_EXEC: - return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); - case TDMT_VND_STREAM_TRIGGER:{ - // refactor, avoid double free - int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); - pMsg->pCont = NULL; - return code; - } -#endif case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 66a661481e8f751b7a3a030bc7b85b38c75040d5..0acec0e4e6102e0a5622abd166bd4c4025d36f69 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -134,7 +134,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) { } static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { - void* exec = pTask->exec.runners[0].executor; + void* exec = pTask->exec.executor; // set input if (pTask->inputType == STREAM_INPUT__DATA_SUBMIT) { @@ -171,12 +171,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } // TODO: handle version -int32_t streamTaskExec2(SStreamTask* pTask, SMsgCb* pMsgCb) { +int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) return -1; while (1) { int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); - void* exec = pTask->exec.runners[0].executor; + void* exec = pTask->exec.executor; if (execStatus == TASK_STATUS__IDLE) { // first run, from qall, handle failure from last exec while (1) { @@ -278,7 +278,7 @@ FAIL: return -1; } -int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) { +int32_t streamSink(SStreamTask* pTask, SMsgCb* pMsgCb) { bool firstRun = 1; while (1) { SStreamDataBlock* pBlock = NULL; @@ -407,7 +407,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* return 0; } -int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { +int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { // 1. handle input streamTaskEnqueue(pTask, pReq, pRsp); @@ -415,172 +415,42 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream // 2.1. idle: exec // 2.2. executing: return // 2.3. closing: keep trying - streamTaskExec2(pTask, pMsgCb); + streamExec(pTask, pMsgCb); // 3. handle output // 3.1 check and set status // 3.2 dispatch / sink - streamTaskSink(pTask, pMsgCb); + streamSink(pTask, pMsgCb); return 0; } -int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { atomic_store_8(&pTask->inputStatus, pRsp->inputStatus); if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { // TODO: init recover timer } // continue dispatch - streamTaskSink(pTask, pMsgCb); + streamSink(pTask, pMsgCb); return 0; } int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb) { - streamTaskExec2(pTask, pMsgCb); - streamTaskSink(pTask, pMsgCb); + streamExec(pTask, pMsgCb); + streamSink(pTask, pMsgCb); return 0; } -int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { +int32_t streamProcessRecoverReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg) { // return 0; } -int32_t streamTaskProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) { +int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) { // return 0; } -int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) { - SArray* pRes = NULL; - // source - if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0; - - // exec - if (pTask->execType != TASK_EXEC__NONE) { - ASSERT(workId < pTask->exec.numOfRunners); - void* exec = pTask->exec.runners[workId].executor; - pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) { - return -1; - } - if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - qSetStreamInput(exec, input, inputType); - while (1) { - SSDataBlock* output; - uint64_t ts; - if (qExecTask(exec, &output, &ts) < 0) { - ASSERT(false); - } - if (output == NULL) { - break; - } - taosArrayPush(pRes, output); - } - } else if (inputType == STREAM_DATA_TYPE_SSDATA_BLOCK) { - const SArray* blocks = (const SArray*)input; - /*int32_t sz = taosArrayGetSize(blocks);*/ - /*for (int32_t i = 0; i < sz; i++) {*/ - /*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/ - /*qSetStreamInput(exec, pBlock, inputType);*/ - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK); - while (1) { - SSDataBlock* output; - uint64_t ts; - if (qExecTask(exec, &output, &ts) < 0) { - ASSERT(false); - } - if (output == NULL) { - break; - } - taosArrayPush(pRes, output); - } - /*}*/ - } else { - ASSERT(0); - } - } else { - ASSERT(inputType == STREAM_DATA_TYPE_SSDATA_BLOCK); - pRes = (SArray*)input; - } - - if (pRes == NULL || taosArrayGetSize(pRes) == 0) return 0; - - // sink - if (pTask->sinkType == TASK_SINK__TABLE) { - // blockDebugShowData(pRes); - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); - } else if (pTask->sinkType == TASK_SINK__SMA) { - pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); - // - } else if (pTask->sinkType == TASK_SINK__FETCH) { - // - } else { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - } - - // dispatch - - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - SRpcMsg dispatchMsg = {0}; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) { - ASSERT(0); - return -1; - } - - int32_t qType; - if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) { - qType = FETCH_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC || - pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) { - qType = MERGE_QUEUE; - } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) { - qType = WRITE_QUEUE; - } else { - ASSERT(0); - } - tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { - SRpcMsg dispatchMsg = {0}; - SEpSet* pEpSet = NULL; - if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) { - ASSERT(0); - return -1; - } - - tmsgSendReq(pEpSet, &dispatchMsg); - - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - SHashObj* pShuffleRes = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (pShuffleRes == NULL) { - return -1; - } - - int32_t sz = taosArrayGetSize(pRes); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pRes, i); - SArray* pArray = taosHashGet(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t)); - if (pArray == NULL) { - pArray = taosArrayInit(0, sizeof(SSDataBlock)); - if (pArray == NULL) { - return -1; - } - taosHashPut(pShuffleRes, &pDataBlock->info.groupId, sizeof(int64_t), &pArray, sizeof(void*)); - } - taosArrayPush(pArray, pDataBlock); - } - - if (streamShuffleDispatch(pTask, pMsgCb, pShuffleRes) < 0) { - return -1; - } - - } else { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); - } - return 0; -} - int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->streamId); @@ -607,20 +477,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { pTask->streamId = streamId; pTask->status = TASK_STATUS__IDLE; - pTask->inputQ = taosOpenQueue(); - pTask->outputQ = taosOpenQueue(); - pTask->inputQAll = taosAllocateQall(); - pTask->outputQAll = taosAllocateQall(); - if (pTask->inputQ == NULL || pTask->outputQ == NULL || pTask->inputQAll == NULL || pTask->outputQAll == NULL) - goto FAIL; return pTask; -FAIL: - if (pTask->inputQ) taosCloseQueue(pTask->inputQ); - if (pTask->outputQ) taosCloseQueue(pTask->outputQ); - if (pTask->inputQAll) taosFreeQall(pTask->inputQAll); - if (pTask->outputQAll) taosFreeQall(pTask->outputQAll); - if (pTask) taosMemoryFree(pTask); - return NULL; } int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { @@ -722,11 +579,7 @@ void tFreeSStreamTask(SStreamTask* pTask) { taosCloseQueue(pTask->outputQ); // TODO if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); - for (int32_t i = 0; i < pTask->exec.numOfRunners; i++) { - qDestroyTask(pTask->exec.runners[i].executor); - } - taosMemoryFree(pTask->exec.runners); - /*taosMemoryFree(pTask->executor);*/ + qDestroyTask(pTask->exec.executor); taosMemoryFree(pTask); }