提交 add5ff5b 编写于 作者: L Liu Jicong

Merge branch 'feature/3.0_liaohj' into feature/tq

#include "../../libs/scheduler/inc/schedulerInt.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "parser.h" #include "parser.h"
......
...@@ -220,7 +220,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro ...@@ -220,7 +220,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGro
* clean up the query handle * clean up the query handle
* @param queryHandle * @param queryHandle
*/ */
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle); void tsdbCleanupReadHandle(tsdbReadHandleT queryHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -208,8 +208,8 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA ...@@ -208,8 +208,8 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA
pReadHandle->pColIdList = pColIdList; pReadHandle->pColIdList = pColIdList;
} }
static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) { static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, const SArray* pTableIdList) {
pHandle->tbUid = tbUid; // pHandle->tbUid = pTableIdList;
} }
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) { static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) {
......
...@@ -427,7 +427,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, ...@@ -427,7 +427,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
return (tsdbReadHandleT)pReadHandle; return (tsdbReadHandleT)pReadHandle;
_end: _end:
tsdbCleanupQueryHandle(pReadHandle); tsdbCleanupReadHandle(pReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -445,7 +445,7 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup ...@@ -445,7 +445,7 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup
// todo apply the lastkey of table check to avoid to load header file // todo apply the lastkey of table check to avoid to load header file
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList); pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, groupList);
if (pTsdbReadHandle->pTableCheckInfo == NULL) { if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupQueryHandle(pTsdbReadHandle); // tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -525,7 +525,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond ...@@ -525,7 +525,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond
pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable); pTsdbReadHandle->pTableCheckInfo = NULL;//createCheckInfoFromTableGroup(pTsdbReadHandle, groupList, pMeta, &pTable);
if (pTsdbReadHandle->pTableCheckInfo == NULL) { if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// tsdbCleanupQueryHandle(pTsdbReadHandle); // tsdbCleanupReadHandle(pTsdbReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
} }
...@@ -3051,7 +3051,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) { ...@@ -3051,7 +3051,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) {
// } // }
// //
//out_of_memory: //out_of_memory:
// tsdbCleanupQueryHandle(pSecQueryHandle); // tsdbCleanupReadHandle(pSecQueryHandle);
// return terrno; // return terrno;
//} //}
...@@ -3722,26 +3722,19 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch ...@@ -3722,26 +3722,19 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
return terrno; return terrno;
} }
#if 0
int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; STbCfg* pTbCfg = metaGetTbInfoByUid(tsdb->pMeta, uid);
if (pTbCfg == NULL) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
tsdbUnlockRepoMeta(tsdb);
goto _error; goto _error;
} }
assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_STREAM_TABLE);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
pGroupInfo->numOfTables = 1; pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = pTable, .lastKey = startKey}; STableKeyInfo info = {.lastKey = startKey, .uid = uid};
taosArrayPush(group, &info); taosArrayPush(group, &info);
taosArrayPush(pGroupInfo->pGroupList, &group); taosArrayPush(pGroupInfo->pGroupList, &group);
...@@ -3751,6 +3744,7 @@ int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGr ...@@ -3751,6 +3744,7 @@ int32_t tsdbGetOneTableGroup(STsdb* tsdb, uint64_t uid, TSKEY startKey, STableGr
return terrno; return terrno;
} }
#if 0
int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) { int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) { if (tsdbRLockRepoMeta(tsdb) < 0) {
return terrno; return terrno;
...@@ -3826,7 +3820,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { ...@@ -3826,7 +3820,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
} }
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { void tsdbCleanupReadHandle(tsdbReadHandleT queryHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
return; return;
......
...@@ -2371,7 +2371,7 @@ _clean: ...@@ -2371,7 +2371,7 @@ _clean:
static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) { static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
// tsdbCleanupQueryHandle(pRuntimeEnv->pTsdbReadHandle); // tsdbCleanupReadHandle(pRuntimeEnv->pTsdbReadHandle);
pRuntimeEnv->pTsdbReadHandle = NULL; pRuntimeEnv->pTsdbReadHandle = NULL;
// SMemRef* pMemRef = &pQueryAttr->memRef; // SMemRef* pMemRef = &pQueryAttr->memRef;
...@@ -5454,7 +5454,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt ...@@ -5454,7 +5454,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRunt
return pOperator; return pOperator;
} }
SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, uint64_t uid, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExprInfo, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo)); SStreamBlockScanInfo* pInfo = calloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -5477,7 +5477,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp ...@@ -5477,7 +5477,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList);
tqReadHandleSetTbUid(streamReadHandle, uid); tqReadHandleSetTbUid(streamReadHandle, pTableIdList);
pInfo->readerHandle = streamReadHandle; pInfo->readerHandle = streamReadHandle;
...@@ -7750,20 +7750,21 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { ...@@ -7750,20 +7750,21 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
} }
static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId); static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId);
static int32_t doCreateTableGroup(void* readerHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId);
SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* readerHandle, uint64_t queryId, uint64_t taskId) { SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* readerHandle, uint64_t queryId, uint64_t taskId) {
if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) {
if (pPhyNode->info.type == OP_TableScan) { if (pPhyNode->info.type == OP_TableScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId); tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId);
return createTableScanOperatorInfo(tReaderHandle, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo); return createTableScanOperatorInfo(tReaderHandle, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo);
} else if (pPhyNode->info.type == OP_DataBlocksOptScan) { } else if (pPhyNode->info.type == OP_DataBlocksOptScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId); tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId);
...@@ -7772,8 +7773,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask ...@@ -7772,8 +7773,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo); return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
} else if (pPhyNode->info.type == OP_StreamScan) { } else if (pPhyNode->info.type == OP_StreamScan) {
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pScanPhyNode->uid, pTaskInfo); STableGroupInfo groupInfo = {0};
int32_t code = doCreateTableGroup(readerHandle, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0);
ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1);
return createStreamScanOperatorInfo(readerHandle, pPhyNode->pTargets, pa, pTaskInfo);
} }
} }
...@@ -7811,29 +7817,24 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S ...@@ -7811,29 +7817,24 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S
return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, taskId); return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, taskId);
} }
static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId) { static int32_t doCreateTableGroup(void* readerHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = 0; int32_t code = 0;
STableGroupInfo groupInfo = {0};
uint64_t uid = pTableScanNode->scan.uid;
STimeWindow window = pTableScanNode->window;
int32_t tableType = pTableScanNode->scan.tableType;
if (tableType == TSDB_SUPER_TABLE) { if (tableType == TSDB_SUPER_TABLE) {
code = code = tsdbQuerySTableByTagCond(readerHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId);
tsdbQuerySTableByTagCond(readerHandle, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, queryId, taskId);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} else { // Create one table group. } else { // Create one table group.
groupInfo.numOfTables = 1; code = tsdbGetOneTableGroup(readerHandle, tableUid, 0, pGroupInfo);
groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); }
SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo)); return code;
}
STableKeyInfo info = {.lastKey = 0, .uid = uid}; static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId) {
taosArrayPush(pa, &info); STableGroupInfo groupInfo = {0};
taosArrayPush(groupInfo.pGroupList, &pa);
uint64_t uid = pTableScanNode->scan.uid;
int32_t code = doCreateTableGroup(readerHandle, pTableScanNode->scan.tableType, uid, &groupInfo, queryId, taskId);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
} }
if (groupInfo.numOfTables == 0) { if (groupInfo.numOfTables == 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册