提交 bc1af6de 编写于 作者: L Liu Jicong

refactor(stream): batch optimization for submit msg

上级 159f1c7b
...@@ -87,6 +87,14 @@ void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfEle ...@@ -87,6 +87,14 @@ void taosArrayRemoveBatch(SArray* pArray, const int32_t* pData, int32_t numOfEle
*/ */
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)); void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
/**
*
* @param pArray
* @param comparFn
* @param fp
*/
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*));
/** /**
* add all element from the source array list into the destination * add all element from the source array list into the destination
* @param pArray * @param pArray
......
...@@ -452,7 +452,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -452,7 +452,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
SArray *newSub = subscribe.topicNames; SArray *newSub = subscribe.topicNames;
taosArraySortString(newSub, taosArrayCompareString); taosArraySortString(newSub, taosArrayCompareString);
taosArrayRemoveDuplicate(newSub, taosArrayCompareString, taosMemoryFree); taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);
int32_t newTopicNum = taosArrayGetSize(newSub); int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance // check topic existance
......
...@@ -299,6 +299,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { ...@@ -299,6 +299,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
} }
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) { if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
......
...@@ -43,16 +43,24 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -43,16 +43,24 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
// prevent setting a different type of block // prevent setting a different type of block
pInfo->blockType = type; pInfo->blockType = type;
pInfo->validBlockIndex = 0;
taosArrayClear(pInfo->pBlockLists);
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) { /*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/
qError("submit msg messed up when initing stream block, %s" PRIx64, id); /*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/
return TSDB_CODE_QRY_APP_ERROR; /*return TSDB_CODE_QRY_APP_ERROR;*/
/*}*/
taosArrayClear(pInfo->pBlockLists);
for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq);
} }
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
// TODO optimize
SSDataBlock* p = createOneDataBlock(pDataBlock, false); SSDataBlock* p = createOneDataBlock(pDataBlock, false);
p->info = pDataBlock->info; p->info = pDataBlock->info;
...@@ -153,7 +161,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { ...@@ -153,7 +161,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
return pTaskInfo; return pTaskInfo;
} }
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) { static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList,
const char* idstr) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
// let's discard the tables those are not created according to the queried super table. // let's discard the tables those are not created according to the queried super table.
......
...@@ -1393,24 +1393,47 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1393,24 +1393,47 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) { int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
SSDataBlock block = {0};
// todo refactor while (1) {
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader); if (pInfo->tqReader->pMsg == NULL) {
if (pInfo->validBlockIndex >= totBlockNum) {
return NULL;
}
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { int32_t current = pInfo->validBlockIndex++;
pTaskInfo->code = code; SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
return NULL; if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum);
pInfo->tqReader->pMsg = NULL;
continue;
}
} }
setBlockIntoRes(pInfo, &block); blockDataCleanup(pInfo->pRes);
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, &block);
if (pBlockInfo->rows > 0) {
break;
}
}
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
break; break;
} }
/*blockDataCleanup(pInfo->pRes);*/
pInfo->tqReader->pMsg = NULL;
} }
// record the scan action. // record the scan action.
...@@ -2558,30 +2581,30 @@ typedef struct STableMergeScanInfo { ...@@ -2558,30 +2581,30 @@ typedef struct STableMergeScanInfo {
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle* pSortHandle; SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
SArray* sortSourceParams; SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset; int32_t* rowEntryInfoOffset;
SExprInfo* pExpr; SExprInfo* pExpr;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SArray* pColMatchInfo; SArray* pColMatchInfo;
int32_t numOfOutput; int32_t numOfOutput;
SExprInfo* pPseudoExpr; SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr; int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx; SqlFunctionCtx* pPseudoCtx;
SQueryTableDataCond cond; SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
// if the upstream is an interval operator, the interval info is also kept here to get the time // if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded. // window to check if current data block needs to be loaded.
SInterval interval; SInterval interval;
...@@ -2589,7 +2612,8 @@ typedef struct STableMergeScanInfo { ...@@ -2589,7 +2612,8 @@ typedef struct STableMergeScanInfo {
} STableMergeScanInfo; } STableMergeScanInfo;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -81,7 +81,7 @@ SStreamMergedSubmit* streamMergedSubmitNew() { ...@@ -81,7 +81,7 @@ SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
if (pMerged == NULL) return NULL; if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*)); pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(int32_t*));
if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL; if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL;
return pMerged; return pMerged;
FAIL: FAIL:
...@@ -93,7 +93,7 @@ FAIL: ...@@ -93,7 +93,7 @@ FAIL:
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, pSubmit->dataRef); taosArrayPush(pMerged->dataRefs, pSubmit->dataRef);
taosArrayPush(pMerged->reqs, pSubmit->data); taosArrayPush(pMerged->reqs, &pSubmit->data);
pMerged->ver = pSubmit->ver; pMerged->ver = pSubmit->ver;
return 0; return 0;
} }
...@@ -167,7 +167,7 @@ void streamFreeQitem(SStreamQueueItem* data) { ...@@ -167,7 +167,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
int32_t* ref = taosArrayGet(pMerge->dataRefs, i); int32_t* ref = taosArrayGet(pMerge->dataRefs, i);
(*ref)--; (*ref)--;
if (*ref == 0) { if (*ref == 0) {
void* data = taosArrayGet(pMerge->reqs, i); void* data = taosArrayGetP(pMerge->reqs, i);
taosMemoryFree(data); taosMemoryFree(data);
taosMemoryFree(ref); taosMemoryFree(ref);
} }
......
...@@ -417,7 +417,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -417,7 +417,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
} }
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) {
wError("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId,
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
...@@ -425,7 +425,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -425,7 +425,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
if (pRead->curInvalid || pRead->curVersion != ver) { if (pRead->curInvalid || pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) { if (walReadSeekVer(pRead, ver) < 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr());
return -1; return -1;
} }
seeked = true; seeked = true;
...@@ -452,7 +452,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -452,7 +452,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
contLen = walValidHeadCksum(pRead->pHead); contLen = walValidHeadCksum(pRead->pHead);
if (contLen != 0) { if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId,
ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
...@@ -479,7 +480,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -479,7 +480,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
} }
if (pRead->pHead->head.version != ver) { if (pRead->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pRead->pHead->head.version, ver); pRead->pHead->head.version, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
...@@ -489,7 +490,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -489,7 +490,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
contLen = walValidBodyCksum(pRead->pHead); contLen = walValidBodyCksum(pRead->pHead);
if (contLen != 0) { if (contLen != 0) {
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
......
...@@ -173,6 +173,46 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) ...@@ -173,6 +173,46 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
pArray->size = pos + 1; pArray->size = pos + 1;
} }
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
assert(pArray);
size_t size = pArray->size;
if (size <= 1) {
return;
}
int32_t pos = 0;
for (int32_t i = 1; i < size; ++i) {
char* p1 = taosArrayGet(pArray, pos);
char* p2 = taosArrayGet(pArray, i);
if (comparFn(p1, p2) == 0) {
// do nothing
} else {
if (pos + 1 != i) {
void* p = taosArrayGet(pArray, pos + 1);
if (fp != NULL) {
fp(p);
}
taosArraySet(pArray, pos + 1, p2);
pos += 1;
} else {
pos += 1;
}
}
}
if (fp != NULL) {
for (int32_t i = pos + 1; i < pArray->size; ++i) {
void* p = taosArrayGetP(pArray, i);
fp(p);
}
}
pArray->size = pos + 1;
}
void* taosArrayAddAll(SArray* pArray, const SArray* pInput) { void* taosArrayAddAll(SArray* pArray, const SArray* pInput) {
if (pInput) { if (pInput) {
return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput)); return taosArrayAddBatch(pArray, pInput->pData, (int32_t)taosArrayGetSize(pInput));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册