diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eb83da180352ccfbb9994379b8aec7dd9678df65..6199732cc8fb991929876f75ff01dcb2572c143e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -171,8 +171,8 @@ typedef struct { } STaskDispatcherFixedEp; typedef struct { - // int8_t hashMethod; char stbFullName[TSDB_TABLE_FNAME_LEN]; + int32_t waitingRspCnt; SUseDbRsp dbInfo; } STaskDispatcherShuffle; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a46938590e19d2465d66eec9d012caa9172c6ca3..34c96a5ec450f2bdd4eb635d7039d9a3ccd6f9f6 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -114,18 +114,26 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { pTask->sinkType = TASK_SINK__NONE; + + bool isShuffle = false; + if (pStream->fixedSinkVgId == 0) { pTask->dispatchType = TASK_DISPATCH__SHUFFLE; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); ASSERT(pDb); - - if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { - ASSERT(0); - return -1; + if (pDb->cfg.numOfVgroups > 1) { + isShuffle = true; + if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + ASSERT(0); + return -1; + } } + sdbRelease(pMnode->pSdb, pDb); + } + if (isShuffle) { memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t sz = taosArrayGetSize(pVgs); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d6661904ab2b3fb0c2f333b0de4b4305ad625593..ab2e89bfda0ee6e6751622531ad5f3b67fcc8745 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -468,7 +468,6 @@ typedef struct SStreamScanInfo { SSDataBlock* pUpdateDataRes; // status for tmq // SSchemaWrapper schema; - STqOffset offset; SNodeList* pGroupTags; SNode* pTagCond; SNode* pTagIndexCond; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f249321a767b847c35c7bb1d23d6e740d1cc3f7c..7740246d9112c2c4bf5d7eca5966814c9919fd57 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,12 +14,12 @@ */ #include "executor.h" -#include "tref.h" #include "executorimpl.h" #include "planner.h" #include "tdatablock.h" -#include "vnode.h" +#include "tref.h" #include "tudf.h" +#include "vnode.h" static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; int32_t exchangeObjRefPool = -1; @@ -95,16 +95,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } -#if 0 -int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) { - if (tinfo == NULL) { - return TSDB_CODE_QRY_APP_ERROR; - } - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_INPUT__TABLE_SCAN, 0, NULL); -} -#endif - int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) { return qSetMultiStreamInput(tinfo, input, 1, type, assignUid); } @@ -258,8 +248,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } // todo refactor STableList - size_t bufLen = (pScanInfo->pGroupTags != NULL)? getTableTagsBufLen(pScanInfo->pGroupTags):0; - char* keyBuf = NULL; + size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0; + char* keyBuf = NULL; if (bufLen > 0) { keyBuf = taosMemoryMalloc(bufLen); if (keyBuf == NULL) { @@ -267,13 +257,13 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - for(int32_t i = 0; i < taosArrayGetSize(qa); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { uint64_t* uid = taosArrayGet(qa, i); STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; if (bufLen > 0) { code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, - &keyInfo.groupId); + &keyInfo.groupId); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -349,7 +339,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); } - _error: +_error: // if failed to add ref for all tables in this query, abort current query return code; } @@ -573,11 +563,6 @@ const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) { return pInfo->tqReader->pSchemaWrapper; } -const STqOffset* qExtractStatusFromStreamScanner(void* scanner) { - SStreamScanInfo* pInfo = scanner; - return &pInfo->offset; -} - void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); @@ -600,12 +585,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { while (1) { uint8_t type = pOperator->operatorType; pOperator->status = OP_OPENED; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pInfo = pOperator->info; - if (pOffset->type == TMQ_OFFSET__LOG) { - STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->dataReader); - pTSInfo->dataReader = NULL; + // TODO add more check + if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + ASSERT(pOperator->numOfDownstream == 1); + pOperator = pOperator->pDownstream[0]; + } + + SStreamScanInfo* pInfo = pOperator->info; + if (pOffset->type == TMQ_OFFSET__LOG) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->dataReader); + pTSInfo->dataReader = NULL; #if 0 if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && pInfo->tqReader->pWalReader->curVersion != pOffset->version) { @@ -614,102 +604,74 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) { ASSERT(0); } #endif - if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { + if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) { + return -1; + } + ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ + int64_t uid = pOffset->uid; + int64_t ts = pOffset->ts; + + if (uid == 0) { + if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + uid = pTableInfo->uid; + ts = INT64_MIN; + } else { return -1; } - ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1); - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/ - int64_t uid = pOffset->uid; - int64_t ts = pOffset->ts; - - if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - uid = pTableInfo->uid; - ts = INT64_MIN; - } else { - return -1; - } - } + } - /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ - /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ + /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); #ifndef NDEBUG - qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable, - pInfo->pTableScanOp->resultInfo.totalRows); - pInfo->pTableScanOp->resultInfo.totalRows = 0; + qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable, + pInfo->pTableScanOp->resultInfo.totalRows); + pInfo->pTableScanOp->resultInfo.totalRows = 0; #endif - bool found = false; - for (int32_t i = 0; i < tableSz; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); - if (pTableInfo->uid == uid) { - found = true; - pTableScanInfo->currentTable = i; - break; - } + bool found = false; + for (int32_t i = 0; i < tableSz; i++) { + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + if (pTableInfo->uid == uid) { + found = true; + pTableScanInfo->currentTable = i; + break; } + } - // TODO after dropping table, table may be not found - ASSERT(found); + // TODO after dropping table, table may be not found + ASSERT(found); - if (pTableScanInfo->dataReader == NULL) { - if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, - pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || - pTableScanInfo->dataReader == NULL) { - ASSERT(0); - } + if (pTableScanInfo->dataReader == NULL) { + if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, + pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || + pTableScanInfo->dataReader == NULL) { + ASSERT(0); } + } - tsdbSetTableId(pTableScanInfo->dataReader, uid); - int64_t oldSkey = pTableScanInfo->cond.twindows.skey; - pTableScanInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); - pTableScanInfo->cond.twindows.skey = oldSkey; - pTableScanInfo->scanTimes = 0; + tsdbSetTableId(pTableScanInfo->dataReader, uid); + int64_t oldSkey = pTableScanInfo->cond.twindows.skey; + pTableScanInfo->cond.twindows.skey = ts + 1; + tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); + pTableScanInfo->cond.twindows.skey = oldSkey; + pTableScanInfo->scanTimes = 0; - qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, - pTableScanInfo->currentTable, tableSz); - /*}*/ + qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts, + pTableScanInfo->currentTable, tableSz); + /*}*/ - } else { - ASSERT(0); - } - return 0; } else { - ASSERT(pOperator->numOfDownstream == 1); - pOperator = pOperator->pDownstream[0]; + ASSERT(0); } + return 0; } } return 0; } - - -#if 0 -int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - - if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); - uid = pTableInfo->uid; - ts = INT64_MIN; - } - } - - return doPrepareScan(pTaskInfo->pRoot, uid, ts); -} - -int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - - return doGetScanStatus(pTaskInfo->pRoot, uid, ts); -} -#endif - diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 463ac0e69c4354c6f8062573a71c1d438d75b6c0..2015ae8f086717106f283654509d8d89deee0f0f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1396,13 +1396,11 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; -#if 1 if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput); taosMemoryFreeClear(pStreamScan->pTableScanOp); } -#endif if (pStreamScan->tqReader) { tqCloseReader(pStreamScan->tqReader); } @@ -2855,7 +2853,8 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) { +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, + SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2874,7 +2873,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows); return (pResBlock->info.rows > 0) ? pResBlock : NULL; } @@ -2905,7 +2903,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = NULL; while (pInfo->tableStartIndex < tableListSize) { - pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); + pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, + pOperator); if (pBlock != NULL) { pBlock->info.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d69ceaf8df1163ef3dd0cbdad42383524bafafc9..4b9baa3657aa4bc27f331d1b8864d40d33eb4758 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -773,9 +773,9 @@ int32_t binarySearch(void* keyList, int num, TSKEY key, int order, __get_value_f } int32_t comparePullWinKey(void* pKey, void* data, int32_t index) { - SArray* res = (SArray*)data; + SArray* res = (SArray*)data; SPullWindowInfo* pos = taosArrayGet(res, index); - SPullWindowInfo* pData = (SPullWindowInfo*) pKey; + SPullWindowInfo* pData = (SPullWindowInfo*)pKey; if (pData->window.skey == pos->window.skey) { if (pData->groupId > pos->groupId) { return 1; @@ -810,7 +810,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { int32_t compareResKey(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; SResKeyPos* pos = taosArrayGetP(res, index); - SWinRes* pData = (SWinRes*) pKey; + SWinRes* pData = (SWinRes*)pKey; if (pData->ts == *(int64_t*)pos->key) { if (pData->groupId > pos->groupId) { return 1; @@ -880,7 +880,7 @@ int64_t getWinReskey(void* data, int32_t index) { int32_t compareWinRes(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; SWinRes* pos = taosArrayGetP(res, index); - SResKeyPos* pData = (SResKeyPos*) pKey; + SResKeyPos* pData = (SResKeyPos*)pKey; if (*(int64_t*)pData->key == pos->ts) { if (pData->groupId > pos->groupId) { return 1; @@ -1417,15 +1417,15 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); pGpDatas = (uint64_t*)pGpCol->pData; } - int32_t step = 0; - int32_t startPos = 0; + int32_t step = 0; + int32_t startPos = 0; for (int32_t i = 0; i < pBlock->info.rows; i++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); while (win.ekey <= endTsCols[i]) { uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; - bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); + bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); if (pUpWins && res) { SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; taosArrayPush(pUpWins, &winRes); @@ -1596,7 +1596,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } if (pBlock->info.type == STREAM_NORMAL) { - //set input version + // set input version pTaskInfo->version = pBlock->info.version; } @@ -1642,7 +1642,7 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { } static void freeItem(void* param) { - SGroupKeys *pKey = (SGroupKeys*) param; + SGroupKeys* pKey = (SGroupKeys*)param; taosMemoryFree(pKey->pData); } @@ -2345,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, (SNodeListNode*)pInterpPhyNode->pFillValues); - pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pInfo->win = pInterpPhyNode->timeRange; + pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + pInfo->win = pInterpPhyNode->timeRange; pInfo->interval.interval = pInterpPhyNode->interval; - pInfo->current = pInfo->win.skey; + pInfo->current = pInfo->win.skey; pOperator->name = "TimeSliceOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC; @@ -2968,8 +2968,8 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { pBlock->info.groupId = 0; pBlock->info.rows = 0; pBlock->info.type = type; - pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + - sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); + pBlock->info.rowSize = + sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); SColumnInfoData infoData = {0}; @@ -4321,10 +4321,10 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } else { return; } - + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; blockDataEnsureCapacity(pAggSup->pScanBlock, pSDataBlock->info.rows); - SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); + SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) { if (pInfo->ignoreExpiredData && isOverdue(tsCols[i], &pInfo->twAggSup)) { i++; @@ -4339,7 +4339,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl &allEqual, pInfo->pSeDeleted); if (!allEqual) { appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, - &pSDataBlock->info.groupId); + &pSDataBlock->info.groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex); continue; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 093242c610327359b9f3da2907aa245a92b0a981..06bd6539fd9a239be8666d588c0a45b3f38c5435 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -38,14 +38,14 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); -int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); +int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); -void streamFreeQitem(SStreamQueueItem* data); +void streamFreeQitem(SStreamQueueItem* data); #ifdef __cplusplus } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 99a06575a948f19f42cb798fa3ea6dad4d4f7521..8c9e8ca2dbc67c989bf8cc03e35b8ecc285776ed 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -219,6 +219,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { qDebug("task %d receive dispatch rsp", pTask->taskId); + if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp); + if (leftRsp > 0) return 0; + } + int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5d4adb2896cd3612fe73c3fd77fa52adab17d353..ecad1c96e0801b862df998913332b608f1b2ac94 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -198,6 +198,154 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis return 0; } +int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { + void* buf = NULL; + int32_t code = -1; + SRpcMsg msg = {0}; + + // serialize + int32_t tlen; + tEncodeSize(tEncodeStreamDispatchReq, pReq, tlen, code); + if (code < 0) goto FAIL; + code = -1; + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + goto FAIL; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) { + goto FAIL; + } + tEncoderClear(&encoder); + + msg.contLen = tlen + sizeof(SMsgHead); + msg.pCont = buf; + msg.msgType = pTask->dispatchMsgType; + + tmsgSendReq(pEpSet, &msg); + + code = 0; +FAIL: + if (code < 0 && buf) rpcFreeCont(buf); + return 0; +} + +int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { + int32_t code = -1; + int32_t blockNum = taosArrayGetSize(pData->blocks); + ASSERT(blockNum != 0); + + if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + SStreamDispatchReq req = { + .streamId = pTask->streamId, + .dataSrcVgId = pData->srcVgId, + .upstreamTaskId = pTask->taskId, + .upstreamChildId = pTask->selfChildId, + .upstreamNodeId = pTask->nodeId, + .blockNum = blockNum, + }; + + req.data = taosArrayInit(blockNum, sizeof(void*)); + req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); + if (req.data == NULL || req.dataLen == NULL) { + goto FAIL_FIXED_DISPATCH; + } + + for (int32_t i = 0; i < blockNum; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); + if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { + goto FAIL_FIXED_DISPATCH; + } + } + int32_t vgId = pTask->fixedEpDispatcher.nodeId; + SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; + int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; + + req.taskId = downstreamTaskId; + + qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, + downstreamTaskId, vgId); + + if (streamDispatchOneReq(pTask, &req, vgId, pEpSet) < 0) { + goto FAIL_FIXED_DISPATCH; + } + code = 0; + FAIL_FIXED_DISPATCH: + taosArrayDestroy(req.data); + taosArrayDestroy(req.dataLen); + return code; + + } else if (pTask->dispatchMsgType == TASK_DISPATCH__SHUFFLE) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + ASSERT(pTask->shuffleDispatcher.waitingRspCnt == 0); + int32_t vgSz = taosArrayGetSize(vgInfo); + SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); + if (pReqs == NULL) { + return -1; + } + + for (int32_t i = 0; i < vgSz; i++) { + pReqs[i].streamId = pTask->streamId; + pReqs[i].dataSrcVgId = pData->srcVgId; + pReqs[i].upstreamTaskId = pTask->taskId; + pReqs[i].upstreamChildId = pTask->selfChildId; + pReqs[i].upstreamNodeId = pTask->nodeId; + pReqs[i].blockNum = 0; + pReqs[i].data = taosArrayInit(0, sizeof(void*)); + pReqs[i].dataLen = taosArrayInit(0, sizeof(int32_t)); + if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) { + goto FAIL_SHUFFLE_DISPATCH; + } + } + for (int32_t i = 0; i < blockNum; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); + char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pDataBlock->info.groupId); + + // TODO: get hash function by hashMethod + uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + + // TODO: optimize search + int32_t j; + for (j = 0; j < vgSz; j++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + ASSERT(pVgInfo->vgId > 0); + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + goto FAIL_SHUFFLE_DISPATCH; + } + break; + } + } + } + for (int32_t i = 0; i < vgSz; i++) { + if (pReqs[i].blockNum > 0) { + // send + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (streamDispatchOneReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { + goto FAIL_SHUFFLE_DISPATCH; + } + pTask->shuffleDispatcher.waitingRspCnt++; + } + } + code = 0; + FAIL_SHUFFLE_DISPATCH: + if (pReqs) { + for (int32_t i = 0; i < vgSz; i++) { + taosArrayDestroy(pReqs[i].data); + taosArrayDestroy(pReqs[i].dataLen); + } + taosMemoryFree(pReqs); + } + return code; + } + return 0; +} + int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { void* buf = NULL; int32_t code = -1; @@ -262,29 +410,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, downstreamTaskId, vgId); - // serialize - int32_t tlen; - tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code); - if (code < 0) goto FAIL; - code = -1; - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - goto FAIL; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) { - goto FAIL; - } - tEncoderClear(&encoder); - - pMsg->contLen = tlen + sizeof(SMsgHead); - pMsg->pCont = buf; - pMsg->msgType = pTask->dispatchMsgType; + streamDispatchOneReq(pTask, &req, vgId, *ppEpSet); code = 0; FAIL: @@ -314,6 +440,18 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { qDebug("stream continue dispatching: task %d", pTask->taskId); + int32_t code = 0; + if (streamDispatchAllBlocks(pTask, pBlock) < 0) { + ASSERT(0); + code = -1; + // TODO set status fail + goto FREE; + } + /*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/ +FREE: + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); +#if 0 SRpcMsg dispatchMsg = {0}; SEpSet* pEpSet = NULL; if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) { @@ -325,5 +463,6 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { taosFreeQitem(pBlock); tmsgSendReq(pEpSet, &dispatchMsg); - return 0; +#endif + return code; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 047354c4aa44c5faa4bf50ec087ab939b8df0855..c939c8c43685ef3dc6ea0b4fde1cd5fbfb33a8d1 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -83,7 +83,16 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { // set config memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); + pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; + if (pWal->cfg.retentionSize > 0) { + pWal->cfg.retentionSize *= 1024; + } + + if (pWal->cfg.segSize > 0) { + pWal->cfg.segSize *= 1024; + } + if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; tstrncpy(pWal->path, path, sizeof(pWal->path)); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index d869e6e2ceee749657a53970c1981d97ca07db1a..491e5b0e08ff190ed9b042170e942738a22e5f36 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -401,6 +401,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.head.version = index; pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; + pWal->writeHead.head.ingestTs = taosGetTimestampMs(); // sync info for sync module pWal->writeHead.head.syncMeta = syncMeta; @@ -457,14 +458,14 @@ int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const vo return -1; } - if (pWal->pIdxFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) { + if (pWal->pLogFile == NULL || pWal->pIdxFile == NULL || pWal->writeCur < 0) { if (walInitWriteFile(pWal) < 0) { taosThreadMutexUnlock(&pWal->mutex); return -1; } } - ASSERT(pWal->pIdxFile != NULL && pWal->pLogFile != NULL && pWal->writeCur >= 0); + ASSERT(pWal->pLogFile != NULL && pWal->pIdxFile != NULL && pWal->writeCur >= 0); if (walWriteImpl(pWal, index, msgType, syncMeta, body, bodyLen) < 0) { taosThreadMutexUnlock(&pWal->mutex);