提交 9b224d9b 编写于 作者: H Haojun Liao

[td-225] refactor

上级 40bdfdb2
......@@ -237,6 +237,17 @@ typedef struct SQuery {
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
} SQuery;
typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef struct SOperatorInfo {
char *name;
bool blockingOptr;
void *optInfo;
__operator_fn_t exec;
struct SOperatorInfo *upstream;
} SOperatorInfo;
typedef struct SQueryRuntimeEnv {
jmp_buf env;
SQuery* pQuery;
......@@ -329,8 +340,6 @@ typedef struct SQueryParam {
SSqlGroupbyExpr *pGroupbyExpr;
} SQueryParam;
typedef SSDataBlock* (*__operator_fn_t)(void* param);
typedef struct STableScanInfo {
SQueryRuntimeEnv *pRuntimeEnv;
void *pQueryHandle;
......@@ -350,23 +359,19 @@ typedef struct STableScanInfo {
int64_t elapsedTime;
} STableScanInfo;
typedef struct SOperatorInfo {
char *name;
bool blockingOptr;
void *optInfo;
__operator_fn_t exec;
} SOperatorInfo;
SOperatorInfo optrList[5];
typedef struct SAggOperatorInfo {
SResultRowInfo *pResultRowInfo;
STableQueryInfo *pTableQueryInfo;
SOperatorInfo *prevOptr;
SQueryRuntimeEnv *pRuntimeEnv;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo {
STableQueryInfo *pTableQueryInfo;
SQueryRuntimeEnv *pRuntimeEnv;
} SArithOperatorInfo;
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
......
......@@ -173,8 +173,8 @@ static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArr
static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win);
static STableIdInfo createTableIdInfo(SQuery* pQuery);
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static STableScanInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
static int32_t getNumOfScanTimes(SQuery* pQuery);
static SSDataBlock* createOutputBuf(SQuery* pQuery) {
......@@ -1220,6 +1220,63 @@ static void aggApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSData
}
}
static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pSDataBlock) {
SArithmeticSupport arithSup = {0};
SQuery *pQuery = pRuntimeEnv->pQuery;
// create the output result buffer
tFilePage **data = calloc(pQuery->numOfExpr2, POINTER_BYTES);
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
int32_t bytes = pQuery->pExpr2[i].bytes;
data[i] = (tFilePage *)malloc((size_t)(bytes * pQuery->rec.rows) + sizeof(tFilePage));
}
arithSup.numOfCols = (int32_t)pSDataBlock->info.numOfCols;
arithSup.exprList = pQuery->pExpr1;
arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES);
// set the input column data
for (int32_t f = 0; f < arithSup.numOfCols; ++f) {
SColumnInfoData *pColumnInfoData = taosArrayGet(pSDataBlock->pDataBlock, f);
arithSup.data[f] = pColumnInfoData->pData;
}
// output result number of columns
for (int32_t k = 0; k < pRuntimeEnv->outputBuf->info.numOfCols; ++k) {
for (int i = 0; i < pQuery->numOfExpr2; ++i) {
SExprInfo *pExpr = &pQuery->pExpr2[i];
// calculate the result from several other columns
SSqlFuncMsg *pSqlFunc = &pExpr->base;
if (pSqlFunc->functionId != TSDB_FUNC_ARITHM) {
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
if (pSqlFunc->functionId == pQuery->pExpr1[j].base.functionId &&
pSqlFunc->colInfo.colId == pQuery->pExpr1[j].base.colInfo.colId) {
memcpy(data[i]->data, pQuery->sdata[j]->data, (size_t)(pQuery->pExpr1[j].bytes * pSDataBlock->info.rows));
break;
}
}
} else {
arithSup.pArithExpr = pExpr;
arithmeticTreeTraverse(arithSup.pArithExpr->pExpr, (int32_t)pQuery->rec.rows, data[i]->data, &arithSup,
TSDB_ORDER_ASC, getArithemicInputSrc);
}
}
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
memcpy(pQuery->sdata[i]->data, data[i]->data, (size_t)(pQuery->pExpr2[i].bytes * pQuery->rec.rows));
}
for (int32_t i = 0; i < pQuery->numOfExpr2; ++i) {
tfree(data[i]);
}
tfree(data);
tfree(arithSup.data);
}
}
/**
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
* @param pRuntimeEnv
......@@ -5890,17 +5947,16 @@ static UNUSED_FUNC SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle
pOptr->name = "SeqScanTableOp";
pOptr->blockingOptr = false;
pOptr->optInfo = pInfo;
pOptr->exec = doScanTable;
pOptr->exec = doTableScan;
return pOptr;
}
static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->exec = doTableScan;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
......@@ -5908,7 +5964,14 @@ static STableScanInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQ
pInfo->order = pRuntimeEnv->pQuery->order.order;
pInfo->pRuntimeEnv = pRuntimeEnv;
return pInfo;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "BidirectionSeqScanTableOp";
pOptr->blockingOptr = false;
pOptr->optInfo = pInfo;
pOptr->exec = doTableScan;
return pOptr;
}
static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) {
......@@ -5920,34 +5983,53 @@ static UNUSED_FUNC int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
}
// this is a blocking operator
static SSDataBlock* doAggOperator(void* param) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
SQueryRuntimeEnv* pRuntimeEnv = pInfo->pRuntimeEnv;
STableScanInfo* pTableScanInfo = pInfo->pTableScanInfo;
static SSDataBlock* doAggregation(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
int32_t countId = 0;
int32_t order = getTableScanOrder(pInfo->pTableScanInfo);
SAggOperatorInfo* pAggInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pAggInfo->pRuntimeEnv;
resetDefaultResInfoOutputBuf_rv(pInfo->pRuntimeEnv);
SOperatorInfo* upstream = pOperator->upstream;
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv);
pRuntimeEnv->pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = pTableScanInfo->exec(pTableScanInfo);
SSDataBlock* pBlock = upstream->exec(upstream->optInfo);
if (pBlock == NULL) {
break;
}
if (countId != getTableScanId(pTableScanInfo) && order == getTableScanOrder(pTableScanInfo)) {
prepareRepeatTableScan(pRuntimeEnv);
countId = getTableScanId(pTableScanInfo);
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pRuntimeEnv, pBlock);
aggApplyFunctions(pRuntimeEnv, pBlock);
}
setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED);
finalizeQueryResult(pRuntimeEnv);
pRuntimeEnv->outputBuf->info.rows = getNumOfResult(pRuntimeEnv);
return pRuntimeEnv->outputBuf;
}
static SSDataBlock* doArithmeticOperation(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SArithOperatorInfo* pArithInfo = pOperator->optInfo;
SQueryRuntimeEnv* pRuntimeEnv = pArithInfo->pRuntimeEnv;
// the order has changed
if (order != getTableScanOrder(pTableScanInfo)) {
order = getTableScanOrder(pTableScanInfo);
SOperatorInfo* upstream = pOperator->upstream;
resetDefaultResInfoOutputBuf_rv(pRuntimeEnv);
pRuntimeEnv->pQuery->pos = 0;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream->optInfo);
if (pBlock == NULL) {
break;
}
// the pDataBlock are alway the same one, no need to call this again
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pRuntimeEnv, pBlock);
aggApplyFunctions(pRuntimeEnv, pBlock);
}
......@@ -5971,29 +6053,43 @@ static int32_t getNumOfScanTimes(SQuery* pQuery) {
return 1;
}
//void createBasicOperatorInfo() {
// optrList[0].name = "SeqScanTableOp";
// optrList[0].blockingOptr = false;
// optrList[0].exec = doTableScan;
//
// optrList[0].name = "SeqScanTableOp";
// optrList[0].blockingOptr = false;
// optrList[0].exec = doTableScan;
//}
static UNUSED_FUNC SAggOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo) {
static UNUSED_FUNC SOperatorInfo* createAggOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
pInfo->pResultRowInfo = pResultRowInfo;
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pTableScanInfo = pTableScanInfo;
pInfo->pRuntimeEnv = pRuntimeEnv;
pInfo->apply = doAggOperator;
return pInfo;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "AggregationOp";
pOperator->blockingOptr = true;
pOperator->optInfo = pInfo;
pOperator->upstream = inputOptr;
pOperator->exec = doAggregation;
return pOperator;
}
static UNUSED_FUNC SOperatorInfo* createArithOperatorInfo(SResultRowInfo* pResultRowInfo, STableQueryInfo* pTableQueryInfo,
SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* inputOptr) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
pInfo->pTableQueryInfo = pTableQueryInfo;
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOp";
pOperator->blockingOptr = false;
pOperator->optInfo = pInfo;
pOperator->upstream = inputOptr;
pOperator->exec = doArithmeticOperation;
return pOperator;
}
static
/*
* in each query, this function will be called only once, no retry for further result.
*
......@@ -6008,10 +6104,9 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
return;
}
SAggOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
SSDataBlock* pResBlock = pAggInfo->apply(pAggInfo);
SOperatorInfo* pAggInfo = createAggOperatorInfo(&pRuntimeEnv->resultRowInfo, pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
SSDataBlock* pResBlock = pAggInfo->exec(pAggInfo->optInfo);
// since the numOfRows must be identical for all functions that are allowed to be executed simutaneously.
pQuery->rec.rows = pResBlock->info.rows;
// doSecondaryArithmeticProcess(pQuery);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册