提交 e2586979 编写于 作者: X Xiaoyu Wang

enh: the delete physical plan increases the timestamp interval of the actual deleted data

上级 1479bc8a
......@@ -151,6 +151,8 @@ typedef struct SVnodeModifyLogicNode {
SArray* pDataBlocks;
SVgDataBlocks* pVgDataBlocks;
SNode* pAffectedRows; // SColumnNode
SNode* pStartTs; // SColumnNode
SNode* pEndTs; // SColumnNode
uint64_t tableId;
uint64_t stableId;
int8_t tableType; // table type
......@@ -525,6 +527,8 @@ typedef struct SDataDeleterNode {
char tsColName[TSDB_COL_NAME_LEN];
STimeWindow deleteTimeRange;
SNode* pAffectedRows;
SNode* pStartTs;
SNode* pEndTs;
} SDataDeleterNode;
typedef struct SSubplan {
......
......@@ -315,6 +315,8 @@ typedef struct SDeleteStmt {
SNode* pFromTable; // FROM clause
SNode* pWhere; // WHERE clause
SNode* pCountFunc; // count the number of rows affected
SNode* pFirstFunc; // the start timestamp when the data was actually deleted
SNode* pLastFunc; // the end timestamp when the data was actually deleted
SNode* pTagCond; // pWhere divided into pTagCond and timeRange
STimeWindow timeRange;
uint8_t precision;
......
......@@ -399,6 +399,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD(modifyType);
COPY_SCALAR_FIELD(msgType);
CLONE_NODE_FIELD(pAffectedRows);
CLONE_NODE_FIELD(pStartTs);
CLONE_NODE_FIELD(pEndTs);
COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(stableId);
COPY_SCALAR_FIELD(tableType);
......
......@@ -2431,6 +2431,8 @@ static const char* jkDeletePhysiPlanTsColName = "TsColName";
static const char* jkDeletePhysiPlanDeleteTimeRangeStartKey = "DeleteTimeRangeStartKey";
static const char* jkDeletePhysiPlanDeleteTimeRangeEndKey = "DeleteTimeRangeEndKey";
static const char* jkDeletePhysiPlanAffectedRows = "AffectedRows";
static const char* jkDeletePhysiPlanStartTs = "StartTs";
static const char* jkDeletePhysiPlanEndTs = "EndTs";
static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
const SDataDeleterNode* pNode = (const SDataDeleterNode*)pObj;
......@@ -2457,6 +2459,12 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeletePhysiPlanAffectedRows, nodeToJson, pNode->pAffectedRows);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeletePhysiPlanStartTs, nodeToJson, pNode->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeletePhysiPlanEndTs, nodeToJson, pNode->pEndTs);
}
return code;
}
......@@ -2486,6 +2494,12 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeletePhysiPlanAffectedRows, &pNode->pAffectedRows);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeletePhysiPlanStartTs, &pNode->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeletePhysiPlanEndTs, &pNode->pEndTs);
}
return code;
}
......
......@@ -2665,7 +2665,9 @@ enum {
PHY_DELETER_CODE_TABLE_FNAME,
PHY_DELETER_CODE_TS_COL_NAME,
PHY_DELETER_CODE_DELETE_TIME_RANGE,
PHY_DELETER_CODE_AFFECTED_ROWS
PHY_DELETER_CODE_AFFECTED_ROWS,
PHY_DELETER_CODE_START_TS,
PHY_DELETER_CODE_END_TS
};
static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -2690,6 +2692,12 @@ static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_AFFECTED_ROWS, nodeToMsg, pNode->pAffectedRows);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_START_TS, nodeToMsg, pNode->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_END_TS, nodeToMsg, pNode->pEndTs);
}
return code;
}
......@@ -2722,6 +2730,12 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_DELETER_CODE_AFFECTED_ROWS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAffectedRows);
break;
case PHY_DELETER_CODE_START_TS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartTs);
break;
case PHY_DELETER_CODE_END_TS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndTs);
break;
default:
break;
}
......
......@@ -727,6 +727,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pFromTable);
nodesDestroyNode(pStmt->pWhere);
nodesDestroyNode(pStmt->pCountFunc);
nodesDestroyNode(pStmt->pFirstFunc);
nodesDestroyNode(pStmt->pLastFunc);
nodesDestroyNode(pStmt->pTagCond);
break;
}
......@@ -791,6 +793,8 @@ void nodesDestroyNode(SNode* pNode) {
destroyVgDataBlockArray(pLogicNode->pDataBlocks);
// pVgDataBlocks is weak reference
nodesDestroyNode(pLogicNode->pAffectedRows);
nodesDestroyNode(pLogicNode->pStartTs);
nodesDestroyNode(pLogicNode->pEndTs);
taosMemoryFreeClear(pLogicNode->pVgroupList);
nodesDestroyList(pLogicNode->pInsertCols);
break;
......@@ -997,6 +1001,8 @@ void nodesDestroyNode(SNode* pNode) {
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
nodesDestroyNode(pSink->pAffectedRows);
nodesDestroyNode(pSink->pStartTs);
nodesDestroyNode(pSink->pEndTs);
break;
}
case QUERY_NODE_PHYSICAL_SUBPLAN: {
......
......@@ -1787,10 +1787,10 @@ SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDb
return (SNode*)pStmt;
}
SNode* createCountFuncForDelete(SAstCreateContext* pCxt) {
SNode* createFuncForDelete(SAstCreateContext* pCxt, const char* pFuncName) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
CHECK_OUT_OF_MEM(pFunc);
strcpy(pFunc->functionName, "count");
strcpy(pFunc->functionName, pFuncName);
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pFunc->pParameterList, createPrimaryKeyCol(pCxt, NULL))) {
nodesDestroyNode((SNode*)pFunc);
CHECK_OUT_OF_MEM(NULL);
......@@ -1804,8 +1804,10 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
CHECK_OUT_OF_MEM(pStmt);
pStmt->pFromTable = pTable;
pStmt->pWhere = pWhere;
pStmt->pCountFunc = createCountFuncForDelete(pCxt);
if (NULL == pStmt->pCountFunc) {
pStmt->pCountFunc = createFuncForDelete(pCxt, "count");
pStmt->pFirstFunc = createFuncForDelete(pCxt, "first");
pStmt->pLastFunc = createFuncForDelete(pCxt, "last");
if (NULL == pStmt->pCountFunc || NULL == pStmt->pFirstFunc || NULL == pStmt->pLastFunc) {
nodesDestroyNode((SNode*)pStmt);
CHECK_OUT_OF_MEM(NULL);
}
......
......@@ -3347,10 +3347,16 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
if (TSDB_CODE_SUCCESS == code) {
code = translateDeleteWhere(pCxt, pDelete);
}
pCxt->currClause = SQL_CLAUSE_SELECT;
if (TSDB_CODE_SUCCESS == code) {
pCxt->currClause = SQL_CLAUSE_SELECT;
code = translateExpr(pCxt, &pDelete->pCountFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pDelete->pFirstFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pDelete->pLastFunc);
}
return code;
}
......
......@@ -1372,9 +1372,21 @@ static int32_t createDeleteAggLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pD
}
int32_t code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, nodesCloneNode(pDelete->pCountFunc));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pAgg->pAggFuncs, nodesCloneNode(pDelete->pFirstFunc));
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pAgg->pAggFuncs, nodesCloneNode(pDelete->pLastFunc));
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pCountFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pFirstFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, &pDelete->pLastFunc);
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
......@@ -1405,7 +1417,9 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
strcpy(pModify->tsColName, pRealTable->pMeta->schema->name);
pModify->deleteTimeRange = pDelete->timeRange;
pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc);
if (NULL == pModify->pAffectedRows) {
pModify->pStartTs = nodesCloneNode(pDelete->pFirstFunc);
pModify->pEndTs = nodesCloneNode(pDelete->pLastFunc);
if (NULL == pModify->pAffectedRows || NULL == pModify->pStartTs || NULL == pModify->pEndTs) {
nodesDestroyNode((SNode*)pModify);
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -1323,9 +1323,9 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
SPartitionPhysiNode* pPart =
(SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode,
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION);
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pPartLogicNode,
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION : QUERY_NODE_PHYSICAL_PLAN_PARTITION);
if (NULL == pPart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -1670,6 +1670,12 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
&pDeleter->pAffectedRows);
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pStartTs, &pDeleter->pStartTs);
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pEndTs, &pDeleter->pEndTs);
}
if (TSDB_CODE_SUCCESS == code) {
pDeleter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
if (NULL == pDeleter->sink.pInputDataBlockDesc) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册