提交 9d16e862 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 fb36f5e2
......@@ -501,12 +501,6 @@ typedef struct SProjectOperatorInfo {
int64_t curOutput;
} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
SLimit limit;
int64_t currentOffset;
int64_t currentRows;
} SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t groupTotal;
int64_t currentGroupOffset;
......@@ -525,11 +519,6 @@ typedef struct SSLimitOperatorInfo {
int64_t threshold;
} SSLimitOperatorInfo;
typedef struct SFilterOperatorInfo {
SSingleColumnFilterInfo* pFilterInfo;
int32_t numOfFilterCols;
} SFilterOperatorInfo;
typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo;
SSDataBlock* pRes;
......@@ -682,22 +671,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
......
......@@ -194,9 +194,6 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
......@@ -214,8 +211,6 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
// static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
......@@ -264,10 +259,6 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision,
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr);
static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId,
SExecTaskInfo* pTaskInfo);
......@@ -3344,11 +3335,6 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCt
offset += pLocalExprInfo->base.resSchema.bytes;
}
// todo : use index to avoid iterator all possible output columns
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddev(pRuntimeEnv, pCtx, numOfOutput, pExprInfo);
}
}
// set the tsBuf start position before check each data block
......@@ -3544,19 +3530,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput)
}
}
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity) {
SSDataBlock* pDataBlock = pBInfo->pRes;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId < 0) {
memset(pBInfo->pCtx[i].pOutput, 0, pColInfo->info.bytes * (*bufCapacity));
}
}
}
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
......@@ -3714,23 +3687,6 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow
return pTableQueryInfo;
}
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) {
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(1, sizeof(STableQueryInfo));
// pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
// set more initial size of interval/groupby query
int32_t initialSize = 16;
int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableQueryInfo);
return NULL;
}
return pTableQueryInfo;
}
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) {
return;
......@@ -3834,30 +3790,6 @@ void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKe
pAggInfo->groupId = tableGroupId;
}
void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfCols,
int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* page = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset);
offset += pCtx[i].resDataInfo.bytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF ||
functionId == FUNCTION_DERIVATIVE) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
}
/*
* set the output buffer information and intermediate buffer,
* not all queries require the interResultBuf, such as COUNT
*/
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
}
}
void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -4626,73 +4558,6 @@ static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_
return terrno;
}
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator,
void* param) {
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pQueryAttr->tsdb = tsdb;
if (tsdb != NULL) {
int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t)(pQueryAttr->stableQuery ? GET_NUM_OF_TABLEGROUP(pRuntimeEnv) : 0);
pRuntimeEnv->enableGroupData = false;
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pTsBuf = pTsBuf;
pRuntimeEnv->cur.vgroupIndex = -1;
setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo);
if (sourceOptr != NULL) {
assert(pRuntimeEnv->proot == NULL);
pRuntimeEnv->proot = sourceOptr;
}
if (pTsBuf != NULL) {
int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order);
}
int32_t ps = 4096;
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
int32_t TENMB = 1024 * 1024 * 10;
int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, "", "/tmp");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent));
if (pQInfo->summary.queryProfEvents == NULL) {
// qDebug("QInfo:0x%"PRIx64" failed to allocate query prof events array", pQInfo->qId);
}
pQInfo->summary.operatorProfResults =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
if (pQInfo->summary.operatorProfResults == NULL) {
// qDebug("QInfo:0x%"PRIx64" failed to allocate operator prof results hash", pQInfo->qId);
}
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t)pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// setTaskStatus(pOperator->pTaskInfo, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS;
}
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
if (order == TSDB_ORDER_ASC) {
......@@ -6890,58 +6755,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL;
}
static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SLimitOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
while (1) {
publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pDownstream->getNextFn(pDownstream, newgroup);
publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
return NULL;
}
if (pInfo->currentOffset == 0) {
break;
} else if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else { // TODO handle the data movement
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes);
}
pInfo->currentOffset = 0;
break;
}
}
if (pInfo->currentRows + pBlock->info.rows >= pInfo->limit.limit) {
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->currentRows);
pInfo->currentRows = pInfo->limit.limit;
doSetOperatorCompleted(pOperator);
} else {
pInfo->currentRows += pBlock->info.rows;
}
return pBlock;
}
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
......@@ -7791,11 +7604,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pSortInfo);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*)param;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
}
static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param;
taosHashCleanup(pInfo->pSet);
......@@ -9175,41 +8983,6 @@ _complete:
return code;
}
int32_t cloneExprFilterInfo(SColumnFilterInfo** dst, SColumnFilterInfo* src, int32_t filterNum) {
if (filterNum <= 0) {
return TSDB_CODE_SUCCESS;
}
*dst = taosMemoryCalloc(filterNum, sizeof(*src));
if (*dst == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(*dst, src, sizeof(*src) * filterNum);
for (int32_t i = 0; i < filterNum; i++) {
if ((*dst)[i].filterstr && dst[i]->len > 0) {
void* pz = taosMemoryCalloc(1, (size_t)(*dst)[i].len + 1);
if (pz == NULL) {
if (i == 0) {
taosMemoryFree(*dst);
} else {
freeColumnFilterInfo(*dst, i);
}
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(pz, (void*)src->pz, (size_t)src->len + 1);
(*dst)[i].pz = (int64_t)pz;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs,
int32_t numOfOutput, int32_t tagLen, bool superTable) {
for (int32_t i = 0; i < numOfOutput; ++i) {
......@@ -9232,113 +9005,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
return TSDB_CODE_SUCCESS;
}
// TODO tag length should be passed from client, refactor
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters) {
tExprNode* expr = NULL;
TRY(TSDB_MAX_TAG_CONDITIONS) { expr = exprTreeFromBinary(data, len); }
CATCH(code) {
CLEANUP_EXECUTE();
return code;
}
END_TRY
if (expr == NULL) {
// qError("failed to create expr tree");
return TSDB_CODE_QRY_APP_ERROR;
}
// int32_t ret = filterInitFromTree(expr, pFilters, 0);
// tExprTreeDestroy(expr, NULL);
// return ret;
}
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo**
// pFilterInfo, uint64_t qId) {
// *pFilterInfo = taosMemoryCalloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols);
// if (*pFilterInfo == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// for (int32_t i = 0, j = 0; i < numOfCols; ++i) {
// if (pCols[i].flist.numOfFilters > 0) {
// SSingleColumnFilterInfo* pFilter = &((*pFilterInfo)[j]);
//
// memcpy(&pFilter->info, &pCols[i], sizeof(SColumnInfo));
// pFilter->info = pCols[i];
//
// pFilter->numOfFilters = pCols[i].flist.numOfFilters;
// pFilter->pFilters = taosMemoryCalloc(pFilter->numOfFilters, sizeof(SColumnFilterElem));
// if (pFilter->pFilters == NULL) {
// return TSDB_CODE_QRY_OUT_OF_MEMORY;
// }
//
// for (int32_t f = 0; f < pFilter->numOfFilters; ++f) {
// SColumnFilterElem* pSingleColFilter = &pFilter->pFilters[f];
// pSingleColFilter->filterInfo = pCols[i].flist.filterInfo[f];
//
// int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr;
// int32_t upper = pSingleColFilter->filterInfo.upperRelOptr;
// if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
// //qError("QInfo:0x%"PRIx64" invalid filter info", qId);
// return TSDB_CODE_QRY_INVALID_MSG;
// }
//
// pSingleColFilter->fp = getFilterOperator(lower, upper);
// if (pSingleColFilter->fp == NULL) {
// //qError("QInfo:0x%"PRIx64" invalid filter info", qId);
// return TSDB_CODE_QRY_INVALID_MSG;
// }
//
// pSingleColFilter->bytes = pCols[i].bytes;
//
// if (lower == TSDB_RELATION_IN) {
//// buildFilterSetFromBinary(&pSingleColFilter->q, (char *)(pSingleColFilter->filterInfo.pz),
///(int32_t)(pSingleColFilter->filterInfo.len));
// }
// }
//
// j++;
// }
// }
//
// return TSDB_CODE_SUCCESS;
//}
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
// for (int32_t i = 0; i < numOfFilterCols; ++i) {
// if (pFilterInfo[i].numOfFilters > 0) {
// if (pFilterInfo[i].pFilters->filterInfo.lowerRelOptr == TSDB_RELATION_IN) {
// taosHashCleanup((SHashObj *)(pFilterInfo[i].pFilters->q));
// }
// taosMemoryFreeClear(pFilterInfo[i].pFilters);
// }
// }
//
// taosMemoryFreeClear(pFilterInfo);
return NULL;
}
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId) {
for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) {
// if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].flist.filterInfo != NULL) {
// pQueryAttr->numOfFilterCols++;
// }
}
if (pQueryAttr->numOfFilterCols == 0) {
return TSDB_CODE_SUCCESS;
}
// doCreateFilterInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols, pQueryAttr->numOfFilterCols,
// &pQueryAttr->pFilterInfo, qId);
pQueryAttr->createFilterOperator = true;
return TSDB_CODE_SUCCESS;
}
static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) {
assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册