提交 7170a716 编写于 作者: H Haojun Liao

[td-2895] refactor query processing model.

上级 b0cabda1
......@@ -364,6 +364,12 @@ typedef struct STableScanInfo {
int64_t elapsedTime;
} STableScanInfo;
typedef struct STagScanInfo {
SQueryRuntimeEnv *pRuntimeEnv;
SColumnInfo* pCols;
SSDataBlock* pRes;
} STagScanInfo;
typedef struct SAggOperatorInfo {
SResultRowInfo resultRowInfo;
STableQueryInfo *pTableQueryInfo;
......
......@@ -1841,7 +1841,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
* 1. data block that are not loaded
* 2. scan data files in desc order
*/
if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) {
if (pCtx->order == TSDB_ORDER_DESC/* || pCtx->preAggVals.dataBlockLoaded == false*/) {
return;
}
......
......@@ -152,6 +152,7 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
}
tw->ekey -= 1;
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult);
static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols);
......@@ -163,7 +164,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData,
SDataStatis *pStatis, SExprInfo* pExprInfo);
static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx);
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx);
static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo);
static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
static bool hasMainOutput(SQuery *pQuery);
......@@ -191,6 +192,7 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ
static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv);
static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
......@@ -215,7 +217,8 @@ static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) {
idata.info.type = pExpr[i].type;
idata.info.bytes = pExpr[i].bytes;
idata.info.colId = pExpr[i].base.resColId;
idata.pData = calloc(4096, idata.info.bytes * 4096);
idata.pData = calloc(4096, idata.info.bytes);
taosArrayPush(res->pDataBlock, &idata);
}
......@@ -2452,6 +2455,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv));
// group by normal column, sliding window query, interval query are handled by interval query processor
// if (!pQuery->stableQuery) { // interval (down sampling operation)
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
......@@ -2487,7 +2492,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkResultBuf == 1);
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
if (!onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi);
setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot);
}
}
if (pQuery->limit.offset > 0) {
......@@ -3230,10 +3238,24 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte
}
start += len;
len = 0;
}
}
}
if (len > 0) {
int32_t cstart = numOfRows - len;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes;
memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes);
}
start += len;
len = 0;
}
pBlock->info.rows = start;
pBlock->pBlockStatis = NULL; // clean the block statistics info
......@@ -4068,6 +4090,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR
pCtx[i].resultInfo = pCellInfo;
pCtx[i].pOutput = pData->pData;
assert(pCtx[i].pOutput != NULL);
// set the timestamp output buffer for top/bottom/diff query
int32_t functionId = pCtx->functionId;
......@@ -5668,7 +5691,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
if (needReverseScan(pQuery)) {
if (onlyQueryTags(pQuery)) {
pRuntimeEnv->proot = createTagScanOperator(pRuntimeEnv);
} else if (needReverseScan(pQuery)) {
pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1);
} else {
pRuntimeEnv->pi = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery));
......@@ -6628,7 +6653,8 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
// todo check for query cancel
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1) {
if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1 ||
(pQuery->current == NULL && pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)) {
STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet(
pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.map, &pBlock->info.tid, sizeof(pBlock->info.tid));
if (pTableQueryInfo == NULL) {
......@@ -6637,6 +6663,8 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
pQuery->current = *pTableQueryInfo;
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
} else if (pTableScanInfo->pRuntimeEnv->pQuery->current == NULL) {
}
// this function never returns error?
......@@ -6765,6 +6793,11 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (strcasecmp(name, "STableIntervalAggOp") == 0) {
SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
} else if (strcasecmp(name, "ArithmeticOp") == 0) {
SArithOperatorInfo *pInfo = pDownstream->optInfo;
pTableScanInfo->pCtx = pInfo->pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
} else {
......@@ -6815,10 +6848,8 @@ static SSDataBlock* doAggregation(void* param) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, order, pQuery->vgId);
SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
setDefaultOutputBuf(pRuntimeEnv, pCtx, &pRuntimeEnv->resultRowInfo, pRes);
setDefaultOutputBuf(pRuntimeEnv, pAggInfo->pCtx, &pRuntimeEnv->resultRowInfo, pRes);
SOperatorInfo* upstream = pOperator->upstream;
pQuery->pos = 0;
......@@ -6835,8 +6866,8 @@ static SSDataBlock* doAggregation(void* param) {
}
// the pDataBlock are always the same one, no need to call this again
setInputSDataBlock(pOperator, pCtx, pBlock, order);
aggApplyFunctions(pRuntimeEnv, pOperator, pCtx, pBlock);
setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order);
aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock);
}
pOperator->completed = true;
......@@ -6846,8 +6877,8 @@ static SSDataBlock* doAggregation(void* param) {
finalizeQueryResult(pRuntimeEnv);
}
pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pCtx, pOperator->numOfOutput);
destroySQLFunctionCtx(pCtx, pRes->info.numOfCols);
pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput);
destroySQLFunctionCtx(pAggInfo->pCtx, pRes->info.numOfCols);
return pRes;
}
......@@ -7369,7 +7400,156 @@ static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, S
return pOperator;
}
static SSDataBlock* doTagScan(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
STagScanInfo *pTagScanInfo = pOperator->optInfo;
SQueryRuntimeEnv *pRuntimeEnv = pTagScanInfo->pRuntimeEnv;
SQuery* pQuery = pRuntimeEnv->pQuery;
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
assert(numOfGroup == 0 || numOfGroup == 1);
if (numOfGroup == 0) {
return NULL;
}
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
size_t num = taosArrayGetSize(pa);
assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
if (pTagScanInfo->pRes == NULL) {
pTagScanInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput);
}
int32_t count = 0;
// int32_t functionId = pOperator->pExpr[0].base.functionId;
/*if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->pExpr[0];
int32_t rsize = pExprInfo->bytes;
count = 0;
int16_t bytes = pExprInfo->bytes;
int16_t type = pExprInfo->type;
for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) {
bytes = pQuery->tagColList[i].bytes;
type = pQuery->tagColList[i].type;
break;
}
}
while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) {
int32_t i = pRuntimeEnv->tableIndex++;
STableQueryInfo *item = taosArrayGetP(pa, i);
char *output = pQuery->sdata[0]->data + count * rsize;
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output);
STableId* id = TSDB_TABLEID(item->pTable);
*(int16_t *)output = 0;
output += sizeof(int16_t);
*(int64_t *)output = id->uid; // memory align problem, todo serialize
output += sizeof(id->uid);
*(int32_t *)output = id->tid;
output += sizeof(id->tid);
*(int32_t *)output = pQuery->vgId;
output += sizeof(pQuery->vgId);
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
char* data = tsdbGetTableName(item->pTable);
memcpy(output, data, varDataTLen(data));
} else {
char* data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes);
doSetTagValueToResultBuf(output, data, type, bytes);
}
count += 1;
}
qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pRuntimeEnv->qinfo, count);
} else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
*(int64_t*) pQuery->sdata[0]->data = num;
count = 1;
SET_STABLE_QUERY_OVER(pRuntimeEnv);
qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count);
} else*/ { // return only the tags|table name etc.
count = 0;
SSchema* tbnameSchema = tGetTbnameColumnSchema();
int32_t maxNumOfTables = (int32_t)pQuery->rec.capacity;
// if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pQuery->rec.capacity) {
// maxNumOfTables = (int32_t)pQuery->limit.limit;
// }
while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) {
int32_t i = pRuntimeEnv->tableIndex++;
SExprInfo* pExprInfo = pOperator->pExpr;
STableQueryInfo* item = taosArrayGetP(pa, i);
char *data = NULL, *dst = NULL;
int16_t type = 0, bytes = 0;
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
// not assign value in case of user defined constant output column
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.colInfo.flag)) {
continue;
}
SColumnInfoData* pColInfo = taosArrayGet(pTagScanInfo->pRes->pDataBlock, j);
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
bytes = tbnameSchema->bytes;
type = tbnameSchema->type;
data = tsdbGetTableName(item->pTable);
dst = pColInfo->pData + count * tbnameSchema->bytes;
} else {
type = pExprInfo[j].type;
bytes = pExprInfo[j].bytes;
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
dst = pColInfo->pData + count * pExprInfo[j].bytes;
}
doSetTagValueToResultBuf(dst, data, type, bytes);
}
count += 1;
}
pTagScanInfo->pRes->info.rows = count;
qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count);
}
return pTagScanInfo->pRes;
}
static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv) {
STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo));
pInfo->pRuntimeEnv = pRuntimeEnv;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTagScanOp";
pOperator->blockingOptr = false;
pOperator->completed = false;
pOperator->optInfo = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput;
pOperator->exec = doTagScan;
pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1;
return pOperator;
}
/*
* in each query, this function will be called only once, no retry for further result.
......@@ -8909,6 +9089,10 @@ void buildTagQueryResult(SQInfo* pQInfo) {
return;
}
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows;
return;
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
size_t num = taosArrayGetSize(pa);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册