未验证 提交 dcbe4acb 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2497 from taosdata/feature/query

Feature/query
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "tarray.h" #include "tarray.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tname.h" #include "tname.h"
#include "hash.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -168,8 +169,9 @@ typedef struct SDataBlockInfo { ...@@ -168,8 +169,9 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo; } SDataBlockInfo;
typedef struct { typedef struct {
size_t numOfTables; size_t numOfTables;
SArray *pGroupList; SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo from STableId
} STableGroupInfo; } STableGroupInfo;
typedef struct SQueryRowCond { typedef struct SQueryRowCond {
......
...@@ -3291,16 +3291,17 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) ...@@ -3291,16 +3291,17 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
free(pTableQueryInfo); free(pTableQueryInfo);
} }
void setCurrentQueryTable(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { #define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \
SQuery *pQuery = pRuntimeEnv->pQuery; do { \
pQuery->current = pTableQueryInfo; SQuery *_query = (_runtime)->pQuery; \
_query->current = _tableInfo; \
assert(((pTableQueryInfo->lastKey >= pTableQueryInfo->win.skey) && QUERY_IS_ASC_QUERY(pQuery)) || assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_query)) || \
((pTableQueryInfo->lastKey <= pTableQueryInfo->win.skey) && !QUERY_IS_ASC_QUERY(pQuery))); (((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_query))); \
} } while (0)
/** /**
* set output buffer for different group * set output buffer for different group
* TODO opt performance if current group is identical to previous group
* @param pRuntimeEnv * @param pRuntimeEnv
* @param pDataBlockInfo * @param pDataBlockInfo
*/ */
...@@ -4161,33 +4162,13 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4161,33 +4162,13 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
} }
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
STableQueryInfo *pTableQueryInfo = NULL; STableQueryInfo **pTableQueryInfo = (STableQueryInfo**) taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid));
if(pTableQueryInfo == NULL) {
// todo opt performance using hash table break;
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
size_t num = taosArrayGetSize(group);
for (int32_t j = 0; j < num; ++j) {
STableQueryInfo *p = taosArrayGetP(group, j);
STableId id = tsdbGetTableId(p->pTable);
if (id.tid == blockInfo.tid) {
assert(id.uid == blockInfo.uid);
pTableQueryInfo = p;
break;
}
}
if (pTableQueryInfo != NULL) {
break;
}
} }
assert(pTableQueryInfo != NULL); assert(*pTableQueryInfo != NULL);
setCurrentQueryTable(pRuntimeEnv, pTableQueryInfo); SET_CURRENT_QUERY_TABLE_INFO(pRuntimeEnv, *pTableQueryInfo);
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
...@@ -4195,11 +4176,12 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { ...@@ -4195,11 +4176,12 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
if (!isIntervalQuery(pQuery)) { if (!isIntervalQuery(pQuery)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step); setExecutionContext(pQInfo, (*pTableQueryInfo)->pTable, (*pTableQueryInfo)->groupIndex,
blockInfo.window.ekey + step);
} else { // interval query } else { // interval query
TSKEY nextKey = blockInfo.window.skey; TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey); setIntervalQueryRange(pQInfo, nextKey);
/*int32_t ret = */setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo); /*int32_t ret = */setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
} }
} }
...@@ -5627,7 +5609,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5627,7 +5609,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables; pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables;
} pQInfo->tableqinfoGroupInfo.map = taosHashInit(pTableGroupInfo->numOfTables,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
}
int tableIndex = 0; int tableIndex = 0;
STimeWindow window = pQueryMsg->window; STimeWindow window = pQueryMsg->window;
...@@ -5655,6 +5639,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5655,6 +5639,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
item->groupIndex = i; item->groupIndex = i;
item->tableIndex = tableIndex++; item->tableIndex = tableIndex++;
taosArrayPush(p1, &item); taosArrayPush(p1, &item);
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
} }
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1); taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
...@@ -5797,7 +5782,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5797,7 +5782,7 @@ static void freeQInfo(SQInfo *pQInfo) {
} }
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList); taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);
taosHashCleanup(pQInfo->tableqinfoGroupInfo.map);
tsdbDestoryTableGroup(&pQInfo->tableGroupInfo); tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
taosArrayDestroy(pQInfo->arrTableIdInfo); taosArrayDestroy(pQInfo->arrTableIdInfo);
......
...@@ -209,8 +209,7 @@ int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { ...@@ -209,8 +209,7 @@ int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
return 0; return 0;
} }
return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx) return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx) : pFillInfo->rowIdx + 1;
: pFillInfo->rowIdx + 1;
} }
// todo: refactor // todo: refactor
......
...@@ -542,17 +542,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -542,17 +542,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { #define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
SDataBlockInfo info = { ((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
.window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast}, .numOfCols = (_block)->numOfCols, \
.numOfCols = pBlock->numOfCols, .rows = (_block)->numOfRows, \
.rows = pBlock->numOfRows, .tid = (_checkInfo)->tableId.tid, \
.tid = pCheckInfo->tableId.tid, .uid = (_checkInfo)->tableId.uid})
.uid = pCheckInfo->tableId.uid,
};
return info;
}
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) { static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
...@@ -626,7 +621,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -626,7 +621,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo);
...@@ -946,7 +941,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -946,7 +941,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa) { SArray* sa) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
initTableMemIterator(pQueryHandle, pCheckInfo); initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
...@@ -1322,8 +1317,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1322,8 +1317,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables; sup.numOfTables = numOfQualTables;
SLoserTreeInfo* pTree = NULL;
SLoserTreeInfo* pTree = NULL;
uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
cleanBlockOrderSupporter(&sup, numOfTables); cleanBlockOrderSupporter(&sup, numOfTables);
...@@ -1844,7 +1839,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -1844,7 +1839,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
if (pHandle->cur.mixBlock) { if (pHandle->cur.mixBlock) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->compBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
assert(pHandle->realNumOfRows <= binfo.rows); assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method // data block has been loaded, todo extract method
......
...@@ -137,7 +137,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -137,7 +137,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} }
if (pQInfo != NULL) { if (pQInfo != NULL) {
vDebug("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo);
qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册