未验证 提交 a729c159 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #13869 from taosdata/feature/3.0_debug_wxy

feat: partition by tags optimize
......@@ -65,6 +65,12 @@ extern "C" {
(list) = NULL; \
} while (0)
#define NODES_CLEAR_LIST(list) \
do { \
nodesClearList((list)); \
(list) = NULL; \
} while (0)
typedef enum ENodeType {
// Syntax nodes are used in parser and planner module, and some are also used in executor module, such as COLUMN,
// VALUE, OPERATOR, FUNCTION and so on.
......
......@@ -69,6 +69,7 @@ typedef struct SScanLogicNode {
int16_t tsColId;
double filesFactor;
SArray* pSmaIndexes;
SNodeList* pPartTags;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -257,7 +258,7 @@ typedef struct STableScanPhysiNode {
double ratio;
int32_t dataRequired;
SNodeList* pDynamicScanFuncs;
SNodeList* pPartitionKeys;
SNodeList* pPartitionTags;
int64_t interval;
int64_t offset;
int64_t sliding;
......
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tref.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
......@@ -21,6 +20,7 @@
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tref.h"
#include "tdatablock.h"
#include "tglobal.h"
......@@ -79,7 +79,7 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define realloc u_realloc
#endif
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; }
......@@ -2395,7 +2395,7 @@ typedef struct SFetchRspHandleWrapper {
} SFetchRspHandleWrapper;
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*) param;
SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
if (pExchangeInfo == NULL) {
......@@ -2403,7 +2403,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
return TSDB_CODE_SUCCESS;
}
int32_t index = pWrapper->sourceIndex;
int32_t index = pWrapper->sourceIndex;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
if (code == TSDB_CODE_SUCCESS) {
......@@ -2411,9 +2411,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen);
pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
......@@ -2475,9 +2475,9 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
......@@ -2489,14 +2489,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
}
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_VND_FETCH;
pMsgSendInfo->fp = loadRemoteDataCallback;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
......@@ -2645,7 +2645,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
", completed:%d try next %d/%"PRIzu,
", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources);
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
......@@ -2663,8 +2663,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu,
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64
", completed:%d try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows,
pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources);
completed += 1;
......@@ -2718,7 +2719,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
totalSources, (endTs - startTs)/1000.0);
totalSources, (endTs - startTs) / 1000.0);
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
......@@ -2849,8 +2850,8 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY;
dataInfo.taskId = id;
dataInfo.index = i;
SSourceDataInfo *pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
dataInfo.index = i;
SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
if (pDs == NULL) {
taosArrayDestroy(pInfo->pSourceDataInfo);
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -2864,7 +2865,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo*
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
if (numOfSources == 0) {
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources);
qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
return TSDB_CODE_INVALID_PARA;
}
......@@ -2898,16 +2899,16 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
tsem_init(&pInfo->ready, 0, 0);
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pOperator->name = "ExchangeOperator";
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
......@@ -4066,7 +4067,7 @@ void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
}
void doDestroyExchangeOperatorInfo(void* param) {
SExchangeInfo* pExInfo = (SExchangeInfo*) param;
SExchangeInfo* pExInfo = (SExchangeInfo*)param;
taosArrayDestroy(pExInfo->pSources);
taosArrayDestroy(pExInfo->pSourceDataInfo);
......@@ -4554,8 +4555,8 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
return TSDB_CODE_SUCCESS;
}
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey){
if(groupKey == NULL) {
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey) {
if (groupKey == NULL) {
return TDB_CODE_SUCCESS;
}
......@@ -4564,11 +4565,11 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t keyLen = 0;
void *keyBuf = NULL;
void* keyBuf = NULL;
int32_t numOfGroupCols = taosArrayGetSize(groupKey);
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j);
keyLen += pCol->bytes; // actual data + null_flag
keyLen += pCol->bytes; // actual data + null_flag
}
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
......@@ -4579,9 +4580,9 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY;
}
for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++){
STableKeyInfo *info = taosArrayGet(pTableListInfo->pTableList, i);
SMetaReader mr = {0};
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
metaGetTableEntryByUid(&mr, info->uid);
......@@ -4590,23 +4591,23 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
for (int32_t j = 0; j < numOfGroupCols; ++j) {
SColumn* pCol = taosArrayGet(groupKey, j);
if(strcmp(pCol->name, "tbname") == 0){
if (strcmp(pCol->name, "tbname") == 0) {
isNull[i] = 0;
memcpy(pStart, mr.me.name, strlen(mr.me.name));
pStart += strlen(mr.me.name);
}else{
} else {
STagVal tagVal = {0};
tagVal.cid = pCol->colId;
const char* p = metaGetTableTagVal(&mr.me, pCol->type, &tagVal);
if(p == NULL){
if (p == NULL) {
isNull[j] = 1;
continue;
}
isNull[i] = 0;
if (pCol->type == TSDB_DATA_TYPE_JSON) {
// int32_t dataLen = getJsonValueLen(pkey->pData);
// memcpy(pStart, (pkey->pData), dataLen);
// pStart += dataLen;
// int32_t dataLen = getJsonValueLen(pkey->pData);
// memcpy(pStart, (pkey->pData), dataLen);
// pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
memcpy(pStart, tagVal.pData, tagVal.nData);
pStart += tagVal.nData;
......@@ -4618,7 +4619,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
}
int32_t len = (int32_t) (pStart - (char*)keyBuf);
int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len);
if (groupId) {
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t));
......@@ -4653,7 +4654,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code) {
......@@ -4661,7 +4662,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
......@@ -4671,11 +4672,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray* dataReaders = taosArrayInit(8, POINTER_BYTES);
createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
SOperatorInfo* pOperator =
createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator;
......@@ -4700,16 +4700,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys);
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json
SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags);
int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json
taosArrayDestroy(groupKeys);
if (code){
if (code) {
tsdbCleanupReadHandle(pDataReader);
return NULL;
}
SOperatorInfo* pOperator =
createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
......@@ -4718,7 +4717,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pScanPhyNode->node.pConditions);
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo,
pScanPhyNode->node.pConditions);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
......@@ -4796,7 +4796,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
......@@ -5034,7 +5034,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
}
SArray* extractPartitionColInfo(SNodeList* pNodeList) {
if(!pNodeList) return NULL;
if (!pNodeList) return NULL;
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
if (pList == NULL) {
......@@ -5139,7 +5139,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res);
if (code == TSDB_CODE_INDEX_REBUILDING){ // todo
if (code == TSDB_CODE_INDEX_REBUILDING) { // todo
// doFilter();
} else if (code != TSDB_CODE_SUCCESS) {
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
......@@ -5499,4 +5499,3 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
}
return code;
}
......@@ -355,6 +355,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
CLONE_NODE_LIST_FIELD(pPartTags);
return (SNode*)pDst;
}
......@@ -495,7 +496,7 @@ static SNode* physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhys
COPY_SCALAR_FIELD(ratio);
COPY_SCALAR_FIELD(dataRequired);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
CLONE_NODE_LIST_FIELD(pPartitionKeys);
CLONE_NODE_LIST_FIELD(pPartitionTags);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
......
......@@ -515,6 +515,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
static const char* jkScanLogicPlanTableId = "TableId";
static const char* jkScanLogicPlanTableType = "TableType";
static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanPartTags = "PartTags";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
......@@ -535,6 +536,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags);
}
return code;
}
......@@ -559,6 +563,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags);
}
return code;
}
......@@ -1368,6 +1375,7 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
......@@ -1421,6 +1429,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags);
}
return code;
}
......@@ -1446,30 +1457,24 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code);
......@@ -1483,6 +1488,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags);
}
return code;
}
......
......@@ -109,9 +109,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return false;
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent) ||
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) &&
pNode->pParent->pParent &&
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent)) ) {
(QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode->pParent) && pNode->pParent->pParent &&
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent->pParent))) {
return true;
}
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
......@@ -222,9 +221,8 @@ static int32_t osdGetDataRequired(SNodeList* pFuncs) {
static void setScanWindowInfo(SScanLogicNode* pScan) {
SLogicNode* pParent = pScan->node.pParent;
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) &&
pParent->pParent &&
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && pParent->pParent &&
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent->pParent)) {
pParent = pParent->pParent;
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pParent)) {
......@@ -1041,12 +1039,55 @@ static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan)
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
}
static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
}
return DEAL_RES_CONTINUE;
}
static bool partTagsOptHasCol(SNodeList* pPartKeys) {
bool hasCol = false;
nodesWalkExprs(pPartKeys, partTagsOptHasColImpl, &hasCol);
return hasCol;
}
static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
return false;
}
return !partTagsOptHasCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
}
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SPartitionLogicNode* pPart =
(SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
if (NULL == pPart) {
return TSDB_CODE_SUCCESS;
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->node.pChildren, 0);
TSWAP(pPart->pPartitionKeys, pScan->pPartTags);
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pPart, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pPart->node.pChildren);
nodesDestroyNode((SNode*)pPart);
}
return code;
}
// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize}
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}
};
// clang-format on
......
......@@ -500,7 +500,9 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
if (NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) {
pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags);
if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
(NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) {
nodesDestroyNode((SNode*)pTableScan);
return TSDB_CODE_OUT_OF_MEMORY;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册