提交 3799db43 编写于 作者: wmmhello's avatar wmmhello

feat:sort table group if needed

上级 26cceaf1
...@@ -226,7 +226,6 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { ...@@ -226,7 +226,6 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, SArray* pTableList) { static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, SArray* pTableList) {
size_t tableSize = taosArrayGetSize(pTableList); size_t tableSize = taosArrayGetSize(pTableList);
assert(tableSize >= 1);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo)); SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
...@@ -510,6 +509,9 @@ int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){ ...@@ -510,6 +509,9 @@ int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId, tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
uint64_t taskId) { uint64_t taskId) {
if(taosArrayGetSize(tableList) == 0){
return NULL;
}
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return NULL; return NULL;
......
...@@ -510,6 +510,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -510,6 +510,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if(pInfo->currentGroupId == -1){ if(pInfo->currentGroupId == -1){
pInfo->currentGroupId++; pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); SArray *tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId); tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId);
pInfo->dataReader = pReader; pInfo->dataReader = pReader;
...@@ -2200,19 +2204,34 @@ SArray* generateSortByTsInfo(int32_t order) { ...@@ -2200,19 +2204,34 @@ SArray* generateSortByTsInfo(int32_t order) {
return pList; return pList;
} }
static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) {
SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tmp, taosArrayGet(tableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, tmp, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(tmp);
}
return TSDB_CODE_SUCCESS;
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId); SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId);
tsdbReaderT* pReader = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, pInfo->queryId, pInfo->taskId); createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList,
taosArrayPush(pInfo->dataReaders, &pReader); pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
// todo the total available buffer should be determined by total capacity of buffer of this task. // todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result // the additional one is reserved for merge result
int32_t tableLen = taosArrayGetSize(tableList); int32_t tableLen = taosArrayGetSize(tableList);
pInfo->sortBufSize = pInfo->bufPageSize * (tableLen + 1); pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen==0?1:tableLen) + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
...@@ -2302,6 +2321,10 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2302,6 +2321,10 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (pInfo->currentGroupId == -1) { if (pInfo->currentGroupId == -1) {
pInfo->currentGroupId++; pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
...@@ -2313,8 +2336,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2313,8 +2336,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return pBlock; return pBlock;
} }
pInfo->currentGroupId++;
stopGroupTableMergeScan(pOperator); stopGroupTableMergeScan(pOperator);
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) { if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
......
...@@ -412,6 +412,13 @@ class TDTestCase: ...@@ -412,6 +412,13 @@ class TDTestCase:
tdSql.checkColNameList(res, cname_list) tdSql.checkColNameList(res, cname_list)
# #
# test group by & order by json tag # test group by & order by json tag
tdSql.query("select ts,jtag->'tag1' from jsons1 partition by jtag->'tag1' order by jtag->'tag1' desc")
tdSql.checkRows(11)
tdSql.checkData(0, 1, '"femail"')
tdSql.checkData(2, 1, '"收到货"')
tdSql.checkData(7, 1, "false")
# tdSql.error("select count(*) from jsons1 group by jtag") # tdSql.error("select count(*) from jsons1 group by jtag")
# tdSql.error("select count(*) from jsons1 partition by jtag") # tdSql.error("select count(*) from jsons1 partition by jtag")
# tdSql.error("select count(*) from jsons1 group by jtag order by jtag") # tdSql.error("select count(*) from jsons1 group by jtag order by jtag")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册