提交 ea0b5b9b 编写于 作者: H Haojun Liao

[td-4312]

上级 5262f927
...@@ -646,9 +646,10 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) { ...@@ -646,9 +646,10 @@ static SColumnInfo* extractColumnInfoFromResult(SArray* pTableCols) {
} }
typedef struct SDummyInputInfo { typedef struct SDummyInputInfo {
SSDataBlock *block; SSDataBlock *block;
SSqlObj *pSql; // refactor: remove it STableQueryInfo *pTableQueryInfo;
int32_t numOfFilterCols; SSqlObj *pSql; // refactor: remove it
int32_t numOfFilterCols;
SSingleColumnFilterInfo *pFilterInfo; SSingleColumnFilterInfo *pFilterInfo;
} SDummyInputInfo; } SDummyInputInfo;
...@@ -665,7 +666,7 @@ typedef struct SJoinOperatorInfo { ...@@ -665,7 +666,7 @@ typedef struct SJoinOperatorInfo {
SRspResultInfo resultInfo; // todo refactor, add this info for each operator SRspResultInfo resultInfo; // todo refactor, add this info for each operator
} SJoinOperatorInfo; } SJoinOperatorInfo;
static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock) { static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
int32_t offset = 0; int32_t offset = 0;
char* pData = pRes->data; char* pData = pRes->data;
...@@ -680,6 +681,18 @@ static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock) { ...@@ -680,6 +681,18 @@ static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock) {
offset += pColData->info.bytes; offset += pColData->info.bytes;
} }
// filter data if needed
if (numOfFilterCols > 0) {
doSetFilterColumnInfo(pFilterInfo, numOfFilterCols, pBlock);
int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t));
bool all = doFilterDataBlock(pFilterInfo, numOfFilterCols, pBlock->info.rows, p);
if (!all) {
doCompactSDataBlock(pBlock, pBlock->info.rows, p);
}
tfree(p);
}
// todo refactor: extract method // todo refactor: extract method
// set the timestamp range of current result data block // set the timestamp range of current result data block
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, 0);
...@@ -703,22 +716,11 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { ...@@ -703,22 +716,11 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SSDataBlock* pBlock = pInput->block; SSDataBlock* pBlock = pInput->block;
pOperator->pRuntimeEnv->current = pInput->pTableQueryInfo;
pBlock->info.rows = pRes->numOfRows; pBlock->info.rows = pRes->numOfRows;
if (pRes->numOfRows != 0) { if (pRes->numOfRows != 0) {
doSetupSDataBlock(pRes, pBlock); doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo, pInput->numOfFilterCols);
if (pInput->numOfFilterCols > 0) {
doSetFilterColumnInfo(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock);
int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t));
bool all = doFilterDataBlock(pInput->pFilterInfo, pInput->numOfFilterCols, pBlock->info.rows, p);
if (!all) {
doCompactSDataBlock(pBlock, pBlock->info.rows, p);
}
tfree(p);
}
*newgroup = false; *newgroup = false;
return pBlock; return pBlock;
} }
...@@ -733,7 +735,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { ...@@ -733,7 +735,7 @@ SSDataBlock* doGetDataBlock(void* param, bool* newgroup) {
} }
pBlock->info.rows = pRes->numOfRows; pBlock->info.rows = pRes->numOfRows;
doSetupSDataBlock(pRes, pBlock); doSetupSDataBlock(pRes, pBlock, pInput->pFilterInfo, pInput->numOfFilterCols);
*newgroup = false; *newgroup = false;
return pBlock; return pBlock;
} }
...@@ -890,11 +892,14 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) { ...@@ -890,11 +892,14 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
// todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later // todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later
SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) { SOperatorInfo* createDummyInputOperator(SSqlObj* pSql, SSchema* pSchema, int32_t numOfCols, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
assert(numOfCols > 0); assert(numOfCols > 0);
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo)); SDummyInputInfo* pInfo = calloc(1, sizeof(SDummyInputInfo));
pInfo->pSql = pSql; pInfo->pSql = pSql;
pInfo->pFilterInfo = pFilterInfo; pInfo->pFilterInfo = pFilterInfo;
pInfo->numOfFilterCols = numOfFilterCols; pInfo->numOfFilterCols = numOfFilterCols;
pInfo->pTableQueryInfo = createTmpTableQueryInfo(win);
pInfo->block = calloc(numOfCols, sizeof(SSDataBlock)); pInfo->block = calloc(numOfCols, sizeof(SSDataBlock));
pInfo->block->info.numOfCols = numOfCols; pInfo->block->info.numOfCols = numOfCols;
...@@ -980,12 +985,32 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -980,12 +985,32 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
pRes->completed = (pRes->numOfRows == 0); pRes->completed = (pRes->numOfRows == 0);
} }
static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t* numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo) {
SColumnInfo* tableCols = calloc(numOfCol1, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCol1; ++i) {
SColumn* pCol = taosArrayGetP(px->colList, i);
if (pCol->info.flist.numOfFilters > 0) {
(*numOfFilterCols) += 1;
}
tableCols[i] = pCol->info;
}
if ((*numOfFilterCols) > 0) {
doCreateFilterInfo(tableCols, numOfCol1, (*numOfFilterCols), pFilterInfo, 0);
}
tfree(tableCols);
}
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) { void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) {
// handle the following query process // handle the following query process
if (px->pQInfo == NULL) { if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList); SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta); STableMeta* pTableMeta = tscGetMetaInfo(px, 0)->pTableMeta;
SSchema* pSchema = tscGetTableSchema(pTableMeta);
STableGroupInfo tableGroupInfo = { STableGroupInfo tableGroupInfo = {
.numOfTables = 1, .numOfTables = 1,
.pGroupList = taosArrayInit(1, POINTER_BYTES), .pGroupList = taosArrayInit(1, POINTER_BYTES),
...@@ -1001,23 +1026,11 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1001,23 +1026,11 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
taosArrayPush(tableGroupInfo.pGroupList, &group); taosArrayPush(tableGroupInfo.pGroupList, &group);
// if it is a join query, create join operator here // if it is a join query, create join operator here
int32_t numOfCol1 = px->pTableMetaInfo[0]->pTableMeta->tableInfo.numOfColumns; int32_t numOfCol1 = pTableMeta->tableInfo.numOfColumns;
int32_t numOfFilterCols = 0; int32_t numOfFilterCols = 0;
SColumnInfo* tableCols = calloc(numOfCol1, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCol1; ++i) {
SColumn* pCol = taosArrayGetP(px->colList, i);
if (pCol->info.flist.numOfFilters > 0) {
numOfFilterCols += 1;
}
tableCols[i] = pCol->info;
}
SSingleColumnFilterInfo* pFilterInfo = NULL; SSingleColumnFilterInfo* pFilterInfo = NULL;
if (numOfFilterCols > 0) { createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
doCreateFilterInfo(tableCols, numOfCol1, numOfFilterCols, &pFilterInfo, 0);
}
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols); SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilterInfo, numOfFilterCols);
...@@ -1033,24 +1046,14 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1033,24 +1046,14 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
int32_t offset = pSourceOperator->numOfOutput; int32_t offset = pSourceOperator->numOfOutput;
for(int32_t i = 1; i < px->numOfTables; ++i) { for(int32_t i = 1; i < px->numOfTables; ++i) {
SSchema* pSchema1 = tscGetTableSchema(px->pTableMetaInfo[i]->pTableMeta); STableMeta* pTableMeta1 = tscGetMetaInfo(px, i)->pTableMeta;
int32_t n = px->pTableMetaInfo[i]->pTableMeta->tableInfo.numOfColumns;
int32_t numOfFilterCols1 = 0;
SColumnInfo* tableCols1 = calloc(numOfCol1, sizeof(SColumnInfo));
for(int32_t j = 0; j < numOfCol1; ++j) {
SColumn* pCol = taosArrayGetP(px->colList, j);
if (pCol->info.flist.numOfFilters > 0) {
numOfFilterCols += 1;
}
tableCols1[j] = pCol->info; SSchema* pSchema1 = tscGetTableSchema(pTableMeta1);
} int32_t n = pTableMeta1->tableInfo.numOfColumns;
int32_t numOfFilterCols1 = 0;
SSingleColumnFilterInfo* pFilterInfo1 = NULL; SSingleColumnFilterInfo* pFilterInfo1 = NULL;
if (numOfFilterCols1 > 0) { createInputDataFilterInfo(px, numOfCol1, &numOfFilterCols1, &pFilterInfo1);
doCreateFilterInfo(tableCols1, numOfCol1, numOfFilterCols1, &pFilterInfo1, 0);
}
p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n, pFilterInfo1, numOfFilterCols1); p[i] = createDummyInputOperator(pSqlObjList[i], pSchema1, n, pFilterInfo1, numOfFilterCols1);
memcpy(&schema[offset], pSchema1, n * sizeof(SSchema)); memcpy(&schema[offset], pSchema1, n * sizeof(SSchema));
...@@ -1068,6 +1071,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1068,6 +1071,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN); px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo); tfree(pColumnInfo);
tfree(schema); tfree(schema);
// set the pRuntimeEnv for pSourceOperator
pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv;
} }
uint64_t qId = 0; uint64_t qId = 0;
......
...@@ -552,6 +552,8 @@ int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId); ...@@ -552,6 +552,8 @@ int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
bool isQueryKilled(SQInfo *pQInfo); bool isQueryKilled(SQInfo *pQInfo);
......
...@@ -3237,6 +3237,25 @@ STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool ...@@ -3237,6 +3237,25 @@ STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool
return pTableQueryInfo; return pTableQueryInfo;
} }
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) {
STableQueryInfo* pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
pTableQueryInfo->pTable = NULL;
pTableQueryInfo->cur.vgroupIndex = -1;
// set more initial size of interval/groupby query
int32_t initialSize = 16;
int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
return pTableQueryInfo;
}
void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册