提交 e30d114c 编写于 作者: X Xiaoyu Wang

set datablock desc timestamp precision

上级 cd75d112
......@@ -84,6 +84,7 @@ typedef struct SVnodeModifLogicNode {
typedef struct SExchangeLogicNode {
SLogicNode node;
int32_t srcGroupId;
uint8_t precision;
} SExchangeLogicNode;
typedef enum EWindowType {
......@@ -163,7 +164,7 @@ typedef struct SDataBlockDescNode {
SNodeList* pSlots;
int32_t totalRowSize;
int32_t outputRowSize;
int16_t precision;
uint8_t precision;
} SDataBlockDescNode;
typedef struct SPhysiNode {
......@@ -253,11 +254,11 @@ typedef struct SWinodwPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
SNode* pTspk; // timestamp primary key
} SWinodwPhysiNode;
typedef struct SIntervalPhysiNode {
SWinodwPhysiNode window;
SNode* pTspk; // timestamp primary key
int64_t interval;
int64_t offset;
int64_t sliding;
......@@ -274,7 +275,7 @@ typedef struct SMultiTableIntervalPhysiNode {
typedef struct SSessionWinodwPhysiNode {
SWinodwPhysiNode window;
int64_t gap;
int64_t gap;
} SSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode {
......
......@@ -641,7 +641,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->pTspk));
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->window.pTspk));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
......
......@@ -6916,7 +6916,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = pIntervalPhyNode->precision
};
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->pTspk)->slotId;
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, primaryTsSlotId, pTableGroupInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
......
......@@ -65,7 +65,7 @@ const char* nodesNodeName(ENodeType type) {
case QUERY_NODE_TARGET:
return "Target";
case QUERY_NODE_DATABLOCK_DESC:
return "TupleDesc";
return "DataBlockDesc";
case QUERY_NODE_SLOT_DESC:
return "SlotDesc";
case QUERY_NODE_COLUMN_DEF:
......@@ -1060,6 +1060,7 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
static const char* jkWindowPhysiPlanExprs = "Exprs";
static const char* jkWindowPhysiPlanFuncs = "Funcs";
static const char* jkWindowPhysiPlanTsPk = "TsPk";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
......@@ -1071,6 +1072,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkWindowPhysiPlanFuncs, pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkWindowPhysiPlanTsPk, nodeToJson, pNode->pTspk);
}
return code;
}
......@@ -1085,6 +1089,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkWindowPhysiPlanFuncs, &pNode->pFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsPk, (SNode**)&pNode->pTspk);
}
return code;
}
......@@ -1095,7 +1102,6 @@ static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit";
static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkIntervalPhysiPlanFill = "Fill";
static const char* jkIntervalPhysiPlanTsPk = "TsPk";
static const char* jkIntervalPhysiPlanPrecision = "Precision";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
......@@ -1120,9 +1126,6 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanTsPk, nodeToJson, pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanPrecision, pNode->precision);
}
......@@ -1152,9 +1155,6 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanTsPk, (SNode**)&pNode->pTspk);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkIntervalPhysiPlanPrecision, &pNode->precision);
}
......@@ -2291,7 +2291,7 @@ static int32_t jsonToDataBlockDescNode(const SJson* pJson, void* pObj) {
code = jsonToNodeList(pJson, jkDataBlockDescSlots, &pNode->pSlots);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetSmallIntValue(pJson, jkDataBlockPrecision, &pNode->precision);
code = tjsonGetUTinyIntValue(pJson, jkDataBlockPrecision, &pNode->precision);
}
return code;
......
......@@ -252,6 +252,7 @@ static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
destroyPhysiNode((SPhysiNode*)pNode);
nodesDestroyList(pNode->pExprs);
nodesDestroyList(pNode->pFuncs);
nodesDestroyNode(pNode->pTspk);
}
static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
......@@ -593,7 +594,6 @@ void nodesDestroyNode(SNodeptr pNode) {
SIntervalPhysiNode* pPhyNode = (SIntervalPhysiNode*)pNode;
destroyWinodwPhysiNode((SWinodwPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pFill);
nodesDestroyNode(pPhyNode->pTspk);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
......
......@@ -316,7 +316,18 @@ static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, i
return TSDB_CODE_SUCCESS;
}
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
static uint8_t getPrecision(SNodeList* pChildren) {
if (1 == LIST_LENGTH(pChildren)) {
return (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc)->precision;
} else if (2 == LIST_LENGTH(pChildren)) {
uint8_t lp = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc)->precision;
uint8_t rp = (((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc)->precision;
return (lp > rp ? rp : lp);
}
return 0;
}
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, uint8_t precision, SLogicNode* pLogicNode, ENodeType type) {
SPhysiNode* pPhysiNode = (SPhysiNode*)nodesMakeNode(type);
if (NULL == pPhysiNode) {
return NULL;
......@@ -327,6 +338,7 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
nodesDestroyNode(pPhysiNode);
return NULL;
}
pPhysiNode->pOutputDataBlockDesc->precision = precision;
return pPhysiNode;
}
......@@ -405,7 +417,7 @@ static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAdd
}
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
if (NULL == pTagScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -413,7 +425,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* p
}
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
if (NULL == pTableScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -430,7 +442,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
}
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -451,7 +463,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
}
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, pScanLogicNode->pMeta->tableInfo.precision, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -515,7 +527,7 @@ static int32_t createJoinOutputCols(SPhysiPlanContext* pCxt, SDataBlockDescNode*
}
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) {
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN);
SJoinPhysiNode* pJoin = (SJoinPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_JOIN);
if (NULL == pJoin) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -656,7 +668,7 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi
}
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode, SPhysiNode** pPhyNode) {
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG);
SAggPhysiNode* pAgg = (SAggPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -710,7 +722,7 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
}
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
if (NULL == pProject) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -738,18 +750,18 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
}
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
*pPhyNode = (SPhysiNode*)pExchange;
return TSDB_CODE_SUCCESS;
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode, SPhysiNode** pPhyNode) {
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamScanPhysiNode* pScan = (SStreamScanPhysiNode*)makePhysiNode(pCxt, pExchangeLogicNode->precision, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -803,6 +815,10 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
}
if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
......@@ -820,7 +836,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
}
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERVAL);
if (NULL == pInterval) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -838,18 +854,11 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
return TSDB_CODE_OUT_OF_MEMORY;
}
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pInterval->pTspk);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pInterval);
return code;
}
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
}
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW);
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -860,7 +869,7 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
}
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW);
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW);
if (NULL == pState) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -908,7 +917,7 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
}
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode, SPhysiNode** pPhyNode) {
SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT);
SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pSortLogicNode, QUERY_NODE_PHYSICAL_PLAN_SORT);
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -947,7 +956,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
}
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, getPrecision(pChildren), (SLogicNode*)pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION);
if (NULL == pPart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -103,6 +103,7 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
pExchange->precision = pScan->pMeta->tableInfo.precision;
pExchange->node.pTargets = nodesCloneList(pScan->node.pTargets);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册