提交 051020f9 编写于 作者: wmmhello's avatar wmmhello

fix: remove table group code in 2.x

上级 dfbd780b
...@@ -53,10 +53,9 @@ typedef enum EStreamType { ...@@ -53,10 +53,9 @@ typedef enum EStreamType {
} EStreamType; } EStreamType;
typedef struct { typedef struct {
uint32_t numOfTables; SArray* pTableList;
SArray* pGroupList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid SHashObj* map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo; } STableListInfo;
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
int16_t colId; int16_t colId;
......
...@@ -99,9 +99,9 @@ typedef void *tsdbReaderT; ...@@ -99,9 +99,9 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, tsdbReaderT *tsdbQueryTables(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId,
void *pMemRef); void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader); bool isTsdbCacheLastRow(tsdbReaderT *pReader);
...@@ -112,9 +112,6 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockI ...@@ -112,9 +112,6 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockI
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle); void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
// tq // tq
......
...@@ -114,11 +114,10 @@ int tsdbCommit(STsdb* pTsdb); ...@@ -114,11 +114,10 @@ int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo);
int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever); int32_t tsdbSnapshotReaderOpen(STsdb* pTsdb, STsdbSnapshotReader** ppReader, int64_t sver, int64_t ever);
int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader); int32_t tsdbSnapshotReaderClose(STsdbSnapshotReader* pReader);
int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData); int32_t tsdbSnapshotRead(STsdbSnapshotReader* pReader, void** ppData, uint32_t* nData);
......
...@@ -146,8 +146,8 @@ typedef struct STableGroupSupporter { ...@@ -146,8 +146,8 @@ typedef struct STableGroupSupporter {
SSchema* pTagSchema; SSchema* pTagSchema;
} STableGroupSupporter; } STableGroupSupporter;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList); static STimeWindow updateLastrowForEachGroup(STableListInfo* pList);
static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList); static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pList);
static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey); // static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey);
...@@ -233,41 +233,34 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { ...@@ -233,41 +233,34 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
return rows; return rows;
} }
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) { static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableListInfo* pTableList) {
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); size_t tableSize = taosArrayGetSize(pTableList->pTableList);
assert(numOfGroup >= 1); assert(tableSize >= 1);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo)); SArray* pTableCheckInfo = taosArrayInit(tableSize, sizeof(STableCheckInfo));
if (pTableCheckInfo == NULL) { if (pTableCheckInfo == NULL) {
return NULL; return NULL;
} }
// todo apply the lastkey of table check to avoid to load header file // todo apply the lastkey of table check to avoid to load header file
for (int32_t i = 0; i < numOfGroup; ++i) { for (int32_t j = 0; j < tableSize; ++j) {
SArray* group = *(SArray**)taosArrayGet(pGroupList->pGroupList, i); STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pTableList->pTableList, j);
size_t gsize = taosArrayGetSize(group); STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
assert(gsize > 0); if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
for (int32_t j = 0; j < gsize; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(group, j);
STableCheckInfo info = {.lastKey = pKeyInfo->lastKey, .tableId = pKeyInfo->uid};
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReadHandle->window.skey) {
info.lastKey = pTsdbReadHandle->window.skey;
}
assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
} else {
info.lastKey = pTsdbReadHandle->window.skey; info.lastKey = pTsdbReadHandle->window.skey;
} }
taosArrayPush(pTableCheckInfo, &info); assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, } else {
info.lastKey, pTsdbReadHandle->idStr); info.lastKey = pTsdbReadHandle->window.skey;
} }
taosArrayPush(pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId,
info.lastKey, pTsdbReadHandle->idStr);
} }
// TODO group table according to the tag value. // TODO group table according to the tag value.
...@@ -478,7 +471,7 @@ _end: ...@@ -478,7 +471,7 @@ _end:
return NULL; return NULL;
} }
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
uint64_t taskId) { uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
...@@ -490,7 +483,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG ...@@ -490,7 +483,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
} }
// todo apply the lastkey of table check to avoid to load header file // todo apply the lastkey of table check to avoid to load header file
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList); pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
if (pTsdbReadHandle->pTableCheckInfo == NULL) { if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupReadHandle(pTsdbReadHandle); // tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
...@@ -520,8 +513,8 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG ...@@ -520,8 +513,8 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
} }
} }
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, group %" PRIzu " %s", pTsdbReadHandle, tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(groupList->pGroupList), taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList->pTableList),
pTsdbReadHandle->idStr); pTsdbReadHandle->idStr);
return (tsdbReaderT)pTsdbReadHandle; return (tsdbReaderT)pTsdbReadHandle;
...@@ -565,7 +558,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { ...@@ -565,7 +558,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
resetCheckInfo(pTsdbReadHandle); resetCheckInfo(pTsdbReadHandle);
} }
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableGroupInfo* groupList) { void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList) {
STsdbReadHandle* pTsdbReadHandle = queryHandle; STsdbReadHandle* pTsdbReadHandle = queryHandle;
pTsdbReadHandle->order = pCond->order; pTsdbReadHandle->order = pCond->order;
...@@ -607,21 +600,21 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon ...@@ -607,21 +600,21 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
} }
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId,
uint64_t taskId) { uint64_t taskId) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(pList);
// no qualified table // no qualified table
if (groupList->numOfTables == 0) { if (taosArrayGetSize(pList->pTableList) == 0) {
return NULL; return NULL;
} }
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, groupList, qId, taskId); STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pList, qId, taskId);
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return NULL; return NULL;
} }
int32_t code = checkForCachedLastRow(pTsdbReadHandle, groupList); int32_t code = checkForCachedLastRow(pTsdbReadHandle, pList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
return NULL; return NULL;
...@@ -667,60 +660,60 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { ...@@ -667,60 +660,60 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) {
} }
// leave only one table for each group // leave only one table for each group
static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) { //static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) {
assert(pGroupList); // assert(pGroupList);
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); // size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
//
STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo)); // STableGroupInfo* pNew = taosMemoryCalloc(1, sizeof(STableGroupInfo));
pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES); // pNew->pGroupList = taosArrayInit(numOfGroup, POINTER_BYTES);
//
for (int32_t i = 0; i < numOfGroup; ++i) { // for (int32_t i = 0; i < numOfGroup; ++i) {
SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i); // SArray* oneGroup = taosArrayGetP(pGroupList->pGroupList, i);
size_t numOfTables = taosArrayGetSize(oneGroup); // size_t numOfTables = taosArrayGetSize(oneGroup);
//
SArray* px = taosArrayInit(4, sizeof(STableKeyInfo)); // SArray* px = taosArrayInit(4, sizeof(STableKeyInfo));
for (int32_t j = 0; j < numOfTables; ++j) { // for (int32_t j = 0; j < numOfTables; ++j) {
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j); // STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(oneGroup, j);
// if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) { // // if (window->skey <= pInfo->lastKey && ((STable*)pInfo->pTable)->lastKey != TSKEY_INITIAL_VAL) {
// taosArrayPush(px, pInfo); // // taosArrayPush(px, pInfo);
// pNew->numOfTables += 1; // // pNew->numOfTables += 1;
// break; // // break;
// } // // }
} // }
//
// there are no data in this group // // there are no data in this group
if (taosArrayGetSize(px) == 0) { // if (taosArrayGetSize(px) == 0) {
taosArrayDestroy(px); // taosArrayDestroy(px);
} else { // } else {
taosArrayPush(pNew->pGroupList, &px); // taosArrayPush(pNew->pGroupList, &px);
} // }
} // }
//
return pNew; // return pNew;
} //}
tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
uint64_t qId, uint64_t taskId) {
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
if (pNew->numOfTables == 0) {
tsdbDebug("update query time range to invalidate time window");
assert(taosArrayGetSize(pNew->pGroupList) == 0);
bool asc = ASCENDING_TRAVERSE(pCond->order);
if (asc) {
pCond->twindow.ekey = pCond->twindow.skey - 1;
} else {
pCond->twindow.skey = pCond->twindow.ekey - 1;
}
}
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
pTsdbReadHandle->loadExternalRow = true;
pTsdbReadHandle->currentLoadExternalRows = true;
return pTsdbReadHandle; //tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
} // uint64_t qId, uint64_t taskId) {
// STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
//
// if (pNew->numOfTables == 0) {
// tsdbDebug("update query time range to invalidate time window");
//
// assert(taosArrayGetSize(pNew->pGroupList) == 0);
// bool asc = ASCENDING_TRAVERSE(pCond->order);
// if (asc) {
// pCond->twindow.ekey = pCond->twindow.skey - 1;
// } else {
// pCond->twindow.skey = pCond->twindow.ekey - 1;
// }
// }
//
// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)tsdbQueryTables(pVnode, pCond, pNew, qId, taskId);
// pTsdbReadHandle->loadExternalRow = true;
// pTsdbReadHandle->currentLoadExternalRows = true;
//
// return pTsdbReadHandle;
//}
static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) { static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pCheckInfo) {
if (pCheckInfo->initBuf) { if (pCheckInfo->initBuf) {
...@@ -3338,8 +3331,8 @@ bool isTsdbCacheLastRow(tsdbReaderT* pReader) { ...@@ -3338,8 +3331,8 @@ bool isTsdbCacheLastRow(tsdbReaderT* pReader) {
return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE; return ((STsdbReadHandle*)pReader)->cachelastrow > TSDB_CACHED_TYPE_NONE;
} }
int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* groupList) { int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableListInfo* tableList) {
assert(pTsdbReadHandle != NULL && groupList != NULL); assert(pTsdbReadHandle != NULL && tableList != NULL);
// TSKEY key = TSKEY_INITIAL_VAL; // TSKEY key = TSKEY_INITIAL_VAL;
// //
...@@ -3386,68 +3379,68 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) { ...@@ -3386,68 +3379,68 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) {
return code; return code;
} }
STimeWindow updateLastrowForEachGroup(STableGroupInfo* groupList) { STimeWindow updateLastrowForEachGroup(STableListInfo* pList) {
STimeWindow window = {INT64_MAX, INT64_MIN}; STimeWindow window = {INT64_MAX, INT64_MIN};
int32_t totalNumOfTable = 0; // int32_t totalNumOfTable = 0;
SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t)); // SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));
//
// NOTE: starts from the buffer in case of descending timestamp order check data blocks // // NOTE: starts from the buffer in case of descending timestamp order check data blocks
size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); // size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
for (int32_t j = 0; j < numOfGroups; ++j) { // for (int32_t j = 0; j < numOfGroups; ++j) {
SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); // SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
TSKEY key = TSKEY_INITIAL_VAL; // TSKEY key = TSKEY_INITIAL_VAL;
//
STableKeyInfo keyInfo = {0}; // STableKeyInfo keyInfo = {0};
//
size_t numOfTables = taosArrayGetSize(pGroup); // size_t numOfTables = taosArrayGetSize(pGroup);
for (int32_t i = 0; i < numOfTables; ++i) { // for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); // STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i);
//
// if the lastKey equals to INT64_MIN, there is no data in this table // // if the lastKey equals to INT64_MIN, there is no data in this table
TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey; // TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey;
if (key < lastKey) { // if (key < lastKey) {
key = lastKey; // key = lastKey;
//
// keyInfo.pTable = pInfo->pTable; // // keyInfo.pTable = pInfo->pTable;
keyInfo.lastKey = key; // keyInfo.lastKey = key;
pInfo->lastKey = key; // pInfo->lastKey = key;
//
if (key < window.skey) { // if (key < window.skey) {
window.skey = key; // window.skey = key;
} // }
//
if (key > window.ekey) { // if (key > window.ekey) {
window.ekey = key; // window.ekey = key;
} // }
} // }
} // }
//
// more than one table in each group, only one table left for each group // // more than one table in each group, only one table left for each group
// if (keyInfo.pTable != NULL) { // // if (keyInfo.pTable != NULL) {
// totalNumOfTable++; // // totalNumOfTable++;
// if (taosArrayGetSize(pGroup) == 1) { // // if (taosArrayGetSize(pGroup) == 1) {
// // do nothing // // // do nothing
// } else { // // } else {
// taosArrayClear(pGroup); // // taosArrayClear(pGroup);
// taosArrayPush(pGroup, &keyInfo); // // taosArrayPush(pGroup, &keyInfo);
// } // // }
// } else { // mark all the empty groups, and remove it later // // } else { // mark all the empty groups, and remove it later
// taosArrayDestroy(pGroup); // // taosArrayDestroy(pGroup);
// taosArrayPush(emptyGroup, &j); // // taosArrayPush(emptyGroup, &j);
// } // // }
} // }
//
// window does not being updated, so set the original // // window does not being updated, so set the original
if (window.skey == INT64_MAX && window.ekey == INT64_MIN) { // if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
window = TSWINDOW_INITIALIZER; // window = TSWINDOW_INITIALIZER;
assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups); // assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups);
} // }
//
taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup)); // taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup));
taosArrayDestroy(emptyGroup); // taosArrayDestroy(emptyGroup);
//
groupList->numOfTables = totalNumOfTable; // groupList->numOfTables = totalNumOfTable;
return window; return window;
} }
...@@ -3873,81 +3866,6 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd ...@@ -3873,81 +3866,6 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
// return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
//} //}
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta*)pMeta, 0);
if (metaGetTableEntryByUid(&mr, uid) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
goto _error;
}
metaReaderClear(&mr);
pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.lastKey = startKey, .uid = uid};
taosArrayPush(group, &info);
taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS;
_error:
metaReaderClear(&mr);
return terrno;
}
#if 0
int32_t tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) {
return terrno;
}
assert(pTableIdList != NULL);
size_t size = taosArrayGetSize(pTableIdList);
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
for(int32_t i = 0; i < size; ++i) {
STableIdInfo *id = taosArrayGet(pTableIdList, i);
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
if (pTable == NULL) {
tsdbWarn("table uid:%"PRIu64", tid:%d has been drop already", id->uid, id->tid);
continue;
}
if (pTable->type == TSDB_SUPER_TABLE) {
tsdbError("direct query on super tale is not allowed, table uid:%"PRIu64", tid:%d", id->uid, id->tid);
terrno = TSDB_CODE_QRY_INVALID_MSG;
tsdbUnlockRepoMeta(tsdb);
taosArrayDestroy(group);
return terrno;
}
STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
taosArrayPush(group, &info);
}
if (tsdbUnlockRepoMeta(tsdb) < 0) {
taosArrayDestroy(group);
return terrno;
}
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(group);
if (pGroupInfo->numOfTables > 0) {
taosArrayPush(pGroupInfo->pGroupList, &group);
} else {
taosArrayDestroy(group);
}
return TSDB_CODE_SUCCESS;
}
#endif
static void* doFreeColumnInfoData(SArray* pColumnInfoData) { static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
if (pColumnInfoData == NULL) { if (pColumnInfoData == NULL) {
return NULL; return NULL;
...@@ -4018,30 +3936,6 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { ...@@ -4018,30 +3936,6 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) {
} }
#if 0 #if 0
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
assert(pGroupList != NULL);
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* p = taosArrayGetP(pGroupList->pGroupList, i);
size_t numOfTables = taosArrayGetSize(p);
for(int32_t j = 0; j < numOfTables; ++j) {
STable* pTable = taosArrayGetP(p, j);
if (pTable != NULL) { // in case of handling retrieve data from tsdb
tsdbUnRefTable(pTable);
}
//assert(pTable != NULL);
}
taosArrayDestroy(p);
}
taosHashCleanup(pGroupList->map);
taosArrayDestroy(pGroupList->pGroupList);
pGroupList->numOfTables = 0;
}
static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) { static void applyFilterToSkipListNode(SSkipList *pSkipList, tExprNode *pExpr, SArray *pResult, SExprTraverseSupp *param) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList); SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
......
...@@ -147,16 +147,10 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { ...@@ -147,16 +147,10 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
} }
// wrapper of tsdb read interface // wrapper of tsdb read interface
tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo* tableList, uint64_t qId,
void *pMemRef) { void *pMemRef) {
#if 0 #if 0
return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef); return tsdbQueryCacheLastT(pVnode->pTsdb, pCond, groupList, qId, pMemRef);
#endif #endif
return 0; return 0;
}
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo) {
#if 0
return tsdbGetTableGroupFromIdListT(pVnode->pTsdb, pTableIdList, pGroupInfo);
#endif
return 0;
} }
\ No newline at end of file
...@@ -49,7 +49,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -49,7 +49,7 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) //#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
...@@ -153,7 +153,7 @@ typedef struct STaskAttr { ...@@ -153,7 +153,7 @@ typedef struct STaskAttr {
int32_t numOfFilterCols; int32_t numOfFilterCols;
int64_t* fillVal; int64_t* fillVal;
void* tsdb; void* tsdb;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> // STableListInfo tableGroupInfo; // table list
int32_t vgId; int32_t vgId;
} STaskAttr; } STaskAttr;
...@@ -193,7 +193,7 @@ typedef struct SExecTaskInfo { ...@@ -193,7 +193,7 @@ typedef struct SExecTaskInfo {
int32_t tversion; int32_t tversion;
} schemaVer; } schemaVer;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableListInfo tableqinfoList; // this is a table list
char* sql; // query sql string char* sql; // query sql string
jmp_buf env; // jump to this position when error happens. jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
...@@ -215,7 +215,7 @@ typedef struct STaskRuntimeEnv { ...@@ -215,7 +215,7 @@ typedef struct STaskRuntimeEnv {
STSCursor cur; STSCursor cur;
char* tagVal; // tag value of current data block char* tagVal; // tag value of current data block
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure // STableGroupInfo tableqinfoGroupInfo; // this is a table list
struct SOperatorInfo* proot; struct SOperatorInfo* proot;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
...@@ -344,7 +344,7 @@ typedef struct STagScanInfo { ...@@ -344,7 +344,7 @@ typedef struct STagScanInfo {
SArray *pColMatchInfo; SArray *pColMatchInfo;
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
STableGroupInfo *pTableGroups; STableListInfo *pTableList;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -707,7 +707,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -707,7 +707,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
...@@ -720,21 +720,19 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB ...@@ -720,21 +720,19 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle, SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SReadHandle* pHandle,
uint64_t uid, SSDataBlock* pResBlock, SArray* pColList, uint64_t uid, SSDataBlock* pResBlock, SArray* pColList,
...@@ -748,14 +746,13 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf ...@@ -748,14 +746,13 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo);
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
...@@ -771,8 +768,6 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo ...@@ -771,8 +768,6 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
......
...@@ -88,7 +88,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { ...@@ -88,7 +88,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#endif #endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) //#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
...@@ -1846,12 +1846,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { ...@@ -1846,12 +1846,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
} }
} }
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) {
STableQueryInfo* pTableQueryInfo = buf;
pTableQueryInfo->lastKey = win.skey;
return pTableQueryInfo;
}
void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
if (pTableQueryInfo == NULL) { if (pTableQueryInfo == NULL) {
return; return;
...@@ -2431,7 +2425,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -2431,7 +2425,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); static void doDestroyTableList(STableListInfo* pTableqinfoList);
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0 #if 0
...@@ -3999,35 +3993,30 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { ...@@ -3999,35 +3993,30 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
} }
} }
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { //static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) {
if (pTableGroupInfo->numOfTables == 0) { // int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
return NULL; // if (size == 0) {
} // return NULL;
// }
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); //
if (pTableQueryInfo == NULL) { // STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo));
return NULL; // if (pTableQueryInfo == NULL) {
} // return NULL;
// }
int32_t index = 0; //
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pGroupList); ++i) { // for (int32_t j = 0; j < size; ++j) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i); // STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j);
for (int32_t j = 0; j < taosArrayGetSize(pa); ++j) { // STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j];
STableKeyInfo* pk = taosArrayGet(pa, j); // pTQueryInfo->lastKey = pk->lastKey;
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; // }
pTQueryInfo->lastKey = pk->lastKey; //
} // pTableQueryInfo->lastKey = 0;
} // return pTableQueryInfo;
//}
STimeWindow win = {0, INT64_MAX};
createTableQueryInfo(pTableQueryInfo, win);
return pTableQueryInfo;
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -4040,7 +4029,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -4040,7 +4029,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
int32_t code = int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4438,11 +4426,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -4438,11 +4426,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); STableListInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode);
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond);
uint64_t queryId, uint64_t taskId, SNode* pTagCond); static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo);
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList);
...@@ -4471,14 +4458,24 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo ...@@ -4471,14 +4458,24 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
} }
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SNode* pTagCond) { uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, SNode* pTagCond) {
int32_t type = nodeType(pPhyNode); int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
return NULL;
}
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
return NULL; return NULL;
} }
...@@ -4500,11 +4497,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4500,11 +4497,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfCols = 0; int32_t numOfCols = 0;
int32_t code = getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
return NULL;
}
tsdbReaderT pDataReader = NULL; tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) { if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
} else {
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, pTagCond);
} }
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
...@@ -4517,7 +4522,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4517,7 +4522,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableGroupInfo); SArray* tableIdList = extractTableIdList(pTableListInfo);
SSDataBlock* pResBlock = createResDataBlock(pDescNode); SSDataBlock* pResBlock = createResDataBlock(pDescNode);
SArray* pCols = SArray* pCols =
...@@ -4550,8 +4555,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4550,8 +4555,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pDescNode); SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond);
queryId, taskId, pTagCond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -4564,7 +4568,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4564,7 +4568,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
COL_MATCH_FROM_COL_ID); COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = SOperatorInfo* pOperator =
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo); createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableListInfo, pTaskInfo);
return pOperator; return pOperator;
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -4577,7 +4581,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4577,7 +4581,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, pTagCond); ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pTagCond);
if (ops[i] == NULL) { if (ops[i] == NULL) {
return NULL; return NULL;
} }
...@@ -4606,10 +4610,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4606,10 +4610,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pAggNode->pGroupKeys != NULL) { if (pAggNode->pGroupKeys != NULL) {
SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys);
pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} else { } else {
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr,
pTaskInfo, pTableGroupInfo); pTaskInfo);
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
...@@ -4628,8 +4632,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4628,8 +4632,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.calTrigger = pIntervalPhyNode->window.triggerType}; .calTrigger = pIntervalPhyNode->window.triggerType};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTableGroupInfo, pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
...@@ -4678,7 +4681,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4678,7 +4681,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == type) {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
...@@ -4892,75 +4895,57 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod ...@@ -4892,75 +4895,57 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return pList; return pList;
} }
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid,
uint64_t queryId, uint64_t taskId, SNode* pTagCond) { STableListInfo* pListInfo, SNode* pTagCond) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (tableType == TSDB_SUPER_TABLE) { pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
if (tableType == TSDB_SUPER_TABLE) {
if(pTagCond){ if(pTagCond){
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, res); code = doFilterTag(pTagCond, res);
if (code != TSDB_CODE_SUCCESS) {
qError("doFilterTag error:%d", code);
taosArrayDestroy(res);
return code;
}
for(int i = 0; i < taosArrayGetSize(res); i++){
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};
taosArrayPush(pListInfo->pTableList, &info);
}
taosArrayDestroy(res);
}else{ }else{
code = tsdbGetAllTableList(metaHandle, tableUid, res); code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
} }
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(pGroupInfo->pGroupList, &res);
} else { // Create one table group. } else { // Create one table group.
code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo); STableKeyInfo info = {.lastKey = 0, .uid = tableUid};
taosArrayPush(pListInfo->pTableList, &info);
} }
return code; return code;
} }
SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
if (pTableGroupInfo->numOfTables > 0) { // Transfer the Array of STableKeyInfo into uid list.
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, 0); for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) {
ASSERT(taosArrayGetSize(pTableGroupInfo->pGroupList) == 1); STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i);
taosArrayPush(tableIdList, &pkeyInfo->uid);
// Transfer the Array of STableKeyInfo into uid list.
size_t numOfTables = taosArrayGetSize(pa);
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pa, i);
taosArrayPush(tableIdList, &pkeyInfo->uid);
}
} }
return tableIdList; return tableIdList;
} }
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) {
uint64_t uid = pTableScanNode->scan.uid;
int32_t code =
doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId, pTagNode);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (pTableGroupInfo->numOfTables == 0) {
code = 0;
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
goto _error;
}
SQueryTableDataCond cond = {0}; SQueryTableDataCond cond = {0};
code = initQueryTableDataCond(&cond, pTableScanNode); int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; return NULL;
} }
return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId); return tsdbQueryTables(pHandle->vnode, &cond, pTableListInfo, queryId, taskId);
_error:
terrno = code;
return NULL;
} }
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
...@@ -4975,7 +4960,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4975,7 +4960,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
} }
(*pTaskInfo)->pRoot = (*pTaskInfo)->pRoot =
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = terrno; code = terrno;
goto _complete; goto _complete;
...@@ -5034,34 +5019,18 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { ...@@ -5034,34 +5019,18 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
taosMemoryFree(pFilter); taosMemoryFree(pFilter);
} }
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { static void doDestroyTableList(STableListInfo* pTableqinfoList) {
if (pTableqinfoGroupInfo->pGroupList != NULL) { taosArrayDestroy(pTableqinfoList->pTableList);
int32_t numOfGroups = (int32_t)taosArrayGetSize(pTableqinfoGroupInfo->pGroupList); taosHashCleanup(pTableqinfoList->map);
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray* p = taosArrayGetP(pTableqinfoGroupInfo->pGroupList, i);
size_t num = taosArrayGetSize(p);
for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(p, j);
destroyTableQueryInfoImpl(item);
}
taosArrayDestroy(p);
}
}
taosArrayDestroy(pTableqinfoGroupInfo->pGroupList);
taosHashCleanup(pTableqinfoGroupInfo->map);
pTableqinfoGroupInfo->pGroupList = NULL; pTableqinfoList->pTableList = NULL;
pTableqinfoGroupInfo->map = NULL; pTableqinfoList->map = NULL;
pTableqinfoGroupInfo->numOfTables = 0;
} }
void doDestroyTask(SExecTaskInfo* pTaskInfo) { void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults); // taosHashCleanup(pTaskInfo->summary.operatorProfResults);
......
...@@ -346,7 +346,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -346,7 +346,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
} }
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -616,7 +616,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -616,7 +616,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
} }
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SExecTaskInfo* pTaskInfo) {
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
......
...@@ -1610,20 +1610,19 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1610,20 +1610,19 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SExprInfo* pExprInfo = &pOperator->pExpr[0]; SExprInfo* pExprInfo = &pOperator->pExpr[0];
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
if (taosArrayGetSize(pInfo->pTableGroups->pGroupList) == 0) { int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED); setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL; return NULL;
} }
SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);
char str[512] = {0}; char str[512] = {0};
int32_t count = 0; int32_t count = 0;
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pInfo->readHandle.meta, 0); metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) { while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos); STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
metaGetTableEntryByUid(&mr, item->uid); metaGetTableEntryByUid(&mr, item->uid);
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) { for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
...@@ -1655,7 +1654,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1655,7 +1654,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} }
count += 1; count += 1;
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) { if (++pInfo->curPos >= size) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
} }
...@@ -1680,14 +1679,14 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1680,14 +1679,14 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, SSDataBlock* pResBlock, SArray* pColMatchInfo,
STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
pInfo->pTableGroups = pTableGroupInfo; pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = pColMatchInfo; pInfo->pColMatchInfo = pColMatchInfo;
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
......
...@@ -1167,8 +1167,7 @@ bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { ...@@ -1167,8 +1167,7 @@ bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1192,8 +1191,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1192,8 +1191,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
goto _error; goto _error;
} }
...@@ -1228,8 +1226,7 @@ _error: ...@@ -1228,8 +1226,7 @@ _error:
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1285,8 +1282,7 @@ _error: ...@@ -1285,8 +1282,7 @@ _error:
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1308,8 +1304,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr ...@@ -1308,8 +1304,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
goto _error; goto _error;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册