diff --git a/include/common/tmsg.h b/include/common/tmsg.h index aabc84659ce91d0750481a51d3c9471d7aa4ac18..66131451e8574f949d434d04b7f31cfc4a28d35d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3134,7 +3134,8 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes); typedef struct { - int64_t uid; + // int64_t uid; + char tbname[TSDB_TABLE_NAME_LEN]; int64_t ts; } SSingleDeleteReq; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1c3f905e23a1a401cd662c20a495fa61d2e49ade..0354078b7b9dfae731cc05e4aa846e46b2ab6cd0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -317,6 +317,8 @@ typedef struct SStreamTask { int8_t inputStatus; int8_t outputStatus; + // STaosQueue* inputQueue1; + // STaosQall* inputQall; SStreamQueue* inputQueue; SStreamQueue* outputQueue; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8e7faf48f64054ca7a1de34de3b33bf6e1deb37a..ab44236d96ad877988231bcbf15369c14df35855 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1649,7 +1649,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->dataRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } @@ -1667,7 +1667,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", + tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", pollRspWrapper->metaRsp.head.epoch, consumerEpoch); taosFreeQitem(pollRspWrapper); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c39d4b0e114117551177cd7566f42bc4de50d983..795b1145c8c32363e1e27b4bb92f2cdc20e803ca 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6131,13 +6131,13 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) { } int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { - if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1; if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1; return 0; } int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) { - if (tDecodeI64(pDecoder, &pReq->uid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1; return 0; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 65e8d69994cada8f023b7a20a080cf271025f505..913fa67bd6a0e5fec7bcc4fd3f35880328bef32c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -25,6 +25,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + tqDebug("stream delete msg: row %d", totRow); + for (int32_t row = 0; row < totRow; row++) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); @@ -36,11 +38,14 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl } else { name = buildCtbNameByGroupId(stbFullName, groupId); } - tqDebug("stream delete msg: groupId :%" PRId64 ", name: %s", groupId, name); + tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId, + name, ts); +#if 0 SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mr, name) < 0) { metaReaderClear(&mr); + tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); taosMemoryFree(name); continue; } @@ -48,10 +53,13 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl int64_t uid = mr.me.uid; metaReaderClear(&mr); taosMemoryFree(name); +#endif SSingleDeleteReq req = { .ts = ts, - .uid = uid, }; + strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN); + taosMemoryFree(name); + /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/ taosArrayPush(deleteReq->deleteReqs, &req); } return 0; @@ -309,6 +317,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.suid = suid; tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); + if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { + taosArrayDestroy(deleteReq.deleteReqs); + continue; + } int32_t len; int32_t code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 48b3e9ff7797b30a4b6b3e71c81683c5205e2d79..c663e2b5268eec7aa0ad8313110fb403dd483979 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -117,12 +117,12 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info); } if (pMsgIter->sversion != info.skmVer) { - tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, - TD_VID(pTsdb->pVnode), pMsgIter->sversion, info.skmVer, suid, uid); + tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, TD_VID(pTsdb->pVnode), + pMsgIter->sversion, info.skmVer, suid, uid); code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; goto _err; } - + pRsp->sver = info.skmVer; // create/get STbData to op @@ -198,14 +198,14 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid } tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 - " since %s", - TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); + " at version %" PRId64 " since %s", + TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code)); return code; _err: tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 - " since %s", - TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code)); + " at version %" PRId64 " since %s", + TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 089905ee203ac8a894d9889f9eff93bab19195b3..688335454791c456b8d417d5b81dd5ab57530c68 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -945,7 +945,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); tbCreated = true; } - + msgIter.uid = createTbReq.uid; if (createTbReq.type == TSDB_CHILD_TABLE) { msgIter.suid = createTbReq.ctb.suid; @@ -958,7 +958,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq #endif tDecoderClear(&decoder); taosArrayDestroy(createTbReq.ctb.tagName); - } + } if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { submitBlkRsp.code = terrno; @@ -1169,16 +1169,28 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void tDecoderInit(&decoder, pReq, len); tDecodeSBatchDeleteReq(&decoder, &deleteReq); + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); for (int32_t i = 0; i < sz; i++) { SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i); - int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts); + char *name = pOneReq->tbname; + if (metaGetTableEntryByName(&mr, name) < 0) { + vDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name); + continue; + } + + int64_t uid = mr.me.uid; + + int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); if (code < 0) { terrno = code; vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64, - TD_VID(pVnode), terrstr(), deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts); + TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts); } } + metaReaderClear(&mr); taosArrayDestroy(deleteReq.deleteReqs); return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cd9f29978d644755f39adf2c44fdf6c5a9da57ff..771ab10140a2190ea841d5c00eb7568a2e347787 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -87,11 +87,11 @@ typedef struct SLimit { typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; typedef struct STaskCostInfo { - int64_t created; - int64_t start; - uint64_t elapsedTime; - double extractListTime; - double groupIdMapTime; + int64_t created; + int64_t start; + uint64_t elapsedTime; + double extractListTime; + double groupIdMapTime; SFileBlockLoadRecorder* pRecoder; } STaskCostInfo; @@ -184,8 +184,7 @@ enum { typedef struct SOperatorFpSet { __optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_fn_t getNextFn; - __optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b7c3eed069e3d1c339a01fcd55e7c20c43a8f97c..efb9a49df28c7ec0b3a4433b017e4d2da2b09ac1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -114,7 +114,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, SOperatorFpSet fpSet = { ._openFn = openFn, .getNextFn = nextFn, - .getStreamResFn = streamFn, .cleanupFn = cleanup, .closeFn = closeFn, .getExplainFn = explain, @@ -819,7 +818,7 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) { // abort current query execution. if (pTaskInfo->owner != 0 && ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) - /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { + /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { assert(pTaskInfo->cost.start != 0); // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); @@ -1899,7 +1898,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn // printf("%d completed, try next\n", i); qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); @@ -1984,7 +1983,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } } - _error: +_error: pTaskInfo->code = code; } @@ -2046,7 +2045,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 " try next", + ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); @@ -2063,7 +2062,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); @@ -2072,7 +2071,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, + ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); } @@ -2260,7 +2259,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL); return pOperator; - _error: +_error: if (pInfo != NULL) { doDestroyExchangeOperatorInfo(pInfo); } @@ -3173,8 +3172,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType - ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval - : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval + : &((SIntervalAggOperatorInfo*)downstream->info)->interval; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); @@ -3353,9 +3352,9 @@ bool groupbyTbname(SNodeList* pGroupList) { SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) { - int32_t type = nodeType(pPhyNode); + int32_t type = nodeType(pPhyNode); STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; - const char* idstr = GET_TASKID(pTaskInfo); + const char* idstr = GET_TASKID(pTaskInfo); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { SOperatorInfo* pOperator = NULL; @@ -3428,7 +3427,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo for (int32_t i = 0; i < sz; i++) { STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); - qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId); + qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId); } #endif } @@ -3461,7 +3460,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - for(int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) { + for (int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) { STableKeyInfo* p = taosArrayGet(pList, i); tableListAddTableInfo(pTableListInfo, p->uid, 0); } @@ -3501,7 +3500,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } - size_t size = LIST_LENGTH(pPhyNode->pChildren); + size_t size = LIST_LENGTH(pPhyNode->pChildren); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); if (ops == NULL) { return NULL; @@ -3712,7 +3711,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32 *length = *(int32_t*)(*result); } - _downstream: +_downstream: for (int32_t i = 0; i < ops->numOfDownstream; ++i) { code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); if (code != TDB_CODE_SUCCESS) { @@ -3971,8 +3970,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat int32_t size = 0; void* pVal = NULL; SWinKey key = { - .ts = *(TSKEY*)pPos->key, - .groupId = pPos->groupId, + .ts = *(TSKEY*)pPos->key, + .groupId = pPos->groupId, }; int32_t code = streamStateGet(pState, &key, &pVal, &size); ASSERT(code == 0); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ebc3a962d3064e5735f2a193caba80dac00b6952..9f06e639b353c57cd90b9b9f19683e3f32fc5239 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1691,14 +1691,14 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pInfo->srcRowIndex = 0; - pOperator->name = "FillOperator"; + pOperator->name = "StreamFillOperator"; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, - NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 88f3e4cff99457265ca2913b6e79ab2d5a658901..7b6ed5b67dd1e8b92c93640ade09a6fd8cb441e2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "filter.h" #include "executorimpl.h" +#include "filter.h" #include "function.h" #include "functionMgt.h" #include "tcommon.h" @@ -986,7 +986,7 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf // current result is done in computing final results. if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pResult); - SListNode *pNode = tdListPopHead(pResultRowInfo->openWindow); + SListNode* pNode = tdListPopHead(pResultRowInfo->openWindow); taosMemoryFree(pNode); } } @@ -1255,35 +1255,33 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pInfo->binfo.pRes; - if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - return pOperator->fpSet.getStreamResFn(pOperator); - } else { - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - return NULL; - } + ASSERT(pInfo->execModel == OPTR_EXEC_MODEL_BATCH); - blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - while (1) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } - bool hasRemain = hasRemainResults(&pInfo->groupResInfo); - if (!hasRemain) { - doSetOperatorCompleted(pOperator); - break; - } + blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); + while (1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); - if (pBlock->info.rows > 0) { - break; - } + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; } - size_t rows = pBlock->info.rows; - pOperator->resultInfo.totalRows += rows; - - return (rows == 0) ? NULL : pBlock; + if (pBlock->info.rows > 0) { + break; + } } + + size_t rows = pBlock->info.rows; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0) ? NULL : pBlock; } static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) { diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 6a3bdb59c9277d392ae82a6bed15674672cf56c8..0fc75c4798c2e24f92893a763077dea373ccade1 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -32,8 +32,6 @@ typedef struct { static SStreamGlobalEnv streamEnv; -// int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch); - int32_t streamDispatch(SStreamTask* pTask); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b71562cf454f0e7be57ddccc00ae4fcdf410c88b..e6d5859163f56150d4ede3a3903daaf79a3be4f5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -240,43 +240,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { return 0; } -#if 0 -int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) { - void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - - SStreamTaskRecoverRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); - pCont->inputStatus = pTask->inputStatus; - pCont->streamId = pTask->streamId; - pCont->reqTaskId = pTask->taskId; - pCont->rspTaskId = pReq->upstreamTaskId; - - pRsp->pCont = buf; - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp); - tmsgSendRsp(pRsp); - return 0; -} - -int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) { - streamProcessRunReq(pTask); - - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - // scan data to recover - pTask->inputStatus = TASK_INPUT_STATUS__RECOVER; - pTask->taskStatus = TASK_STATUS__RECOVER_SELF; - qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer); - if (streamPipelineExec(pTask, 100, true) < 0) { - return -1; - } - } else { - pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->taskStatus = TASK_STATUS__NORMAL; - } - - return 0; -} -#endif - int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 46fab536591f128282f22a7c7ef1a9adb2e441df..e7f2b60704b7c6d17af8390b6d724fed77dbbd44 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -138,63 +138,39 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } #if 0 -int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) { - ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); - - void* exec = pTask->exec.executor; - - while (1) { - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - if (pRes == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - int32_t batchCnt = 0; - while (1) { - SSDataBlock* output = NULL; - uint64_t ts = 0; - if (qExecTask(exec, &output, &ts) < 0) { - ASSERT(0); - } - if (output == NULL) break; - - SSDataBlock block = {0}; - assignOneDataBlock(&block, output); - block.info.childId = pTask->selfChildId; - taosArrayPush(pRes, &block); - - if (++batchCnt >= batchNum) break; - } - if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - break; +int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { + // fetch all queue item, merge according to batchLimit + int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall); + if (numOfItems == 0) { + qDebug("task: %d, stream task exec over, queue empty", pTask->taskId); + return 0; + } + SStreamQueueItem* pMerged = NULL; + SStreamQueueItem* pItem = NULL; + taosGetQitem(pTask->inputQall, (void**)&pItem); + if (pItem == NULL) { + if (pMerged != NULL) { + // process merged item + } else { + return 0; } - if (dispatch) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - qRes->childId = pTask->selfChildId; + } - if (streamTaskOutput(pTask, qRes) < 0) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); - return -1; - } + // if drop + if (pItem->type == STREAM_INPUT__DESTROY) { + // set status drop + return -1; + } - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - streamDispatch(pTask); - } - } else { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - } + if (pTask->taskLevel == TASK_LEVEL__SINK) { + ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, (SStreamDataBlock*)pItem); } + // exec impl + + // output + // try dispatch return 0; } #endif diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index c723618a4b942213dd4a4d9aee471a67f68e77dd..b683ba19263f32406f52beef51a0ba1ca452ef58 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -482,7 +482,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); - wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType)); + wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index, + TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody); code = walWriteIndex(pWal, index, offset); if (code < 0) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index c8f128e6663f1a0545cb0374a174a60bd303a7a8..0f992184b579fa8ab1e4855f43e61f81b772c07c 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -208,7 +208,7 @@ STaosQall *taosAllocateQall() { void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); } int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { - int32_t code = 0; + int32_t numOfItems = 0; bool empty; taosThreadMutexLock(&queue->mutex); @@ -219,13 +219,14 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { qall->current = queue->head; qall->start = queue->head; qall->numOfItems = queue->numOfItems; - code = qall->numOfItems; + numOfItems = qall->numOfItems; queue->head = NULL; queue->tail = NULL; queue->numOfItems = 0; queue->memOfItems = 0; - uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); + uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems, + queue->memOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } @@ -237,7 +238,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { qall->start = NULL; qall->numOfItems = 0; } - return code; + return numOfItems; } int32_t taosGetQitem(STaosQall *qall, void **ppItem) {