提交 602d2d1c 编写于 作者: H Haojun Liao

[td-225] refactor

上级 86bfad2b
...@@ -179,6 +179,12 @@ typedef struct { ...@@ -179,6 +179,12 @@ typedef struct {
SArray* pResult; // SArray<SStddevInterResult> SArray* pResult; // SArray<SStddevInterResult>
} SInterResult; } SInterResult;
typedef struct SSDataBlock {
SDataStatis *pBlockStatis;
SArray *pDataBlock;
SDataBlockInfo info;
} SSDataBlock;
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
...@@ -214,6 +220,7 @@ typedef struct SQuery { ...@@ -214,6 +220,7 @@ typedef struct SQuery {
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
SSDataBlock *ouptputBuf;
} SQuery; } SQuery;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
...@@ -307,12 +314,6 @@ typedef struct SQueryParam { ...@@ -307,12 +314,6 @@ typedef struct SQueryParam {
SSqlGroupbyExpr *pGroupbyExpr; SSqlGroupbyExpr *pGroupbyExpr;
} SQueryParam; } SQueryParam;
typedef struct SSDataBlock {
SDataStatis *pBlockStatis;
SArray *pDataBlock;
SDataBlockInfo info;
} SSDataBlock;
typedef struct STableScanInfo { typedef struct STableScanInfo {
SQInfo* pQInfo; SQInfo* pQInfo;
void *pQueryHandle; void *pQueryHandle;
......
...@@ -176,6 +176,22 @@ static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQ ...@@ -176,6 +176,22 @@ static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQ
static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime); static STableScanInfo* createTableScanInfo(void* pTsdbQueryHandle, SQInfo* pQInfo, int32_t repeatTime);
static int32_t getNumOfScanTimes(SQuery* pQuery); static int32_t getNumOfScanTimes(SQuery* pQuery);
static SSDataBlock* createOutputBuf(SQuery* pQuery) {
// setup the output buffer
SSDataBlock *res = calloc(1, sizeof(SSDataBlock));
res->info.numOfCols = pQuery->numOfOutput;
res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData));
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SColumnInfoData idata = {0};
idata.info.type = pQuery->pExpr1[i].type;
idata.info.bytes = pQuery->pExpr1[i].bytes;
idata.info.colId = pQuery->pExpr1[i].base.resColId;
idata.pData = calloc(4096, idata.info.bytes);
taosArrayPush(res->pDataBlock, &idata);
}
}
bool doFilterData(SQuery *pQuery, int32_t elemPos) { bool doFilterData(SQuery *pQuery, int32_t elemPos) {
for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) {
SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k]; SSingleColumnFilterInfo *pFilterInfo = &pQuery->pFilterInfo[k];
...@@ -1150,6 +1166,36 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc ...@@ -1150,6 +1166,36 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
} }
} }
static void aggApplyFunctions_rv(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
SArray *pDataBlock) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery * pQuery = pRuntimeEnv->pQuery;
TSKEY *tsCols = NULL;
if (pDataBlock != NULL) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
tsCols = (TSKEY *)(pColInfo->pData);
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &pRuntimeEnv->sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &pQuery->pExpr1[k]);
}
/*
* the sqlfunctionCtx parameters should be set done before all functions are invoked,
* since the selectivity + tag_prj query needs all parameters been set done.
* tag_prj function are changed to be TSDB_FUNC_TAG_DUMMY
*/
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].startTs = pQuery->window.skey;
aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
/** /**
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
* @param pRuntimeEnv * @param pRuntimeEnv
...@@ -5634,37 +5680,46 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) { ...@@ -5634,37 +5680,46 @@ static void doSecondaryArithmeticProcess(SQuery* pQuery) {
tfree(arithSup.data); tfree(arithSup.data);
} }
static SSDataBlock* doTableScan(void* param) { static SSDataBlock* doScanImpl(STableScanInfo *pTableScanInfo) {
STableScanInfo * pTableScanInfo = (STableScanInfo *)param;
SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv;
SSDataBlock *pBlock = &pTableScanInfo->block; SSDataBlock *pBlock = &pTableScanInfo->block;
while (pTableScanInfo->current < pTableScanInfo->times) {
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
// todo check for query cancel while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
pTableScanInfo->numOfBlocks += 1;
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); // todo check for query cancel
SDataStatis *pStatis = pBlock->pBlockStatis; tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
// this function never returns error? SDataStatis *pStatis = pBlock->pBlockStatis;
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
pTableScanInfo->numOfBlockStatis += 1;
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block // this function never returns error?
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
pTableScanInfo->numOfRows += pBlock->info.rows; pTableScanInfo->numOfBlockStatis += 1;
}
return pBlock; if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL);
pTableScanInfo->numOfRows += pBlock->info.rows;
}
return pBlock;
}
}
static SSDataBlock* doTableScan(void* param) {
STableScanInfo * pTableScanInfo = (STableScanInfo *)param;
SQueryRuntimeEnv *pRuntimeEnv = &pTableScanInfo->pQInfo->runtimeEnv;
while (pTableScanInfo->current < pTableScanInfo->times) {
SSDataBlock* p = doScanImpl(pTableScanInfo);
if (p != NULL) {
return p;
} }
if (++pTableScanInfo->current >= pTableScanInfo->times) { if (++pTableScanInfo->current >= pTableScanInfo->times) {
return NULL; return NULL;
} }
// do prepare for the next round table scan operation
tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle); tsdbCleanupQueryHandle(pTableScanInfo->pQueryHandle);
STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window); STsdbQueryCond cond = createTsdbQueryCond(pRuntimeEnv->pQuery, &pRuntimeEnv->pQuery->window);
pTableScanInfo->pQueryHandle = pTableScanInfo->pQueryHandle =
...@@ -5697,29 +5752,16 @@ static SSDataBlock* doTableScan(void* param) { ...@@ -5697,29 +5752,16 @@ static SSDataBlock* doTableScan(void* param) {
tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo, tsdbQueryTables(pTableScanInfo->pQInfo->tsdb, &cond, &pTableScanInfo->pQInfo->tableGroupInfo,
pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef); pTableScanInfo->pQInfo, &pTableScanInfo->pQInfo->memRef);
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
pTableScanInfo->numOfBlocks += 1; pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey);
// todo check for query cancel
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
SDataStatis *pStatis = pBlock->pBlockStatis;
// this function never returns error?
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->pQueryHandle, &pStatis);
pTableScanInfo->numOfBlockStatis += 1;
if (pBlock->pBlockStatis == NULL) { // data block statistics does not exist, load data block pTableScanInfo->times = 1;
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pQueryHandle, NULL); pTableScanInfo->current = 0;
pTableScanInfo->numOfRows += pBlock->info.rows;
}
return pBlock; SSDataBlock* p = doScanImpl(pTableScanInfo);
if (p != NULL) {
return p;
} }
qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
pTableScanInfo->pQInfo, cond.twindow.skey, cond.twindow.ekey);
} }
return NULL; return NULL;
...@@ -5761,27 +5803,6 @@ static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) { ...@@ -5761,27 +5803,6 @@ static UNUSED_FUNC int32_t getTableScanTime(STableScanInfo* pTableScanInfo) {
static SSDataBlock* doAggOperator(void* param) { static SSDataBlock* doAggOperator(void* param) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
// setup the output buffer
SSDataBlock* res = calloc(1, sizeof(SSDataBlock));
SQuery* pQuery = pInfo->pRuntimeEnv->pQuery;
res->info.numOfCols = pQuery->numOfOutput;
res->pDataBlock = taosArrayInit(pQuery->numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SColumnInfoData idata = {0};
idata.info.type = pQuery->pExpr1[i].type;
idata.info.bytes = pQuery->pExpr1[i].bytes;
idata.info.colId = pQuery->pExpr1[i].base.resColId;
idata.pData = calloc(4096, idata.info.bytes);
taosArrayPush(res->pDataBlock, &idata);
pInfo->pRuntimeEnv->pCtx[i].pOutput = idata.pData;
}
pQuery->pos = 0;
int32_t countId = 0; int32_t countId = 0;
while(1) { while(1) {
SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo); SSDataBlock* pBlock = pInfo->pTableScanInfo->apply(pInfo->pTableScanInfo);
...@@ -5793,7 +5814,7 @@ static SSDataBlock* doAggOperator(void* param) { ...@@ -5793,7 +5814,7 @@ static SSDataBlock* doAggOperator(void* param) {
needRepeatScan(pInfo->pRuntimeEnv); needRepeatScan(pInfo->pRuntimeEnv);
} }
blockwiseApplyFunctions(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pInfo->pResultRowInfo, binarySearchForKey, pBlock->pDataBlock); aggApplyFunctions_rv(pInfo->pRuntimeEnv, pBlock->pBlockStatis, &pBlock->info, pBlock->pDataBlock);
} }
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
...@@ -6872,6 +6893,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr ...@@ -6872,6 +6893,7 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
} }
doUpdateExprColumnIndex(pQuery); doUpdateExprColumnIndex(pQuery);
pQuery->ouptputBuf = createOutputBuf(pQuery);
int32_t ret = createFilterInfo(pQInfo, pQuery); int32_t ret = createFilterInfo(pQInfo, pQuery);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册