提交 974dcef7 编写于 作者: H Haojun Liao

other: merge 3.0

上级 998dedb9
......@@ -332,6 +332,9 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD(precision);
CLONE_NODE_FIELD(pLimit);
CLONE_NODE_FIELD(pSlimit);
COPY_SCALAR_FIELD(requireDataOrder);
COPY_SCALAR_FIELD(resultDataOrder);
COPY_SCALAR_FIELD(groupAction);
return TSDB_CODE_SUCCESS;
}
......
......@@ -504,6 +504,9 @@ static const char* jkLogicPlanConditions = "Conditions";
static const char* jkLogicPlanChildren = "Children";
static const char* jkLogicPlanLimit = "Limit";
static const char* jkLogicPlanSlimit = "SLimit";
static const char* jkLogicPlanRequireDataOrder = "RequireDataOrder";
static const char* jkLogicPlanResultDataOrder = "ResultDataOrder";
static const char* jkLogicPlanGroupAction = "GroupAction";
static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) {
const SLogicNode* pNode = (const SLogicNode*)pObj;
......@@ -521,6 +524,15 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkLogicPlanSlimit, nodeToJson, pNode->pSlimit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicPlanRequireDataOrder, pNode->requireDataOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicPlanResultDataOrder, pNode->resultDataOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicPlanGroupAction, pNode->groupAction);
}
return code;
}
......@@ -541,6 +553,15 @@ static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkLogicPlanSlimit, &pNode->pSlimit);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicPlanRequireDataOrder, pNode->requireDataOrder, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicPlanResultDataOrder, pNode->resultDataOrder, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicPlanGroupAction, pNode->groupAction, code);
}
return code;
}
......
......@@ -35,6 +35,7 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList);
int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList);
int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew);
int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement);
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
......
......@@ -250,6 +250,9 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
SScanLogicNode* pScan = NULL;
int32_t code = makeScanLogicNode(pCxt, pRealTable, pSelect->hasRepeatScanFuncs, (SLogicNode**)&pScan);
pScan->node.groupAction = GROUP_ACTION_NONE;
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
// set columns to scan
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, COLLECT_COL_TYPE_COL,
......@@ -336,6 +339,9 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->joinType = pJoinTable->joinType;
pJoin->isSingleTableJoin = pJoinTable->table.singleTable;
pJoin->node.groupAction = GROUP_ACTION_CLEAR;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -472,6 +478,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
}
pAgg->hasLastRow = pSelect->hasLastRowFunc;
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -540,6 +549,10 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc;
pIdfRowsFunc->isUniqueFunc = pSelect->hasUniqueFunc;
pIdfRowsFunc->isTimeLineFunc = pSelect->hasTimeLineFunc;
pIdfRowsFunc->node.groupAction = GROUP_ACTION_KEEP;
pIdfRowsFunc->node.requireDataOrder =
pIdfRowsFunc->isTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE;
pIdfRowsFunc->node.resultDataOrder = pIdfRowsFunc->node.requireDataOrder;
// indefinite rows functions and _select_values functions
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs);
......@@ -571,6 +584,10 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pInterpFunc->node.groupAction = GROUP_ACTION_KEEP;
pInterpFunc->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder;
int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsInterpFunc, &pInterpFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT);
......@@ -642,10 +659,12 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo
}
pWindow->winType = WINDOW_TYPE_STATE;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->pStateExpr = nodesCloneNode(pState->pExpr);
pWindow->pTspk = nodesCloneNode(pState->pCol);
if (NULL == pWindow->pTspk) {
if (NULL == pWindow->pStateExpr || NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -663,6 +682,9 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
pWindow->winType = WINDOW_TYPE_SESSION;
pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol);
if (NULL == pWindow->pTspk) {
......@@ -689,6 +711,9 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->slidingUnit =
(NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->node.groupAction = GROUP_ACTION_KEEP;
pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
if (NULL == pWindow->pTspk) {
......@@ -734,6 +759,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}
pFill->node.groupAction = GROUP_ACTION_KEEP;
pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WINDOW, NULL, COLLECT_COL_TYPE_ALL, &pFill->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pFill->node.pTargets) {
code = nodesListMakeStrictAppend(&pFill->node.pTargets,
......@@ -768,6 +797,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
}
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pSort->node.resultDataOrder = pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) {
......@@ -818,6 +850,9 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
TSWAP(pProject->node.pLimit, pSelect->pLimit);
TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
pProject->node.groupAction = GROUP_ACTION_CLEAR;
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -850,6 +885,10 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
return TSDB_CODE_OUT_OF_MEMORY;
}
pPartition->node.groupAction = GROUP_ACTION_SET;
pPartition->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pPartition->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code =
nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pPartition->node.pTargets) {
......@@ -882,11 +921,23 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
return TSDB_CODE_OUT_OF_MEMORY;
}
pAgg->node.groupAction = GROUP_ACTION_SET;
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
// set grouyp keys, agg funcs and having conditions
pAgg->pGroupKeys = nodesCloneList(pSelect->pProjectionList);
if (NULL == pAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
SNodeList* pGroupKeys = NULL;
SNode* pProjection = NULL;
FOREACH(pProjection, pSelect->pProjectionList) {
code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pProjection));
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pGroupKeys);
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
pAgg->pGroupKeys = pGroupKeys;
}
// rewrite the expression in subsequent clauses
......@@ -1361,6 +1412,7 @@ int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
if (TSDB_CODE_SUCCESS == code) {
setLogicNodeParent(pSubplan->pNode);
setLogicSubplanType(cxt.hasScan, pSubplan);
code = adjustLogicNodeDataRequirement(pSubplan->pNode, DATA_ORDER_LEVEL_NONE);
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -1579,6 +1579,34 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) {
return eliminateProjOptCheckProjColumnNames(pProjectNode);
}
typedef struct CheckNewChildTargetsCxt {
SNodeList* pNewChildTargets;
bool canUse;
} CheckNewChildTargetsCxt;
static EDealRes eliminateProjOptCanUseNewChildTargetsImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
CheckNewChildTargetsCxt* pCxt = pContext;
SNode* pTarget = NULL;
FOREACH(pTarget, pCxt->pNewChildTargets) {
if (!nodesEqualNode(pTarget, pNode)) {
pCxt->canUse = false;
return DEAL_RES_END;
}
}
}
return DEAL_RES_CONTINUE;
}
static bool eliminateProjOptCanUseNewChildTargets(SLogicNode* pChild, SNodeList* pNewChildTargets) {
if (NULL == pChild->pConditions) {
return true;
}
CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = true};
nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt);
return cxt.canUse;
}
static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan,
SProjectLogicNode* pProjectNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0);
......@@ -1594,8 +1622,13 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
}
}
}
nodesDestroyList(pChild->pTargets);
pChild->pTargets = pNewChildTargets;
if (eliminateProjOptCanUseNewChildTargets(pChild, pNewChildTargets)) {
nodesDestroyList(pChild->pTargets);
pChild->pTargets = pNewChildTargets;
} else {
nodesDestroyList(pNewChildTargets);
return TSDB_CODE_SUCCESS;
}
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
if (TSDB_CODE_SUCCESS == code) {
......@@ -1873,6 +1906,8 @@ static int32_t rewriteUniqueOptCreateAgg(SIndefRowsFuncLogicNode* pIndef, SLogic
TSWAP(pAgg->node.pChildren, pIndef->node.pChildren);
optResetParent((SLogicNode*)pAgg);
pAgg->node.precision = pIndef->node.precision;
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; // first function requirement
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
bool hasSelectPrimaryKey = false;
......@@ -1945,6 +1980,8 @@ static int32_t rewriteUniqueOptCreateProject(SIndefRowsFuncLogicNode* pIndef, SL
TSWAP(pProject->node.pTargets, pIndef->node.pTargets);
pProject->node.precision = pIndef->node.precision;
pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
......@@ -1973,11 +2010,16 @@ static int32_t rewriteUniqueOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pProject->pChildren, (SNode*)pAgg);
}
if (TSDB_CODE_SUCCESS == code) {
pAgg->pParent = pProject;
pAgg = NULL;
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject);
code = adjustLogicNodeDataRequirement(
pProject, NULL == pProject->pParent ? DATA_ORDER_LEVEL_NONE : pProject->pParent->requireDataOrder);
pProject = NULL;
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pIndef);
......@@ -2145,11 +2187,20 @@ static bool tagScanMayBeOptimized(SLogicNode* pNode) {
}
SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent);
if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs ||
planOptNodeListHasCol(pAgg->pGroupKeys) || !planOptNodeListHasTbname(pAgg->pGroupKeys)) {
if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || planOptNodeListHasCol(pAgg->pGroupKeys) ||
!planOptNodeListHasTbname(pAgg->pGroupKeys)) {
return false;
}
SNode* pGroupKey = NULL;
FOREACH(pGroupKey, pAgg->pGroupKeys) {
SNode* pGroup = NULL;
FOREACH(pGroup, ((SGroupingSetNode*)pGroupKey)->pParameterList) {
if (QUERY_NODE_COLUMN != nodeType(pGroup)) {
return false;
}
}
}
return true;
}
......@@ -2162,11 +2213,12 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
pScanNode->scanType = SCAN_TYPE_TAG;
SNode* pTarget = NULL;
FOREACH(pTarget, pScanNode->node.pTargets) {
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)(pTarget))->colId) {
ERASE_NODE(pScanNode->node.pTargets);
break;
}
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)(pTarget))->colId) {
ERASE_NODE(pScanNode->node.pTargets);
break;
}
}
NODES_DESTORY_LIST(pScanNode->pScanCols);
SLogicNode* pAgg = pScanNode->node.pParent;
......@@ -2176,8 +2228,8 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
SNode* pAggTarget = NULL;
FOREACH(pAggTarget, pAgg->pTargets) {
SNode* pScanTarget = NULL;
FOREACH(pScanTarget, pScanNode->node.pTargets) {
if (0 == strcmp( ((SColumnNode*)pAggTarget)->colName, ((SColumnNode*)pAggTarget)->colName )) {
FOREACH(pScanTarget, pScanNode->node.pTargets) {
if (0 == strcmp(((SColumnNode*)pAggTarget)->colName, ((SColumnNode*)pAggTarget)->colName)) {
nodesListAppend(pScanTargets, nodesCloneNode(pScanTarget));
break;
}
......
......@@ -974,6 +974,17 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
return code;
}
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) {
return true;
}
if (1 != LIST_LENGTH(pProject->node.pChildren)) {
return false;
}
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
return DATA_ORDER_LEVEL_GLOBAL == pChild->resultDataOrder ? true : false;
}
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
SProjectPhysiNode* pProject =
......@@ -982,6 +993,8 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
return TSDB_CODE_OUT_OF_MEMORY;
}
pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode);
int32_t code = TSDB_CODE_SUCCESS;
if (0 == LIST_LENGTH(pChildren)) {
pProject->pProjections = nodesCloneList(pProjectLogicNode->pProjections);
......
......@@ -657,6 +657,9 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
return TSDB_CODE_SUCCESS;
}
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pInfo->pSplitNode->pParent)) {
pInfo->pSplitNode = pInfo->pSplitNode->pParent;
}
SExchangeLogicNode* pExchange = NULL;
int32_t code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "functionMgt.h"
#include "planInt.h"
static char* getUsageErrFormat(int32_t errCode) {
......@@ -121,3 +122,185 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode*
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) {
if (SCAN_TYPE_TABLE != pScan->scanType || SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
return TSDB_CODE_SUCCESS;
}
// The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK
if (requirement < DATA_ORDER_LEVEL_IN_BLOCK) {
requirement = DATA_ORDER_LEVEL_IN_BLOCK;
}
if (DATA_ORDER_LEVEL_IN_BLOCK == requirement) {
pScan->scanType = SCAN_TYPE_TABLE;
} else if (TSDB_SUPER_TABLE == pScan->tableType) {
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
}
pScan->node.resultDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel requirement) {
// The lowest sort level of join input and output data is DATA_ORDER_LEVEL_GLOBAL
return TSDB_CODE_SUCCESS;
}
static bool isKeepOrderAggFunc(SNodeList* pFuncs) {
SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) {
if (!fmIsKeepOrderFunc(((SFunctionNode*)pFunc)->funcId)) {
return false;
}
}
return true;
}
static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) {
// The sort level of agg with group by output data can only be DATA_ORDER_LEVEL_NONE
if (requirement > DATA_ORDER_LEVEL_NONE && (NULL != pAgg->pGroupKeys || !isKeepOrderAggFunc(pAgg->pAggFuncs))) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pAgg->node.resultDataOrder = requirement;
pAgg->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOrderLevel requirement) {
pProject->node.resultDataOrder = requirement;
pProject->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
// The lowest sort level of interval output data is DATA_ORDER_LEVEL_IN_GROUP
if (requirement < DATA_ORDER_LEVEL_IN_GROUP) {
requirement = DATA_ORDER_LEVEL_IN_GROUP;
}
// The sort level of interval input data is always DATA_ORDER_LEVEL_IN_BLOCK
pWindow->node.resultDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= pWindow->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= pWindow->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
switch (pWindow->winType) {
case WINDOW_TYPE_INTERVAL:
return adjustIntervalDataRequirement(pWindow, requirement);
case WINDOW_TYPE_SESSION:
return adjustSessionDataRequirement(pWindow, requirement);
case WINDOW_TYPE_STATE:
return adjustStateDataRequirement(pWindow, requirement);
default:
break;
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t adjustFillDataRequirement(SFillLogicNode* pFill, EDataOrderLevel requirement) {
if (requirement <= pFill->node.requireDataOrder) {
return TSDB_CODE_SUCCESS;
}
pFill->node.resultDataOrder = requirement;
pFill->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel requirement) {
return TSDB_CODE_SUCCESS;
}
static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) {
if (DATA_ORDER_LEVEL_GLOBAL == requirement) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
pPart->node.resultDataOrder = requirement;
pPart->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustIndefRowsDataRequirement(SIndefRowsFuncLogicNode* pIndef, EDataOrderLevel requirement) {
if (requirement <= pIndef->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pIndef->node.resultDataOrder = requirement;
pIndef->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataOrderLevel requirement) {
if (requirement <= pInterp->node.requireDataOrder) {
return TSDB_CODE_SUCCESS;
}
pInterp->node.resultDataOrder = requirement;
pInterp->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = adjustScanDataRequirement((SScanLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = adjustJoinDataRequirement((SJoinLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_AGG:
code = adjustAggDataRequirement((SAggLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_PROJECT:
code = adjustProjectDataRequirement((SProjectLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY:
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
case QUERY_NODE_LOGIC_PLAN_MERGE:
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
code = adjustWindowDataRequirement((SWindowLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_FILL:
code = adjustFillDataRequirement((SFillLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_SORT:
code = adjustSortDataRequirement((SSortLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = adjustPartitionDataRequirement((SPartitionLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
code = adjustIndefRowsDataRequirement((SIndefRowsFuncLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement);
break;
default:
break;
}
if (TSDB_CODE_SUCCESS == code) {
SNode* pChild = NULL;
FOREACH(pChild, pNode->pChildren) {
code = adjustLogicNodeDataRequirement((SLogicNode*)pChild, pNode->requireDataOrder);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
}
return code;
}
......@@ -38,9 +38,15 @@ TEST_F(PlanIntervalTest, fill) {
run("SELECT COUNT(*) FROM t1 WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' "
"INTERVAL(10s) FILL(LINEAR)");
run("SELECT COUNT(*) FROM st1 WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' "
"INTERVAL(10s) FILL(LINEAR)");
run("SELECT COUNT(*), SUM(c1) FROM t1 "
"WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' "
"INTERVAL(10s) FILL(VALUE, 10, 20)");
run("SELECT COUNT(*) FROM st1 WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' "
"PARTITION BY TBNAME interval(10s) fill(prev)");
}
TEST_F(PlanIntervalTest, selectFunc) {
......
......@@ -48,10 +48,28 @@ TEST_F(PlanSubqeuryTest, doubleGroupBy) {
"WHERE a > 100 GROUP BY b");
}
TEST_F(PlanSubqeuryTest, withSetOperator) {
TEST_F(PlanSubqeuryTest, innerSetOperator) {
useDb("root", "test");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION SELECT c1 FROM t1)");
}
TEST_F(PlanSubqeuryTest, innerFill) {
useDb("root", "test");
run("SELECT cnt FROM (SELECT _WSTART ts, COUNT(*) cnt FROM t1 "
"WHERE ts > '2022-04-01 00:00:00' and ts < '2022-04-30 23:59:59' INTERVAL(10s) FILL(LINEAR)) "
"WHERE ts > '2022-04-06 00:00:00'");
}
TEST_F(PlanSubqeuryTest, outerInterval) {
useDb("root", "test");
run("SELECT COUNT(*) FROM (SELECT * FROM st1) INTERVAL(5s)");
run("SELECT COUNT(*) + SUM(c1) FROM (SELECT * FROM st1) INTERVAL(5s)");
run("SELECT COUNT(*) FROM (SELECT ts, TOP(c1, 10) FROM st1s1) INTERVAL(5s)");
}
......@@ -34,6 +34,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.version = be64toh(pRetrieve->version);
pDataBlock->info.type = pRetrieve->streamBlockType;
pDataBlock->info.childId = pReq->upstreamChildId;
......@@ -54,6 +55,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.version = be64toh(pRetrieve->version);
pDataBlock->info.type = pRetrieve->streamBlockType;
......
......@@ -108,6 +108,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->numOfCols = htonl(numOfCols);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = 0;
blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false);
......@@ -182,6 +183,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->skey = htobe64(pBlock->info.window.skey);
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version);
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols);
......
......@@ -159,6 +159,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (data == NULL) {
data = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->execType == TASK_EXEC__NONE) break;
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
/*}*/
......
......@@ -1550,12 +1550,12 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
char logBuf[256 + 256];
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64
", snapshot:%" PRId64 ", snapshot-term:%" PRIu64
", standby:%d, "
"strategy:%d, batch:%d, "
"replica-num:%d, "
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
......@@ -1573,12 +1573,12 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
char* s = (char*)taosMemoryMalloc(len);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len,
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64
", snapshot:%" PRId64 ", snapshot-term:%" PRIu64
", standby:%d, "
"strategy:%d, batch:%d, "
"replica-num:%d, "
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
......@@ -1621,12 +1621,12 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
char logBuf[256 + 256];
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64
", snapshot:%" PRId64 ", snapshot-term:%" PRIu64
", standby:%d, "
"strategy:%d, batch:%d, "
"replica-num:%d, "
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
......@@ -1642,12 +1642,12 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
char* s = (char*)taosMemoryMalloc(len);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len,
"vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64
", snapshot:%" PRId64 ", snapshot-term:%" PRIu64
", standby:%d, "
"strategy:%d, batch:%d, "
"replica-num:%d, "
"lconfig:%" PRId64 ", changing:%d, restore:%d, %s",
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
......@@ -1675,11 +1675,10 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
snprintf(s, len,
"vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64
", snapshot:%" PRId64
", standby:%d, "
"replica-num:%d, "
"lconfig:%" PRId64 ", changing:%d, restore:%d",
"vgId:%d, sync %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", sby:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
......@@ -2977,7 +2976,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64
", pterm:%" PRIu64 ", cmt:%" PRId64
", "
"datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
......@@ -2992,7 +2991,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRIu64 ", pre-term:%" PRIu64
", commit:%" PRIu64 ", pterm:%" PRIu64
", cmt:%" PRIu64 ", pterm:%" PRIu64
", "
"datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
......@@ -3007,7 +3006,7 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
syncNodeEventLog(pSyncNode, logBuf);
......@@ -3020,7 +3019,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
syncNodeEventLog(pSyncNode, logBuf);
......
......@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -109,10 +109,10 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) {
int32_t len = 512;
char * s = taosMemoryMalloc(len);
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
snprintf(s, len, "{r-num:%d, my:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
char *p = s + strlen(s);
for (int i = 0; i < pSyncCfg->replicaNum; ++i) {
/*
......@@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);
......
......@@ -229,8 +229,8 @@ typedef struct {
int8_t stop;
} SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool);
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transAsyncPoolDestroy(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
......@@ -322,7 +322,7 @@ typedef struct STransReq {
} STransReq;
void transReqQueueInit(queue* q);
void* transReqQueuePushReq(queue* q);
void* transReqQueuePush(queue* q);
void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q);
......@@ -393,9 +393,9 @@ typedef struct SDelayQueue {
uv_loop_t* loop;
} SDelayQueue;
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
/*
......
......@@ -26,7 +26,7 @@ typedef struct SCliConn {
SConnBuffer readBuf;
STransQueue cliMsgs;
queue conn;
queue q;
uint64_t expireTime;
STransCtx ctx;
......@@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) {
while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
if (c->expireTime < currentTime) {
QUEUE_REMOVE(h);
transUnrefCliHandle(c);
......@@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) {
while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conn)) {
queue* h = QUEUE_HEAD(&connList->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true);
}
connList = taosHashIterate((SHashObj*)pool, connList);
......@@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return NULL;
}
queue* h = QUEUE_HEAD(&plist->conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn);
assert(h == &conn->conn);
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
assert(h == &conn->q);
return conn;
}
static int32_t allocConnRef(SCliConn* conn, bool update) {
......@@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before
assert(plist != NULL);
QUEUE_INIT(&conn->conn);
QUEUE_PUSH(&plist->conn, &conn->conn);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&plist->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&plist->conn));
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
......@@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
transReqQueueInit(&conn->wreqQueue);
transQueueInit(&conn->cliMsgs, NULL);
QUEUE_INIT(&conn->conn);
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd;
conn->status = ConnNormal;
conn->broken = 0;
......@@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
}
static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn);
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
......@@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn);
}
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
_RETURN:
......@@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() {
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
uv_loop_init(pThrd->loop);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd;
......@@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
CLI_RELEASE_UV(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
transDestroyAsyncPool(pThrd->asyncPool);
transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg);
taosMemoryFree(pThrd->loop);
......@@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) {
cliHandleReq(pMsg, pThrd);
}
static void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx;
......@@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
}
}
bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) {
bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
if ((pResp == NULL || pResp->info.hasEpSet == 0)) {
return false;
}
......@@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
*/
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
if (retry) {
pMsg->sent = 0;
pCtx->retryCnt += 1;
......@@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn);
EPSET_FORWARD_INUSE(&pCtx->epSet);
transFreeMsg(pResp->pCont);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
}
......@@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId* trace = &pResp->info.traceId;
bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet);
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
if (hasEpSet) {
char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
......@@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) {
tsem_destroy(sem);
taosMemoryFree(sem);
int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q);
if (ret != 0) {
destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1;
goto _RETURN;
}
tsem_wait(sem);
_RETURN:
tsem_destroy(sem);
taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
return ret;
}
/*
*
......
......@@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) {
return ret;
}
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
......@@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
return pool;
}
void transDestroyAsyncPool(SAsyncPool* pool) {
void transAsyncPoolDestroy(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
// uv_close((uv_handle_t*)async, NULL);
......@@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool->asyncs);
taosMemoryFree(pool);
}
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
int transAsyncSend(SAsyncPool* pool, queue* q) {
if (atomic_load_8(&pool->stop) == 1) {
return -1;
......@@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
}
return uv_async_send(async);
}
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
void transCtxInit(STransCtx* ctx) {
// init transCtx
......@@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) {
// init req queue
QUEUE_INIT(q);
}
void* transReqQueuePushReq(queue* q) {
void* transReqQueuePush(queue* q) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq));
wreq->data = req;
......@@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
heapDestroy(queue->heap);
taosMemoryFree(queue);
}
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
uv_timer_stop(queue->timer);
if (heapSize(queue->heap) <= 0) return;
heapRemove(queue->heap, &task->node);
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
if (heapSize(queue->heap) != 0) {
HeapNode* minNode = heapMin(queue->heap);
if (minNode != NULL) return;
uint64_t now = taosGetTimestampMs();
SDelayTask* task = container_of(minNode, SDelayTask, node);
uint64_t timeout = now > task->execTime ? now - task->execTime : 0;
uv_timer_start(queue->timer, transDQTimeout, timeout, 0);
}
}
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) {
uint64_t now = taosGetTimestampMs();
SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask));
task->func = func;
......@@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_
tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs);
heapInsert(queue->heap, &task->node);
uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0);
return 0;
return task;
}
void transPrintEpSet(SEpSet* pEpSet) {
......
......@@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
uvPrepareSendData(smsg, &wb);
transRefSrvHandle(pConn);
uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
}
static void uvStartSendResp(SSvrMsg* smsg) {
......@@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
// conn set
QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true;
......@@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transDestroyAsyncPool(pThrd->asyncPool);
transAsyncPoolDestroy(pThrd->asyncPool);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册