提交 85f87ae2 编写于 作者: X Xiaoyu Wang

bugfix

上级 f791aa3c
......@@ -89,6 +89,7 @@ tests/examples/JDBC/JDBCDemo/.project
tests/examples/JDBC/JDBCDemo/.settings/
source/libs/parser/inc/sql.*
tests/script/tmqResult.txt
tests/tmqResult.txt
# Emacs
# -*- mode: gitignore; -*-
......
......@@ -97,6 +97,7 @@ typedef struct SWindowLogicNode {
int8_t slidingUnit;
SFillNode* pFill;
int64_t sessionGap;
SNode* pTspk;
} SWindowLogicNode;
typedef struct SSortLogicNode {
......@@ -228,6 +229,7 @@ typedef struct SWinodwPhysiNode {
typedef struct SIntervalPhysiNode {
SWinodwPhysiNode window;
SNode* pTspk; // timestamp primary key
int64_t interval;
int64_t offset;
int64_t sliding;
......
......@@ -191,12 +191,13 @@ typedef struct SStateWindowNode {
typedef struct SSessionWindowNode {
ENodeType type; // QUERY_NODE_SESSION_WINDOW
SNode* pCol;
SNode* pCol; // timestamp primary key
SNode* pGap; // gap between two session window(in microseconds)
} SSessionWindowNode;
typedef struct SIntervalWindowNode {
ENodeType type; // QUERY_NODE_INTERVAL_WINDOW
SNode* pCol; // timestamp primary key
SNode* pInterval; // SValueNode
SNode* pOffset; // SValueNode
SNode* pSliding; // SValueNode
......
......@@ -278,16 +278,18 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
}
static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(winType);
// COPY_SCALAR_FIELD(winType);
CLONE_NODE_LIST_FIELD(pFuncs);
COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding);
COPY_SCALAR_FIELD(intervalUnit);
COPY_SCALAR_FIELD(slidingUnit);
// COPY_SCALAR_FIELD(interval);
// COPY_SCALAR_FIELD(offset);
// COPY_SCALAR_FIELD(sliding);
// COPY_SCALAR_FIELD(intervalUnit);
// COPY_SCALAR_FIELD(slidingUnit);
CLONE_NODE_FIELD(pFill);
COPY_SCALAR_FIELD(sessionGap);
// COPY_SCALAR_FIELD(sessionGap);
CLONE_NODE_FIELD(pTspk);
return (SNode*)pDst;
}
......
......@@ -988,6 +988,7 @@ static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit";
static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkIntervalPhysiPlanFill = "Fill";
static const char* jkIntervalPhysiPlanTsPk = "TsPk";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj;
......@@ -1011,6 +1012,9 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanTsPk, nodeToJson, pNode->pTspk);
}
return code;
}
......@@ -1037,6 +1041,9 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanTsPk, (SNode**)&pNode->pTspk);
}
return code;
}
......
......@@ -99,6 +99,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
if (DEAL_RES_ERROR != res) {
res = walkNode(pInterval->pFill, order, walker, pContext);
}
if (DEAL_RES_ERROR != res) {
res = walkNode(pInterval->pCol, order, walker, pContext);
}
break;
}
case QUERY_NODE_NODE_LIST:
......@@ -225,6 +228,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext);
}
if (DEAL_RES_ERROR != res) {
res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext);
}
break;
}
case QUERY_NODE_NODE_LIST:
......
......@@ -30,6 +30,8 @@ extern "C" {
#define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__)
#define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__)
#define PK_TS_COL_INTERNAL_NAME "_rowts"
typedef struct SMsgBuf {
int32_t len;
char *buf;
......
......@@ -702,6 +702,13 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol) {
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill) {
SIntervalWindowNode* interval = (SIntervalWindowNode*)nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW);
CHECK_OUT_OF_MEM(interval);
interval->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == interval->pCol) {
nodesDestroyNode(interval);
CHECK_OUT_OF_MEM(interval->pCol);
}
((SColumnNode*)interval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(((SColumnNode*)interval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
interval->pInterval = pInterval;
interval->pOffset = pOffset;
interval->pSliding = pSliding;
......
......@@ -256,6 +256,10 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
bool found = false;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta;
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME)) {
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, false, pCol);
return true;
}
int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) {
......
......@@ -88,7 +88,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
}
static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
static int32_t rewriteId = 1;
static int32_t rewriteId = 1; // todo modify
SNameExprCxt nameCxt = { .rewriteId = rewriteId };
nodesWalkList(pExprs, doNameExpr, &nameCxt);
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
......@@ -461,6 +461,12 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
if (NULL == pWindow->pTspk) {
nodesDestroyNode(pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL != pInterval->pFill) {
pWindow->pFill = nodesCloneNode(pInterval->pFill);
if (NULL == pWindow->pFill) {
......
......@@ -17,9 +17,14 @@
#include "functionMgt.h"
typedef struct SSlotIdInfo {
int16_t slotId;
bool set;
} SSlotIdInfo;
typedef struct SSlotIndex {
int16_t dataBlockId;
int16_t slotId;
SArray* pSlotIdsInfo; // duplicate name slot
} SSlotIndex;
typedef struct SPhysiPlanContext {
......@@ -79,7 +84,19 @@ static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, S
}
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
SSlotIndex index = { .dataBlockId = dataBlockId, .slotId = slotId };
SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
if (NULL != pIndex) {
SSlotIdInfo info = { .slotId = slotId, .set = false };
taosArrayPush(pIndex->pSlotIdsInfo, &info);
return TSDB_CODE_SUCCESS;
}
SSlotIndex index = { .dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo)) };
if (NULL == index.pSlotIdsInfo) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSlotIdInfo info = { .slotId = slotId, .set = false };
taosArrayPush(index.pSlotIdsInfo, &info);
return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
}
......@@ -90,7 +107,7 @@ static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode,
}
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId, SHashObj** pDescHash) {
SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pHash) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -149,6 +166,18 @@ static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SD
return code;
}
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
int32_t size = taosArrayGetSize(pSlotIdsInfo);
for (int32_t i = 0; i < size; ++i) {
SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
if (!pInfo->set) {
pInfo->set = true;
return pInfo->slotId;
}
}
return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
}
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
......@@ -167,7 +196,7 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList,
slotId = nextSlotId;
++nextSlotId;
} else {
slotId = pIndex->slotId;
slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -217,7 +246,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
return DEAL_RES_ERROR;
}
((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
((SColumnNode*)pNode)->slotId = pIndex->slotId;
((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
......@@ -792,6 +821,13 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
return TSDB_CODE_OUT_OF_MEMORY;
}
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pInterval->pTspk);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pInterval);
return code;
}
return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册