提交 232a4415 编写于 作者: H Haojun Liao

[td-2895] refactor

上级 75984ed3
......@@ -958,7 +958,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
}
while (1) {
int64_t newRows = taosFillResultDataBlock(pFillInfo, pResPages, pLocalMerge->resColModel->capacity);
int64_t newRows = taosFillResultDataBlock(pFillInfo, (void**)pResPages, pLocalMerge->resColModel->capacity);
if (pQueryInfo->limit.offset < newRows) {
newRows -= pQueryInfo->limit.offset;
......
......@@ -185,6 +185,7 @@ typedef struct SSDataBlock {
SDataBlockInfo info;
} SSDataBlock;
typedef struct SQuery {
SLimitVal limit;
......@@ -395,19 +396,22 @@ typedef struct SHashIntervalOperatorInfo {
SQueryRuntimeEnv *pRuntimeEnv;
SQLFunctionCtx *pCtx;
SResultRowInfo resultRowInfo;
SSDataBlock *pRes;
} SHashIntervalOperatorInfo;
typedef struct SFillOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
SSDataBlock *pRes;
} SFillOperatorInfo;
typedef struct SFilterOperatorInfo {
typedef struct SHashGroupbyOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
} SFilterOperatorInfo;
SQLFunctionCtx *pCtx;
SResultRowInfo resultRowInfo;
SSDataBlock *pRes;
} SHashGroupbyOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
......
......@@ -24,6 +24,8 @@ extern "C" {
#include "qExtbuffer.h"
#include "taosdef.h"
struct SSDataBlock;
typedef struct {
STColumn col; // column info
int16_t functionId; // sql function id
......@@ -80,6 +82,8 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput);
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput);
bool taosFillHasMoreResults(SFillInfo* pFillInfo);
......@@ -88,7 +92,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity);
#ifdef __cplusplus
}
......
......@@ -186,8 +186,12 @@ static SOperatorInfo* createArithOperatorInfo(STableQueryInfo* pTableQueryInfo,
static SOperatorInfo* createLimitOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
//static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size);
......@@ -1310,6 +1314,7 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows-1] == pSDataBlock->info.window.ekey);
}
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info, tsCols, step);
......@@ -1384,6 +1389,39 @@ static void hashIntervalAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOpera
}
}
static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
SDataBlockInfo* pBlockInfo = &pSDataBlock->info;
int16_t type = 0;
int16_t bytes = 0;
char* groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pSDataBlock->pDataBlock);
for (int32_t j = 0; j < pBlockInfo->rows; ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, 1);
char *val = groupbyColumnData + bytes * offset;
if (isNull(val, type)) { // ignore the null value
continue;
}
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
}
/**
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
* @param pRuntimeEnv
......@@ -2333,6 +2371,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
if (pQuery->fillType != TSDB_FILL_NONE) {
pRuntimeEnv->proot = createFillOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->proot);
}
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
......@@ -3013,7 +3055,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SResultRowInfo * pWindowResInfo,
void* pQueryHandle, SSDataBlock* pBlock, uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
pBlock->pDataBlock = NULL;
pBlock->pBlockStatis = NULL;
SQuery *pQuery = pRuntimeEnv->pQuery;
int64_t groupId = pQuery->current->groupIndex;
......@@ -4613,6 +4658,7 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
pBlock->info.rows = 0;
if (!hasRemainData(pGroupResInfo)) {
return;
}
......@@ -4620,6 +4666,12 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
doCopyToSData_rv(pRuntimeEnv, pGroupResInfo, orderType, pBlock);
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
STimeWindow* w = &pBlock->info.window;
w->skey = *(int64_t*)pInfoData->pData;
w->ekey = *(int64_t*)(pInfoData->pData + TSDB_KEYSIZE * (pBlock->info.rows - 1));
}
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv) {
......@@ -4763,8 +4815,8 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
}
}
#if 0
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
......@@ -4774,13 +4826,13 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
// todo apply limit output function
/* reached the start position of according to offset value, return immediately */
if (pQuery->limit.offset == 0) {
qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pQInfo, pFillInfo->numOfRows, ret);
qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows", pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret);
return ret;
}
if (pQuery->limit.offset < ret) {
qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, remain:%" PRId64 ", new offset:%d",
pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0);
pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, ret - pQuery->limit.offset, 0);
ret -= (int32_t)pQuery->limit.offset;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { //???pExpr1 or pExpr2
......@@ -4792,7 +4844,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
return ret;
} else {
qDebug("QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%" PRId64 ". Discard due to offset, "
"remain:%d, new offset:%" PRId64, pQInfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0,
"remain:%d, new offset:%" PRId64, pRuntimeEnv->qinfo, pFillInfo->numOfRows, ret, pQuery->limit.offset, 0,
pQuery->limit.offset - ret);
pQuery->limit.offset -= ret;
......@@ -4806,6 +4858,28 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst) {
}
}
}
#endif
int32_t doFillGapsInResults_rv(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock *pOutput) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
p[i] = pColInfoData->pData;
}
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, (int32_t)pQuery->rec.capacity);
// no data in current data after fill
int32_t numOfTotal = (int32_t)getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, (int32_t)pQuery->rec.capacity);
if (numOfTotal == 0) {
return 0;
}
return pOutput->info.rows;
}
void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
......@@ -6525,12 +6599,19 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pIntervalInfo->pRes;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
......@@ -6550,59 +6631,96 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
hashIntervalAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock);
}
pOperator->completed = true;
closeAllResultRows(&pRuntimeEnv->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRes);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
return pRes;
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pIntervalInfo->pRes;
}
static SSDataBlock* doFill(void* param) {
static SSDataBlock* doHashGroupbyAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SFillOperatorInfo *pInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv;
if (hasRemainData(&pRuntimeEnv->groupResInfo)) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pIntervalInfo->pRes;
}
SOperatorInfo* upstream = pOperator->upstream;
pRuntimeEnv->pQuery->pos = 0;
while(1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
return NULL;
break;
}
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
memcpy(pQuery->sdata[i]->data, pColInfoData->pData, pColInfoData->info.bytes*pBlock->info.rows);
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order);
hashGroupbyAgg(pRuntimeEnv, pOperator, pIntervalInfo->pCtx, pBlock);
}
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
taosFillSetDataBlockFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata);
closeAllResultRows(&pRuntimeEnv->resultRowInfo);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// here the pQuery->rec.rows == 0
if (!hasRemainData(&pRuntimeEnv->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) {
break;
}
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->completed = true;
}
return pIntervalInfo->pRes;
}
static SSDataBlock* doFill(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
return NULL;
}
SFillOperatorInfo *pInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
//SSDataBlock* doFilter(void* param) {
//
//}
if (taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) {
doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes);
return pInfo->pRes;
}
while(1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, 0, pRuntimeEnv->pQuery->window.ekey);
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
} else {
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
taosFillSetInputDataBlock(pRuntimeEnv->pFillInfo, pBlock);
}
doFillGapsInResults_rv(pRuntimeEnv, pInfo->pRes);
return pInfo->pRes;
}
return pInfo->pRes;
}
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(SQuery* pQuery) {
......@@ -6734,36 +6852,41 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
return pOperator;
}
UNUSED_FUNC SOperatorInfo* createFilterOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo));
static UNUSED_FUNC SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
SQuery* pQuery = pRuntimeEnv->pQuery;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "FilterOp";
pOperator->blockingOptr = false;
pOperator->name = "HashGroupbyOp";
pOperator->blockingOptr = true;
pOperator->completed = false;
pOperator->upstream = upstream;
pOperator->exec = NULL;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->exec = doHashGroupbyAgg;
pOperator->pExpr = pQuery->pExpr1;
pOperator->numOfOutput = pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
return pOperator;
}
static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
......@@ -6776,6 +6899,7 @@ static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTable
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->optInfo = pInfo;
pInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
return pOperator;
}
......@@ -6796,7 +6920,7 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows;
pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0;
}
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
......@@ -6857,7 +6981,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
#endif
}
static void copyAndFillResult(SQInfo* pQInfo) {
static UNUSED_FUNC void copyAndFillResult(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
......@@ -6877,7 +7001,7 @@ static void copyAndFillResult(SQInfo* pQInfo) {
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, (int32_t)pQuery->rec.rows, lastKey);
taosFillSetDataBlockFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata);
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
// pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
if (pQuery->rec.rows > 0) {
limitOperator(pQuery, pQInfo);
......@@ -6908,46 +7032,21 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows;
#if 0
// scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
// finalizeQueryResult(pRuntimeEnv);
// skip offset result rows
// pQuery->rec.rows = 0;
// not fill or no result generated during this query
if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->resultRowInfo.size == 0 || isPointInterpoQuery(pQuery)) {
// all data scanned, the group by normal column can return
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->resultRowInfo);
if (pQuery->limit.offset > numOfClosed || numOfClosed == 0) {
return;
}
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
copyToOutputBuf(pQInfo, &pRuntimeEnv->resultRowInfo);
doSecondaryArithmeticProcess(pQuery);
limitOperator(pQuery, pQInfo);
} else {
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, 0);
return copyAndFillResult(pQInfo);
}
#endif
pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows:0;
}
void tableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
#if 0
if (hasNotReturnedResults(pRuntimeEnv, &pRuntimeEnv->groupResInfo)) {
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
/*
* There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
// pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
if (pQuery->rec.rows > 0) {
limitOperator(pQuery, pQInfo);
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
......@@ -6975,6 +7074,7 @@ void tableQueryImpl(SQInfo *pQInfo) {
return;
}
#endif
// number of points returned during this query
pQuery->rec.rows = 0;
......
......@@ -23,18 +23,19 @@
#include "qFill.h"
#include "qExtbuffer.h"
#include "queryLog.h"
#include "qExecutor.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows) {
static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) {
SFillColInfo* pCol = &pFillInfo->pFillCol[j];
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
continue;
}
char* val1 = elePtrAt(data[j]->data, pCol->col.bytes, genRows);
char* val1 = elePtrAt(data[j], pCol->col.bytes, genRows);
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
......@@ -44,17 +45,17 @@ static void setTagsValue(SFillInfo* pFillInfo, tFilePage** data, int32_t genRows
}
}
static void setNullValueForRow(SFillInfo* pFillInfo, tFilePage** data, int32_t numOfCol, int32_t rowIndex) {
static void setNullValueForRow(SFillInfo* pFillInfo, void** data, int32_t numOfCol, int32_t rowIndex) {
// the first are always the timestamp column, so start from the second column.
for (int32_t i = 1; i < numOfCol; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
char* output = elePtrAt(data[i]->data, pCol->col.bytes, rowIndex);
char* output = elePtrAt(data[i], pCol->col.bytes, rowIndex);
setNull(output, pCol->col.type, pCol->col.bytes);
}
}
static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** srcData, int64_t ts, bool outOfBound) {
static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData, int64_t ts, bool outOfBound) {
char* prev = pFillInfo->prevValues;
char* next = pFillInfo->nextValues;
......@@ -63,7 +64,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
// set the primary timestamp column value
int32_t index = pFillInfo->numOfCurrent;
char* val = elePtrAt(data[0]->data, TSDB_KEYSIZE, index);
char* val = elePtrAt(data[0], TSDB_KEYSIZE, index);
*(TSKEY*) val = pFillInfo->currentKey;
// set the other values
......@@ -77,7 +78,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
continue;
}
char* output = elePtrAt(data[i]->data, pCol->col.bytes, index);
char* output = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
......@@ -93,7 +94,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
continue;
}
char* output = elePtrAt(data[i]->data, pCol->col.bytes, index);
char* output = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type);
}
} else { // no prev value yet, set the value for NULL
......@@ -111,7 +112,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
int16_t type = pCol->col.type;
int16_t bytes = pCol->col.bytes;
char *val1 = elePtrAt(data[i]->data, pCol->col.bytes, index);
char *val1 = elePtrAt(data[i], pCol->col.bytes, index);
if (type == TSDB_DATA_TYPE_BINARY|| type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BOOL) {
setNull(val1, pCol->col.type, bytes);
continue;
......@@ -132,7 +133,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
continue;
}
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, index);
char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type);
}
}
......@@ -162,7 +163,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu
}
}
static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t outputRows) {
static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputRows) {
pFillInfo->numOfCurrent = 0;
char** srcData = pFillInfo->pData;
......@@ -213,7 +214,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
continue;
}
char* output = elePtrAt(data[i]->data, pCol->col.bytes, pFillInfo->numOfCurrent);
char* output = elePtrAt(data[i], pCol->col.bytes, pFillInfo->numOfCurrent);
char* src = elePtrAt(srcData[i], pCol->col.bytes, pFillInfo->index);
if (i == 0 || (pCol->functionId != TSDB_FUNC_COUNT && !isNull(src, pCol->col.type)) ||
......@@ -255,7 +256,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t ou
return pFillInfo->numOfCurrent;
}
static int64_t appendFilledResult(SFillInfo* pFillInfo, tFilePage** output, int64_t resultCapacity) {
static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t resultCapacity) {
/*
* These data are generated according to fill strategy, since the current timestamp is out of the time window of
* real result set. Note that we need to keep the direct previous result rows, to generated the filled data.
......@@ -420,6 +421,15 @@ void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pI
}
}
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData;
// memcpy(pFillInfo->pData[i], pInput[i]->data, pFillInfo->numOfRows * pFillInfo->pFillCol[i].col.bytes);
}
}
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput) {
assert(pFillInfo->numOfRows == pInput->num);
......@@ -490,7 +500,7 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint*
return TSDB_CODE_SUCCESS;
}
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity) {
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity) {
int32_t remain = taosNumOfRemainRows(pFillInfo);
int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册