提交 ac35ae98 编写于 作者: D dapan1121

feat: add dynamic query ctrl operator

上级 02cec497
......@@ -325,6 +325,7 @@ typedef struct SDataBlockDescNode {
typedef struct SPhysiNode {
ENodeType type;
bool dynamicOp;
EOrder inputTsOrder;
EOrder outputTsOrder;
SDataBlockDescNode* pOutputDataBlockDesc;
......@@ -443,9 +444,17 @@ typedef struct SGroupCachePhysiNode {
SNodeList* pGroupCols;
} SGroupCachePhysiNode;
typedef struct SStbJoinDynCtrlInfo {
int32_t vgSlot[2];
int32_t uidSlot[2];
} SStbJoinDynCtrlInfo;
typedef struct SDynQueryCtrlPhysiNode {
SPhysiNode node;
EDynQueryType qType;
union {
SStbJoinDynCtrlInfo stbJoin;
};
} SDynQueryCtrlPhysiNode;
typedef struct SAggPhysiNode {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DYNQUERYCTRL_H
#define TDENGINE_DYNQUERYCTRL_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SDynQueryCtrlOperatorInfo {
SStbJoinDynCtrlInfo ctrlInfo;
} SDynQueryCtrlOperatorInfo;
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_DYNQUERYCTRL_H
......@@ -21,17 +21,10 @@ extern "C" {
#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760
typedef struct SGcSessionCtx {
SOperatorInfo* pDownstream;
bool cacheHit;
bool needCache;
SGcBlkBufInfo* pLastBlk;
} SGcSessionCtx;
typedef struct SGcOperatorParam {
SOperatorBasicParam basic;
int64_t sessionId;
int32_t downstreamKey;
int32_t downstreamIdx;
bool needCache;
void* pGroupValue;
int32_t groupValueSize;
......@@ -71,18 +64,18 @@ typedef struct SGroupColsInfo {
char* pData;
} SGroupColsInfo;
typedef struct SGcDownstreamInfo {
SSHashObj* pKey2Idx;
SOperatorInfo** ppDownStream;
int32_t downStreamNum;
} SGcDownstreamInfo;
typedef struct SGcSessionCtx {
SOperatorInfo* pDownstream;
bool cacheHit;
bool needCache;
SGcBlkBufInfo* pLastBlk;
} SGcSessionCtx;
typedef struct SGroupCacheOperatorInfo {
SSHashObj* pSessionHash;
SGroupColsInfo groupColsInfo;
SArray* pBlkBufs;
SSHashObj* pBlkHash;
SGcDownstreamInfo downstreamInfo;
int64_t pCurrentId;
SGcSessionCtx* pCurrent;
} SGroupCacheOperatorInfo;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "dynqueryctrl.h"
static void destroyDynQueryCtrlOperator(void* param) {
SDynQueryCtrlOperatorInfo* pDynCtrlOperator = (SDynQueryCtrlOperatorInfo*)param;
taosMemoryFreeClear(param);
}
SSDataBlock* getBlockFromDynQueryCtrl(SOperatorInfo* pOperator) {
SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
while (true) {
SSDataBlock* pBlock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
if (NULL == pBlock) {
break;
}
addBlkToGroupCache(pOperator, pBlock, &pRes);
}
}
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
memcpy(&pInfo->ctrlInfo, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlockFromDynQueryCtrl, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
_error:
if (pInfo != NULL) {
destroyDynQueryCtrlOperator(pInfo);
}
taosMemoryFree(pOperator);
pTaskInfo->code = code;
return NULL;
}
......@@ -74,14 +74,12 @@ static void destroyGroupCacheOperator(void* param) {
taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage);
tSimpleHashCleanup(pGrpCacheOperator->pSessionHash);
tSimpleHashCleanup(pGrpCacheOperator->pBlkHash);
tSimpleHashCleanup(pGrpCacheOperator->downstreamInfo.pKey2Idx);
taosMemoryFree(pGrpCacheOperator->downstreamInfo.ppDownStream);
taosMemoryFreeClear(param);
}
static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) {
SBufPageInfo page;
SGcBufPageInfo page;
page.pageSize = GROUP_CACHE_DEFAULT_PAGE_SIZE;
page.offset = 0;
page.data = taosMemoryMalloc(page.pageSize);
......@@ -94,7 +92,7 @@ static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) {
}
static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) {
SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId);
SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId);
return pPage->data + pBlkInfo->offset;
}
......@@ -105,7 +103,7 @@ static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBuf
SGcBlkBufInfo* pCurr = (*ppLastBlk)->next;
*ppLastBlk = pCurr;
if (pCurr) {
SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId);
SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId);
return pPage->data + pCurr->offset;
}
......@@ -114,7 +112,7 @@ static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBuf
static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) {
pInfo->pBlkBufs = taosArrayInit(32, sizeof(SBufPageInfo));
pInfo->pBlkBufs = taosArrayInit(32, sizeof(SGcBufPageInfo));
if (NULL == pInfo->pBlkBufs) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -128,47 +126,35 @@ static int32_t initGroupCacheDownstreamInfo(SGroupCachePhysiNode* pPhyciNode, SO
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES);
pInfo->pKey2Idx = tSimpleHashInit(numOfDownstream, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pInfo->pKey2Idx) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfDownstream; ++i) {
int32_t keyValue = taosArrayGet(pPhyciNode, i);
tSimpleHashPut(pInfo->pKey2Idx, &keyValue, sizeof(keyValue), &i, sizeof(i));
}
return TSDB_CODE_SUCCESS;
}
static int32_t initGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) {
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) {
SGcSessionCtx ctx = {0};
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupData* pGroup = tSimpleHashGet(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize);
if (pGroup) {
ctx.cacheHit = true;
ctx.pLastBlk = pGroup->blks;
} else {
int32_t* pIdx = tSimpleHashGet(pGCache->downstreamInfo.pKey2Idx, &pParam->downstreamKey, sizeof(pParam->downstreamKey));
if (NULL == pIdx) {
qError("Invalid downstream key value: %d", pParam->downstreamKey);
return TSDB_CODE_INVALID_PARA;
}
ctx.pDownstream = pGCache->downstreamInfo.ppDownStream[*pIdx];
ctx.pDownstream = pOperator->pDownstream[pParam->downstreamIdx];
ctx.needCache = pParam->needCache;
}
return TSDB_CODE_SUCCESS;
}
static void getFromSessionCache(SExecTaskInfo* pTaskInfo, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) {
static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pParam->basic.newExec) {
int32_t code = initGroupCacheSession(pGCache, pParam, ppSession);
int32_t code = initGroupCacheSession(pOperator, pParam, ppSession);
if (TSDB_CODE_SUCCESS != code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if ((*ppSession)->pLastBlk) {
*ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk);
*ppRes = (SSDataBlock*)retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk);
} else {
*ppRes = NULL;
}
......@@ -184,7 +170,7 @@ static void getFromSessionCache(SExecTaskInfo* pTaskInfo, SGroupCacheOperatorInf
*ppSession = pCtx;
if (pCtx->cacheHit) {
*ppRes = moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk);
*ppRes = (SSDataBlock*)moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk);
return;
}
......@@ -223,7 +209,7 @@ static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperator
return NULL;
}
getFromSessionCache(pTaskInfo, pGCache, pParam, &pRes, &pSession);
getFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession);
pGCache->pCurrent = pSession;
pGCache->pCurrentId = pParam->sessionId;
......@@ -283,8 +269,8 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
goto _error;
}
code = initGroupCacheDownstreamInfo(pPhyciNode, pDownstream, numOfDownstream, &pInfo->downstreamInfo);
if (code) {
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (TSDB_CODE_SUCCESS != code) {
goto _error;
}
......
......@@ -810,6 +810,11 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
goto _error;
}
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, NULL, NULL);
return pOperator;
......
......@@ -1853,7 +1853,8 @@ enum {
PHY_NODE_CODE_LIMIT,
PHY_NODE_CODE_SLIMIT,
PHY_NODE_CODE_INPUT_TS_ORDER,
PHY_NODE_CODE_OUTPUT_TS_ORDER
PHY_NODE_CODE_OUTPUT_TS_ORDER,
PHY_NODE_CODE_DYNAMIC_OP,
};
static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -1878,6 +1879,9 @@ static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_NODE_CODE_DYNAMIC_OP, pNode->dynamicOp);
}
return code;
}
......@@ -1910,6 +1914,9 @@ static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_NODE_CODE_OUTPUT_TS_ORDER:
code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder));
break;
case PHY_NODE_CODE_DYNAMIC_OP:
code = tlvDecodeBool(pTlv, &pNode->dynamicOp);
break;
default:
break;
}
......
......@@ -3210,6 +3210,7 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL
break;
}
pScan->node.dynamicOp = true;
pScan->scanType = SCAN_TYPE_TABLE;
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -366,6 +366,7 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
TSWAP(pPhysiNode->pLimit, pLogicNode->pLimit);
TSWAP(pPhysiNode->pSlimit, pLogicNode->pSlimit);
pPhysiNode->dynamicOp = pLogicNode->dynamicOp;
pPhysiNode->inputTsOrder = pLogicNode->inputTsOrder;
pPhysiNode->outputTsOrder = pLogicNode->outputTsOrder;
......
......@@ -1204,7 +1204,11 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp
SNode* pChild = NULL;
FOREACH(pChild, pJoin->node.pChildren) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false);
if (pJoin->node.dynamicOp) {
code = TSDB_CODE_SUCCESS;
} else {
code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false);
}
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
} else {
......@@ -1220,7 +1224,9 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode);
if (TSDB_CODE_SUCCESS == code) {
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
if (!pInfo->pSplitNode->dynamicOp) {
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
}
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
}
return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册