提交 ab52d28c 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 6ae82b07
...@@ -254,17 +254,18 @@ typedef struct SExchangeInfo { ...@@ -254,17 +254,18 @@ typedef struct SExchangeInfo {
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator // passed by downstream operator
SArray* pResultBlockList; SArray* pReadyBlocks;
int32_t rspBlockIndex; // indicate the return block index in pResultBlockList SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default bool seqLoadData; // sequential load data or not, false by default
int32_t current; int32_t current;
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
uint64_t self; uint64_t self;
SLimitInfo limitInfo; SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo; } SExchangeInfo;
typedef struct SScanInfo { typedef struct SScanInfo {
......
...@@ -74,9 +74,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -74,9 +74,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
STableKeyInfo* pList = tableListGetInfo(pTableList, 0); STableKeyInfo* pList = tableListGetInfo(pTableList, 0);
size_t num = tableListGetSize(pTableList);
uint64_t suid = tableListGetSuid(pTableList); uint64_t suid = tableListGetSuid(pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
...@@ -100,14 +100,23 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -100,14 +100,23 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
int32_t index = 0; int32_t index = 0;
char* pStart = pRetrieveRsp->data; char* pStart = pRetrieveRsp->data;
while (index++ < pRetrieveRsp->numOfBlocks) { while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); SSDataBlock* pb = NULL;
void* p = taosArrayPop(pExchangeInfo->pRecycledBlocks);
if (p != NULL) {
pb = *(SSDataBlock**) p;
blockDataCleanup(pb);
} else {
pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
}
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) { if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
goto _error; goto _error;
} }
taosArrayPush(pExchangeInfo->pResultBlockList, &pb); taosArrayPush(pExchangeInfo->pReadyBlocks, &pb);
} }
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator); updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator);
...@@ -170,23 +179,26 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { ...@@ -170,23 +179,26 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
size_t size = taosArrayGetSize(pExchangeInfo->pResultBlockList); // we have buffered retrieved datablock, return it directly
if (size == 0 || pExchangeInfo->rspBlockIndex >= size) { SSDataBlock** p = taosArrayPop(pExchangeInfo->pReadyBlocks);
pExchangeInfo->rspBlockIndex = 0; if (p != NULL) {
taosArrayClearEx(pExchangeInfo->pResultBlockList, freeBlock); taosArrayPush(pExchangeInfo->pRecycledBlocks, p);
return *p;
} else {
if (pExchangeInfo->seqLoadData) { if (pExchangeInfo->seqLoadData) {
seqLoadRemoteData(pOperator); seqLoadRemoteData(pOperator);
} else { } else {
concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
} }
if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) { if (taosArrayGetSize(pExchangeInfo->pReadyBlocks) == 0) {
return NULL; return NULL;
} else {
p = taosArrayPop(pExchangeInfo->pReadyBlocks);
taosArrayPush(pExchangeInfo->pRecycledBlocks, p);
return *p;
} }
} }
// we have buffered retrieved datablock, return it directly
return taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex++);
} }
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
...@@ -284,7 +296,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode ...@@ -284,7 +296,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES); pInfo->pReadyBlocks = taosArrayInit(64, POINTER_BYTES);
pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
pInfo->seqLoadData = false; pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter; pInfo->pTransporter = pTransporter;
...@@ -326,11 +339,9 @@ void doDestroyExchangeOperatorInfo(void* param) { ...@@ -326,11 +339,9 @@ void doDestroyExchangeOperatorInfo(void* param) {
taosArrayDestroy(pExInfo->pSources); taosArrayDestroy(pExInfo->pSources);
taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo); taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
if (pExInfo->pResultBlockList != NULL) { taosArrayDestroyEx(pExInfo->pReadyBlocks, freeBlock);
taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock); taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
pExInfo->pResultBlockList = NULL;
}
blockDataDestroy(pExInfo->pDummyBlock); blockDataDestroy(pExInfo->pDummyBlock);
tsem_destroy(&pExInfo->ready); tsem_destroy(&pExInfo->ready);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册