提交 9932fa6b 编写于 作者: H Haojun Liao

refactor: refactor to avoid spin operator in exchange operator.

上级 0e11d344
......@@ -251,10 +251,10 @@ typedef struct SLimitInfo {
} SLimitInfo;
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray* pResultBlockList;
......
......@@ -1854,6 +1854,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
while (1) {
tsem_wait(&pExchangeInfo->ready);
int32_t completed = 0;
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
......@@ -1874,6 +1876,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
// todo
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
......@@ -1883,7 +1886,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
completed += 1;
taosMemoryFreeClear(pDataInfo->pRsp);
continue;
break;
// continue;
}
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
......@@ -1904,10 +1908,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d"
" index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb,"
" completed:%d try next %d/%" PRIzu,
" execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", total:%.2f Kb, completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
completed + 1, i + 1, totalSources);
......@@ -1915,7 +1917,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
} else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
" execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
}
......@@ -1939,7 +1941,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
return;
}
sched_yield();
// sched_yield();
}
_error:
......@@ -1970,6 +1972,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
tsem_wait(&pExchangeInfo->ready);
tsem_post(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS;
}
......
......@@ -956,8 +956,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->pTaskInfo = pTaskInfo;
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5);
taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false);
if (pInfo->metaCache.pTableMetaEntryCache == NULL) {
code = terrno;
goto _error;
}
taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
getTableScannerExecInfo);
......
......@@ -630,13 +630,16 @@ static int getDefaultCacheShardBits(size_t capacity) {
SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) {
if (numShardBits >= 20) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache));
if (!cache) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -648,7 +651,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
if (!cache->shards) {
taosMemoryFree(cache);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册