未验证 提交 91e2a6b9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17936 from taosdata/feature/3_liaohj

refactor: refactor to avoid spin operator in exchange operator.
...@@ -251,10 +251,10 @@ typedef struct SLimitInfo { ...@@ -251,10 +251,10 @@ typedef struct SLimitInfo {
} SLimitInfo; } SLimitInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that // SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator // passed by downstream operator
SArray* pResultBlockList; SArray* pResultBlockList;
......
...@@ -1853,16 +1853,34 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1853,16 +1853,34 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
return;
}
while (1) { while (1) {
int32_t completed = 0; // printf("1\n");
tsem_wait(&pExchangeInfo->ready);
// printf("2\n");
for (int32_t i = 0; i < totalSources; ++i) { for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1; // printf("========:%d is completed\n", i);
continue; continue;
} }
// printf("index:%d - status:%d\n", i, pDataInfo->status);
if (pDataInfo->status != EX_SOURCE_DATA_READY) { if (pDataInfo->status != EX_SOURCE_DATA_READY) {
// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status);
continue; continue;
} }
...@@ -1874,22 +1892,31 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1874,22 +1892,31 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
// todo
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
// printf("%d completed, try next\n", i);
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1;
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
continue;
// if (completed == totalSources) {
// return;
// } else {
// break;
// }
break;
} }
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
int32_t index = 0; int32_t index = 0;
char* pStart = pRetrieveRsp->data; char* pStart = pRetrieveRsp->data;
while (index++ < pRetrieveRsp->numOfBlocks) { while (index++ < pRetrieveRsp->numOfBlocks) {
printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks);
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) { if (code != 0) {
...@@ -1902,20 +1929,26 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1902,20 +1929,26 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
// int32_t completed = 0;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
// for (int32_t k = 0; k < totalSources; ++k) {
// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
// if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
// completed += 1;
// }
// }
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d" " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb, completed:%d try next %d/%" PRIzu,
", total:%.2f Kb,"
" completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
completed + 1, i + 1, totalSources); completed, i + 1, totalSources);
completed += 1;
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
} }
...@@ -1931,15 +1964,24 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ...@@ -1931,15 +1964,24 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
} }
} }
// if (completed == totalSources) {
// setAllSourcesCompleted(pOperator, startTs);
// }
return; return;
} }
int32_t completed = 0;
for (int32_t k = 0; k < totalSources; ++k) {
SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k);
if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
}
}
if (completed == totalSources) { if (completed == totalSources) {
setAllSourcesCompleted(pOperator, startTs);
return; return;
} }
sched_yield();
} }
_error: _error:
...@@ -1970,6 +2012,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { ...@@ -1970,6 +2012,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
pOperator->cost.openCost = taosGetTimestampUs() - startTs; pOperator->cost.openCost = taosGetTimestampUs() - startTs;
tsem_wait(&pExchangeInfo->ready); tsem_wait(&pExchangeInfo->ready);
tsem_post(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -956,8 +956,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -956,8 +956,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5); pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5);
taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); if (pInfo->metaCache.pTableMetaEntryCache == NULL) {
code = terrno;
goto _error;
}
taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
getTableScannerExecInfo); getTableScannerExecInfo);
......
...@@ -630,13 +630,16 @@ static int getDefaultCacheShardBits(size_t capacity) { ...@@ -630,13 +630,16 @@ static int getDefaultCacheShardBits(size_t capacity) {
SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) { SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) {
if (numShardBits >= 20) { if (numShardBits >= 20) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) { if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache)); SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache));
if (!cache) { if (!cache) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -648,7 +651,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo ...@@ -648,7 +651,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
if (!cache->shards) { if (!cache->shards) {
taosMemoryFree(cache); taosMemoryFree(cache);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
......
...@@ -186,6 +186,7 @@ endi ...@@ -186,6 +186,7 @@ endi
sql select t1, count(*), first(c9) from $stb partition by t1 order by t1 asc slimit 3 sql select t1, count(*), first(c9) from $stb partition by t1 order by t1 asc slimit 3
if $rows != 3 then if $rows != 3 then
print expect 3, actual: $rows
return -1 return -1
endi endi
if $data(1)[1] != 1 then if $data(1)[1] != 1 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册