提交 d2e78ac7 编写于 作者: H Haojun Liao

[td-2819] refactor codes.

上级 a36099d8
......@@ -302,9 +302,8 @@ int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
uint64_t* qId, char* sql, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -57,7 +57,7 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
// todo merge with vnode side function
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchemaEx* pSchema) {
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
......@@ -69,16 +69,14 @@ void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchem
pCtx[i].order = pQueryInfo->order.order;
pCtx[i].functionId = pExpr->base.functionId;
// input buffer hold only one point data
SSchema *s = &pSchema[i].field;
// input data format comes from pModel
pCtx[i].inputType = s->type;
pCtx[i].inputBytes = s->bytes;
pCtx[i].inputType = pSchema[i].type;
pCtx[i].inputBytes = pSchema[i].bytes;
pCtx[i].outputBytes = pExpr->base.resBytes;
pCtx[i].outputType = pExpr->base.resType;
// input buffer hold only one point data
pCtx[i].size = 1;
pCtx[i].hasNull = true;
pCtx[i].currentStage = MERGE_STAGE;
......@@ -374,7 +372,13 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer->pTempBuffer->num = 0;
tscCreateResPointerInfo(pRes, pQueryInfo);
tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx, pDesc->pColumnModel->pFields);
SSchema* pschema = calloc(pDesc->pColumnModel->numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < pDesc->pColumnModel->numOfCols; ++i) {
pschema[i] = pDesc->pColumnModel->pFields[i].field;
}
tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx, pschema);
setCtxInputOutputBuffer(pQueryInfo, pReducer->pCtx, pReducer, pDesc);
// we change the capacity of schema to denote that there is only one row in temp buffer
......
......@@ -24,6 +24,7 @@
#include "tschemautil.h"
#include "tsclient.h"
#include "qUtil.h"
#include "qPlan.h"
typedef struct SInsertSupporter {
SSqlObj* pSql;
......@@ -3216,8 +3217,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData;
}
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
uint64_t* qId, char* sql, void* addr) {
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr) {
assert(pQueryInfo != NULL);
int16_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
......@@ -3230,9 +3230,13 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
pQInfo->signature = pQInfo;
SQueryAttr *pQueryAttr = &pQInfo->query;
pQInfo->runtimeEnv.pQueryAttr = pQueryAttr;
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->pQueryAttr = pQueryAttr;
tscCreateQueryFromQueryInfo(pQueryInfo, pQueryAttr, addr);
pQueryAttr->tableGroupInfo = *pTableGroupInfo;
// calculate the result row size
for (int16_t col = 0; col < numOfOutput; ++col) {
assert(pExprs[col].base.resBytes > 0);
......@@ -3244,12 +3248,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
}
}
// doUpdateExprColumnIndex(pQueryAttr);
// int32_t ret = createFilterInfo(pQInfo, pQueryAttr);
// if (ret != TSDB_CODE_SUCCESS) {
// goto _cleanup;
// }
size_t numOfGroups = 0;
if (pTableGroupInfo->pGroupList != NULL) {
numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
......@@ -3272,12 +3270,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
pthread_mutex_init(&pQInfo->lock, NULL);
tsem_init(&pQInfo->ready, 0, 0);
// changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STimeWindow window = pQueryAttr->window;
int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i);
......@@ -3290,6 +3282,7 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
taosArrayPush(pRuntimeEnv->tableqinfoGroupInfo.pGroupList, &p1);
STimeWindow window = pQueryAttr->window;
for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j);
window.skey = info->lastKey;
......@@ -3309,13 +3302,6 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
}
}
// colIdCheck(pQueryAttr, pQInfo);
pQInfo->qId = 0;
if (qId != NULL) {
*qId = pQInfo->qId;
}
// qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
// return pQInfo;
// if (pGroupbyExpr != NULL) {
......@@ -3334,9 +3320,13 @@ void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, ST
tfree(pExprs);
SArray* pa = createExecOperatorPlan(pQueryAttr);
STsBufInfo bufInfo = {0};
SQueryParam param = {0};
SQueryParam param = {.pOperator = pa};
/*int32_t code = */initQInfo(&bufInfo, NULL, pQInfo, &param, NULL, 0);
pQInfo->runtimeEnv.proot->upstream = pOperator;
qTableQuery(pQInfo);
return pQInfo;
......
......@@ -539,6 +539,62 @@ static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray*
return pColInfo;
}
typedef struct SDummyInputInfo {
SSDataBlock block;
SSqlRes *pRes; // refactor: remove it
} SDummyInputInfo;
SSDataBlock* doGetDataBlock(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo*) param;
SDummyInputInfo *pInput = pOperator->info;
char* pData = pInput->pRes->data;
SSDataBlock* pBlock = &pInput->block;
pBlock->info.rows = pInput->pRes->numOfRows;
if (pBlock->info.rows == 0) {
return NULL;
}
int32_t offset = 0;
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
pColData->pData = pData + offset * pBlock->info.rows;
offset += pColData->info.bytes;
}
pInput->pRes->numOfRows = 0;
return pBlock;
}
SOperatorInfo* createDummyInputOperator(char* pResult, SSchema* pSchema, int32_t numOfCols) {
assert(numOfCols > 0);
SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo));
pInfo->pRes = (SSqlRes*) pResult;
pInfo->block.info.numOfCols = numOfCols;
pInfo->block.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colData = {0};
colData.info.bytes = pSchema[i].bytes;
colData.info.type = pSchema[i].type;
colData.info.colId = pSchema[i].colId;
taosArrayPush(pInfo->block.pDataBlock, &colData);
}
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DummyInputOperator";
pOptr->operatorType = OP_DummyInput;
pOptr->blockingOptr = false;
pOptr->info = pInfo;
pOptr->exec = doGetDataBlock;
return pOptr;
}
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) {
// handle the following query process
......@@ -553,22 +609,29 @@ void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
int32_t numOfCols = taosArrayGetSize(px->colList);
SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,};
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
tsCreateSQLFunctionCtx(px, pCtx, NULL);
STableGroupInfo tableGroupInfo = {0};
tableGroupInfo.numOfTables = 1;
tableGroupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES);
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta);
tsCreateSQLFunctionCtx(px, pCtx, pSchema);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
STableGroupInfo tableGroupInfo = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),};
tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN};
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(group, &tableKeyInfo);
taosArrayPush(tableGroupInfo.pGroupList, &group);
SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, 0, NULL, NULL);
printf("%p\n", pQInfo);
SOperatorInfo* pSourceOptr = createDummyInputOperator((char*)pRes, pSchema, numOfOutput);
SQInfo* pQInfo = createQueryInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOptr, NULL, NULL);
//printf("%p\n", pQInfo);
SSDataBlock* pres = pQInfo->runtimeEnv.outputBuf;
// build result
pRes->numOfRows = pres->info.rows;
}
}
......@@ -3121,6 +3184,10 @@ static int32_t createSecondaryExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf
}
static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo) {
if (pTableMetaInfo->tagColList == NULL) {
return TSDB_CODE_SUCCESS;
}
pQueryAttr->numOfTags = (int32_t)taosArrayGetSize(pTableMetaInfo->tagColList);
if (pQueryAttr->numOfTags == 0) {
return TSDB_CODE_SUCCESS;
......
......@@ -289,6 +289,7 @@ enum OPERATOR_TYPE_E {
OP_Fill = 13,
OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
OP_DummyInput = 16, //TODO remove it after fully refactor.
};
typedef struct SOperatorInfo {
......
......@@ -35,13 +35,6 @@
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define CHECK_IF_QUERY_KILLED(_q) \
do { \
if (isQueryKilled((_q)->qinfo)) { \
longjmp((_q)->env, TSDB_CODE_TSC_QUERY_CANCELLED); \
} \
} while (0)
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
#define TIME_WINDOW_COPY(_dst, _src) do {\
......@@ -98,7 +91,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
uint64_t queryHandleId = 0;
int32_t getMaximumIdleDurationSec() {
......@@ -143,8 +135,8 @@ static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) {
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
static void setResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SQLFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset);
void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
......@@ -163,7 +155,6 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream);
static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr);
//static bool isFixedOutputQuery(SQueryAttr* pQueryAttr);
static SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
static SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
......@@ -1732,7 +1723,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
SOperatorInfo* prev = pRuntimeEnv->pTableScanner;
if (i == 0) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pRuntimeEnv->pTableScanner != NULL) { // TODO refactor
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
}
} else {
prev = pRuntimeEnv->proot;
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
......@@ -3949,7 +3942,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pQueryAttr->tsdb = tsdb;
pQueryAttr->tsdb = tsdb;
pRuntimeEnv->prevResult = prevResult;
pRuntimeEnv->qinfo = pQInfo;
......@@ -3971,20 +3964,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->cur.vgroupIndex = -1;
setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo);
/*
if (onlyQueryTags(pQueryAttr)) {
// pRuntimeEnv->resultInfo.capacity = 4096;
// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
} else if (pQueryAttr->queryBlockDist) {
pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
pRuntimeEnv->pTableScanner = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
} else if (needReverseScan(pQueryAttr)) {
pRuntimeEnv->pTableScanner = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), 1);
} else {
pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
}
*/
switch(tbScanner) {
case OP_TableBlockInfoScan: {
pRuntimeEnv->pTableScanner = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv);
......@@ -4002,8 +3981,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pRuntimeEnv->pTableScanner = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr));
break;
}
default: {
// do nothing
default: { // do nothing
break;
}
}
......@@ -4531,14 +4509,19 @@ static SSDataBlock* doArithmeticOperation(void* param) {
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// todo dynamic set tags
setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) {
setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(pArithInfo, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
if (pTableQueryInfo != NULL) { // TODO refactor
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
}
pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册