提交 51db5f57 编写于 作者: H Haojun Liao

[td-225] refactor

上级 966ca160
...@@ -856,33 +856,28 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -856,33 +856,28 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
for (int32_t i = 0; i < output; ++i) { for (int32_t i = 0; i < output; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SSqlExpr *pExpr = pField->pSqlExpr; SSqlExpr *pExpr = pField->pSqlExpr;
// this should be switched to projection query
if (pExpr != NULL) { if (pExpr != NULL) {
if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) {
tscError("%p table schema is not matched with parsed sql", pSql); tscError("%p table schema is not matched with parsed sql", pSql);
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
pSqlFuncExpr1->colInfo.colId = htons(pExpr->colInfo.colId); pSqlFuncExpr1->numOfParams = 0; // no params for projection query
pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlFuncExpr1->functionId = htons(TSDB_FUNC_PRJ);
pSqlFuncExpr1->colInfo.flag = htons(pExpr->colInfo.flag); pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId);
pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL);
pSqlFuncExpr1->functionId = htons(pExpr->functionId);
pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) {
// todo add log SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f);
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); if (pe == pExpr) {
pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen); pSqlFuncExpr1->colInfo.colIndex = htons(f);
break;
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
} }
} }
pMsg += sizeof(SSqlFuncMsg);
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
} else { } else {
assert(pField->pArithExprInfo != NULL); assert(pField->pArithExprInfo != NULL);
......
...@@ -283,12 +283,12 @@ typedef struct SQueryRuntimeEnv { ...@@ -283,12 +283,12 @@ typedef struct SQueryRuntimeEnv {
SArithmeticSupport *sasArray; SArithmeticSupport *sasArray;
SOperatorInfo* pi; SOperatorInfo* pi;
SSDataBlock *outputBuf; SSDataBlock *outputBuf;
int32_t groupIndex; int32_t groupIndex;
int32_t tableIndex; int32_t tableIndex;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SOperatorInfo* proot; SOperatorInfo *proot;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
...@@ -362,8 +362,6 @@ typedef struct STableScanInfo { ...@@ -362,8 +362,6 @@ typedef struct STableScanInfo {
int64_t elapsedTime; int64_t elapsedTime;
} STableScanInfo; } STableScanInfo;
SOperatorInfo optrList[5];
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
SResultRowInfo *pResultRowInfo; SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo; STableQueryInfo *pTableQueryInfo;
...@@ -396,10 +394,19 @@ typedef struct SHashIntervalOperatorInfo { ...@@ -396,10 +394,19 @@ typedef struct SHashIntervalOperatorInfo {
SQLFunctionCtx *pCtx; SQLFunctionCtx *pCtx;
} SHashIntervalOperatorInfo; } SHashIntervalOperatorInfo;
typedef struct SFillOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
} SFillOperatorInfo;
void freeParam(SQueryParam *param); void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
SColumnInfo* pTagCols); SColumnInfo* pTagCols);
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr);
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql); SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql);
......
...@@ -1925,6 +1925,7 @@ static void last_function(SQLFunctionCtx *pCtx) { ...@@ -1925,6 +1925,7 @@ static void last_function(SQLFunctionCtx *pCtx) {
continue; continue;
} }
} }
memcpy(pCtx->pOutput, data, pCtx->inputBytes); memcpy(pCtx->pOutput, data, pCtx->inputBytes);
TSKEY ts = GET_TS_DATA(pCtx, i); TSKEY ts = GET_TS_DATA(pCtx, i);
......
...@@ -1220,10 +1220,11 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc ...@@ -1220,10 +1220,11 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
} }
static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock, int32_t order) {
if (pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) { if (pCtx[0].pInput == NULL && pSDataBlock->pDataBlock != NULL) {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pSDataBlock->info.rows; pCtx[i].size = pSDataBlock->info.rows;
pCtx[i].order = order;
SColIndex *pCol = &pOperator->pExpr[i].base.colInfo; SColIndex *pCol = &pOperator->pExpr[i].base.colInfo;
if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) {
...@@ -1234,16 +1235,17 @@ static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, S ...@@ -1234,16 +1235,17 @@ static void setInputSDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, S
SQLFunctionCtx* pCtx1 = &pCtx[i]; SQLFunctionCtx* pCtx1 = &pCtx[i];
pCtx1->pInput = p->pData; pCtx1->pInput = p->pData;
uint32_t status = aAggs[pCtx->functionId].status; uint32_t status = aAggs[pCtx1->functionId].status;
if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) { if ((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) {
SColumnInfoData *tsInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData *tsInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
pCtx->ptsList = tsInfo->pData; pCtx1->ptsList = tsInfo->pData;
} }
} }
} }
} else { } else {
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
pCtx[i].size = pSDataBlock->info.rows; pCtx[i].size = pSDataBlock->info.rows;
pCtx[i].order = order;
} }
} }
} }
...@@ -6307,7 +6309,7 @@ static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) { ...@@ -6307,7 +6309,7 @@ static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->current; return pTableScanInfo->current;
} }
static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order; return pTableScanInfo->order;
} }
...@@ -6324,9 +6326,12 @@ static SSDataBlock* doAggregation(void* param) { ...@@ -6324,9 +6326,12 @@ static SSDataBlock* doAggregation(void* param) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SOperatorInfo* upstream = pOperator->upstream;
setDefaultOutputBuf(pRuntimeEnv, pCtx, pRuntimeEnv->outputBuf); SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
setDefaultOutputBuf(pRuntimeEnv, pCtx, pRes);
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0; pQuery->pos = 0;
while(1) { while(1) {
...@@ -6335,8 +6340,13 @@ static SSDataBlock* doAggregation(void* param) { ...@@ -6335,8 +6340,13 @@ static SSDataBlock* doAggregation(void* param) {
break; break;
} }
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
STableScanInfo* pScanInfo = upstream->optInfo;
order = getTableScanOrder(pScanInfo);
}
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pCtx, pBlock); setInputSDataBlock(pOperator, pCtx, pBlock, order);
aggApplyFunctions(pRuntimeEnv, pOperator, pCtx, pBlock); aggApplyFunctions(pRuntimeEnv, pOperator, pCtx, pBlock);
} }
...@@ -6344,10 +6354,10 @@ static SSDataBlock* doAggregation(void* param) { ...@@ -6344,10 +6354,10 @@ static SSDataBlock* doAggregation(void* param) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv); finalizeQueryResult(pRuntimeEnv);
pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); pRes->info.rows = getNumOfResult(pRuntimeEnv);
destroySQLFunctionCtx(pCtx, pRes->info.numOfCols);
destroySQLFunctionCtx(pCtx, pRuntimeEnv->outputBuf->info.numOfCols); return pRes;
return pRuntimeEnv->outputBuf;
} }
static SSDataBlock* doArithmeticOperation(void* param) { static SSDataBlock* doArithmeticOperation(void* param) {
...@@ -6357,19 +6367,17 @@ static SSDataBlock* doArithmeticOperation(void* param) { ...@@ -6357,19 +6367,17 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
if (pArithInfo->pCtx == NULL) { if (pArithInfo->pCtx == NULL) {
pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); pArithInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
} }
setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, pRes); setDefaultOutputBuf(pRuntimeEnv, pArithInfo->pCtx, pRes);
SOperatorInfo* upstream = pOperator->upstream;
pRuntimeEnv->pQuery->pos = 0; pRuntimeEnv->pQuery->pos = 0;
while(1) { while(1) {
SSDataBlock* pBlock = upstream->exec(upstream); SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
break; break;
...@@ -6387,6 +6395,8 @@ static SSDataBlock* doArithmeticOperation(void* param) { ...@@ -6387,6 +6395,8 @@ static SSDataBlock* doArithmeticOperation(void* param) {
SColumnInfoData *p = taosArrayGet(pBlock->pDataBlock, j); SColumnInfoData *p = taosArrayGet(pBlock->pDataBlock, j);
if (p->info.colId == pCol->colId) { if (p->info.colId == pCol->colId) {
pArithInfo->pCtx[i].pInput = p->pData; pArithInfo->pCtx[i].pInput = p->pData;
pArithInfo->pCtx[i].inputType = p->info.type;
pArithInfo->pCtx[i].inputBytes = p->info.bytes;
break; break;
} }
} }
...@@ -6401,7 +6411,6 @@ static SSDataBlock* doArithmeticOperation(void* param) { ...@@ -6401,7 +6411,6 @@ static SSDataBlock* doArithmeticOperation(void* param) {
} }
} }
pRuntimeEnv->outputBuf = pRes;
return pRes; return pRes;
} }
...@@ -6413,8 +6422,7 @@ static SSDataBlock* doLimit(void* param) { ...@@ -6413,8 +6422,7 @@ static SSDataBlock* doLimit(void* param) {
SLimitOperatorInfo* pInfo = pOperator->optInfo; SLimitOperatorInfo* pInfo = pOperator->optInfo;
SOperatorInfo* upstream = pOperator->upstream; SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream);
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = true; pOperator->completed = true;
...@@ -6435,10 +6443,9 @@ static SSDataBlock* doOffset(void* param) { ...@@ -6435,10 +6443,9 @@ static SSDataBlock* doOffset(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo *)param; SOperatorInfo *pOperator = (SOperatorInfo *)param;
SOffsetOperatorInfo *pInfo = pOperator->optInfo; SOffsetOperatorInfo *pInfo = pOperator->optInfo;
SOperatorInfo* upstream = pOperator->upstream;
while (1) { while (1) {
SSDataBlock *pBlock = upstream->exec(upstream); SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED); setQueryStatus(pInfo->pRuntimeEnv->pQuery, QUERY_COMPLETED);
return NULL; return NULL;
...@@ -6471,14 +6478,17 @@ static SSDataBlock* doHashIntervalAgg(void* param) { ...@@ -6471,14 +6478,17 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
return NULL; return NULL;
} }
SAggOperatorInfo* pAggInfo = pOperator->optInfo; SHashIntervalOperatorInfo* pIntervalInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv;
SQueryRuntimeEnv* pRuntimeEnv = pIntervalInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId); SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, pQuery->order.order, pQuery->vgId);
SOperatorInfo* upstream = pOperator->upstream; SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
int32_t order = pQuery->order.order;
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0; pQuery->pos = 0;
while(1) { while(1) {
...@@ -6487,8 +6497,13 @@ static SSDataBlock* doHashIntervalAgg(void* param) { ...@@ -6487,8 +6497,13 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
break; break;
} }
if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) {
STableScanInfo* pScanInfo = upstream->optInfo;
order = getTableScanOrder(pScanInfo);
}
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pCtx, pBlock); setInputSDataBlock(pOperator, pCtx, pBlock, order);
hashIntervalAgg(pRuntimeEnv, pOperator, pCtx, pBlock); hashIntervalAgg(pRuntimeEnv, pOperator, pCtx, pBlock);
} }
...@@ -6501,10 +6516,47 @@ static SSDataBlock* doHashIntervalAgg(void* param) { ...@@ -6501,10 +6516,47 @@ static SSDataBlock* doHashIntervalAgg(void* param) {
destroySQLFunctionCtx(pCtx, pOperator->numOfOutput); destroySQLFunctionCtx(pCtx, pOperator->numOfOutput);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pRuntimeEnv->resultRowInfo, pQuery->limit.offset);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRuntimeEnv->outputBuf); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pRes);
// pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv); return pRes;
return pRuntimeEnv->outputBuf; }
static SSDataBlock* doFill(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->completed) {
return NULL;
}
SFillOperatorInfo *pInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
while(1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
return NULL;
}
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
memcpy(pQuery->sdata[i]->data, pColInfoData->pData, pColInfoData->info.bytes*pBlock->info.rows);
}
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
taosFillSetDataBlockFromFilePage(pRuntimeEnv->pFillInfo, (const tFilePage **)pQuery->sdata);
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata);
// here the pQuery->rec.rows == 0
if (!hasRemainData(&pRuntimeEnv->groupResInfo) && !taosFillHasMoreResults(pRuntimeEnv->pFillInfo)) {
break;
}
return NULL;
}
return NULL;
} }
// todo set the attribute of query scan count // todo set the attribute of query scan count
...@@ -6630,6 +6682,28 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ ...@@ -6630,6 +6682,28 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ
return pOperator; return pOperator;
} }
static UNUSED_FUNC SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->pTableQueryInfo = pTableQueryInfo;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "FillOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->upstream = inputOptr;
pOperator->exec = doFill;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->optInfo = pInfo;
return pOperator;
}
/* /*
* in each query, this function will be called only once, no retry for further result. * in each query, this function will be called only once, no retry for further result.
* *
...@@ -6644,8 +6718,8 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -6644,8 +6718,8 @@ void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
return; return;
} }
SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pResBlock->info.rows; pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows;
} }
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
...@@ -6664,8 +6738,8 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -6664,8 +6738,8 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
// return; // return;
// } // }
SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = (pResBlock != NULL)? pResBlock->info.rows : 0; pQuery->rec.rows = (pRuntimeEnv->outputBuf != NULL)? pRuntimeEnv->outputBuf->info.rows : 0;
#if 0 #if 0
while (1) { while (1) {
...@@ -6756,8 +6830,8 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { ...@@ -6756,8 +6830,8 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
} }
} }
SSDataBlock* pResBlock = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pResBlock->info.rows; pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows;
#if 0 #if 0
// scanOneTableDataBlocks(pRuntimeEnv, newStartKey); // scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
// finalizeQueryResult(pRuntimeEnv); // finalizeQueryResult(pRuntimeEnv);
...@@ -7469,6 +7543,114 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu ...@@ -7469,6 +7543,114 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr) {
*pExprInfo = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SExprInfo *pExprs = (SExprInfo *)calloc(numOfOutput, sizeof(SExprInfo));
if (pExprs == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType);
for (int32_t i = 0; i < numOfOutput; ++i) {
pExprs[i].base = *pExprMsg[i];
pExprs[i].bytes = 0;
int16_t type = 0;
int16_t bytes = 0;
// parse the arithmetic expression
if (pExprs[i].base.functionId == TSDB_FUNC_ARITHM) {
code = buildArithmeticExprFromMsg(&pExprs[i], pQueryMsg);
if (code != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return code;
}
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypes[type].bytes;
// } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column
// SSchema* s = tGetTbnameColumnSchema();
// type = s->type;
// bytes = s->bytes;
// } else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
// SSchema s = tGetBlockDistColumnSchema();
// type = s.type;
// bytes = s.bytes;
// } else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) {
// // it is a user-defined constant value column
// assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
//
// type = pExprs[i].base.arg[1].argType;
// bytes = pExprs[i].base.arg[1].argBytes;
//
// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
// bytes += VARSTR_HEADER_SIZE;
// }
} else {
int32_t index = pExprs[i].base.colInfo.colIndex;
assert(prevExpr[index].base.resColId == pExprs[i].base.colInfo.colId);
// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
// if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
// if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
// return TSDB_CODE_QRY_INVALID_MSG;
// }
// } else {
// if (j < PRIMARYKEY_TIMESTAMP_COL_INDEX || j >= pQueryMsg->numOfCols) {
// return TSDB_CODE_QRY_INVALID_MSG;
// }
// }
//
// if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) {
// SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
// type = pCol->type;
// bytes = pCol->bytes;
// } else {
// SSchema* s = tGetTbnameColumnSchema();
type = prevExpr[index].type;
bytes = prevExpr[index].bytes;
// }
}
int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64;
if (getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].type, &pExprs[i].bytes,
&pExprs[i].interBytes, 0, isSuperTable) != TSDB_CODE_SUCCESS) {
tfree(pExprs);
return TSDB_CODE_QRY_INVALID_MSG;
}
assert(isValidDataType(pExprs[i].type));
}
// // TODO refactor
// for (int32_t i = 0; i < numOfOutput; ++i) {
// pExprs[i].base = *pExprMsg[i];
// int16_t functId = pExprs[i].base.functionId;
//
// if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM) {
// int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
// if (j < 0 || j >= pQueryMsg->numOfCols) {
// return TSDB_CODE_QRY_INVALID_MSG;
// } else {
// SColumnInfo *pCol = &pQueryMsg->colList[j];
// int32_t ret =
// getResultDataInfo(pCol->type, pCol->bytes, functId, (int32_t)pExprs[i].base.arg[0].argValue.i64,
// &pExprs[i].type, &pExprs[i].bytes, &pExprs[i].interBytes, tagLen, isSuperTable);
// assert(ret == TSDB_CODE_SUCCESS);
// }
// }
// }
*pExprInfo = pExprs;
return TSDB_CODE_SUCCESS;
}
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) { SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) { if (pQueryMsg->numOfGroupCols == 0) {
return NULL; return NULL;
...@@ -7667,8 +7849,6 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr ...@@ -7667,8 +7849,6 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
} }
doUpdateExprColumnIndex(pQuery); doUpdateExprColumnIndex(pQuery);
pQInfo->runtimeEnv.outputBuf = createOutputBuf(pQuery->pExpr1, pQuery->numOfOutput);
int32_t ret = createFilterInfo(pQInfo, pQuery); int32_t ret = createFilterInfo(pQInfo, pQuery);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
goto _cleanup; goto _cleanup;
......
...@@ -96,7 +96,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -96,7 +96,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
} }
if (param.pSecExprMsg != NULL) { if (param.pSecExprMsg != NULL) {
if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) { if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExprMsg, param.pExprs)) != TSDB_CODE_SUCCESS) {
goto _over; goto _over;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册