提交 0fbf30ce 编写于 作者: H Haojun Liao

refactor: do some internal refactor, and add some check before create exchange operator.

上级 514e86ff
......@@ -689,16 +689,15 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
}
if (TSDB_CODE_SUCCESS == code) {
schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
} else {
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
......
......@@ -778,9 +778,9 @@ TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use test");
taos_query(pConn, "use nest");
TAOS_RES* pRes = taos_query(pConn, "desc abc1.tu");
TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;");
if (taos_errno(pRes) != 0) {
printf("failed, reason:%s\n", taos_errstr(pRes));
}
......
......@@ -322,7 +322,6 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
pColumnInfoData->varmeta.length = pSource->varmeta.length;
} else {
char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(numOfRows));
printf("----------------%d\n", BitmapLen(numOfRows));
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -760,7 +760,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
......
......@@ -1821,7 +1821,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) {
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
if (pFilterNode == NULL) {
return;
}
......@@ -2829,7 +2829,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
// return concurrentlyLoadRemoteData(pOperator);
}
}
......@@ -2855,9 +2854,14 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo) {
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
if (numOfSources == 0) {
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources);
return TSDB_CODE_INVALID_PARA;
}
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
......@@ -2879,7 +2883,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto _error;
}
int32_t code = initExchangeOperator(pExNode, pInfo);
int32_t code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -2908,7 +2912,7 @@ _error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = code;
return NULL;
}
......@@ -3677,7 +3681,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
doFilter(pProjectInfo->pFilterNode, pBlock, true);
doFilter(pProjectInfo->pFilterNode, pBlock);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
......@@ -5189,7 +5193,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) {
code = terrno;
code = (*pTaskInfo)->code;
goto _complete;
}
......@@ -5202,7 +5206,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
_complete:
taosMemoryFreeClear(*pTaskInfo);
terrno = code;
return code;
}
......
......@@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, true);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
......
......@@ -267,7 +267,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, false);
doFilter(pTableScanInfo->pFilterNode, pBlock);
int64_t et = taosGetTimestampMs();
pTableScanInfo->readRecorder.filterTime += (et - st);
......@@ -939,7 +939,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
}
doFilter(pInfo->pCondition, pInfo->pRes, false);
doFilter(pInfo->pCondition, pInfo->pRes);
blockDataUpdateTsWindow(pInfo->pRes, 0);
break;
}
......@@ -1711,7 +1711,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
pRes->info.rows = count;
doFilter(pInfo->pFilterNode, pRes, true);
doFilter(pInfo->pFilterNode, pRes);
pOperator->resultInfo.totalRows += pRes->info.rows;
......
......@@ -296,7 +296,10 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
}
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
SArray* pColMatchInfo, SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pDataBlock);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
......@@ -354,6 +357,8 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
}
blockDataDestroy(p);
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
......@@ -371,7 +376,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
}
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo);
pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
......
......@@ -227,6 +227,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
// set current source id done
if (pSource->src.pBlock == NULL) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
......@@ -426,8 +428,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed);
if (sortPass > 0) {
size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu
", sort elapsed:%" PRId64 ", total elapsed:%" PRId64,
pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed);
} else {
qDebug("%s ordered source:%"PRIzu", available buf:%d, no need internal sort", pHandle->idStr, numOfSources,
pHandle->numOfPages);
}
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册