提交 2a1ec3c3 编写于 作者: X Xiaoyu Wang

feat: order by distributed split

上级 ce3aacf4
......@@ -95,7 +95,7 @@ typedef struct SVnodeModifyLogicNode {
int32_t msgType;
SArray* pDataBlocks;
SVgDataBlocks* pVgDataBlocks;
SNode* pModifyRows; // SColumnNode
SNode* pAffectedRows; // SColumnNode
uint64_t tableId;
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
......@@ -392,6 +392,7 @@ typedef struct SDataDeleterNode {
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
STimeWindow deleteTimeRange;
SNode* pAffectedRows;
} SDataDeleterNode;
typedef struct SSubplan {
......
......@@ -371,7 +371,7 @@ static SNode* logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModif
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(modifyType);
COPY_SCALAR_FIELD(msgType);
CLONE_NODE_FIELD(pModifyRows);
CLONE_NODE_FIELD(pAffectedRows);
COPY_SCALAR_FIELD(tableId);
COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableFName);
......
......@@ -613,7 +613,7 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
static const char* jkVnodeModifyLogicPlanModifyType = "ModifyType";
static const char* jkVnodeModifyLogicPlanMsgType = "MsgType";
static const char* jkVnodeModifyLogicPlanModifyRows = "ModifyRows";
static const char* jkVnodeModifyLogicPlanAffectedRows = "AffectedRows";
static int32_t logicVnodeModifyNodeToJson(const void* pObj, SJson* pJson) {
const SVnodeModifyLogicNode* pNode = (const SVnodeModifyLogicNode*)pObj;
......@@ -626,7 +626,7 @@ static int32_t logicVnodeModifyNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkVnodeModifyLogicPlanMsgType, pNode->msgType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkVnodeModifyLogicPlanModifyRows, nodeToJson, pNode->pModifyRows);
code = tjsonAddObject(pJson, jkVnodeModifyLogicPlanAffectedRows, nodeToJson, pNode->pAffectedRows);
}
return code;
......@@ -643,7 +643,7 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) {
code = tjsonGetIntValue(pJson, jkVnodeModifyLogicPlanMsgType, &pNode->msgType);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkVnodeModifyLogicPlanModifyRows, &pNode->pModifyRows);
code = jsonToNodeObject(pJson, jkVnodeModifyLogicPlanAffectedRows, &pNode->pAffectedRows);
}
return code;
......@@ -2016,6 +2016,7 @@ static const char* jkDeletePhysiPlanTableType = "TableType";
static const char* jkDeletePhysiPlanTableFName = "TableFName";
static const char* jkDeletePhysiPlanDeleteTimeRangeStartKey = "DeleteTimeRangeStartKey";
static const char* jkDeletePhysiPlanDeleteTimeRangeEndKey = "DeleteTimeRangeEndKey";
static const char* jkDeletePhysiPlanAffectedRows = "AffectedRows";
static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
const SDataDeleterNode* pNode = (const SDataDeleterNode*)pObj;
......@@ -2036,6 +2037,9 @@ static int32_t physiDeleteNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDeletePhysiPlanDeleteTimeRangeEndKey, pNode->deleteTimeRange.ekey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeletePhysiPlanAffectedRows, nodeToJson, pNode->pAffectedRows);
}
return code;
}
......@@ -2059,6 +2063,9 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkDeletePhysiPlanDeleteTimeRangeEndKey, &pNode->deleteTimeRange.ekey);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeletePhysiPlanAffectedRows, &pNode->pAffectedRows);
}
return code;
}
......
......@@ -1057,8 +1057,8 @@ static int32_t createVnodeModifLogicNodeByDelete(SLogicPlanContext* pCxt, SDelet
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
pRealTable->table.dbName, pRealTable->table.tableName);
pModify->deleteTimeRange = pDelete->timeRange;
pModify->pModifyRows = nodesCloneNode(pDelete->pCountFunc);
if (NULL == pModify->pModifyRows) {
pModify->pAffectedRows = nodesCloneNode(pDelete->pCountFunc);
if (NULL == pModify->pAffectedRows) {
nodesDestroyNode(pModify);
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -1332,13 +1332,21 @@ static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode*
strcpy(pDeleter->tableFName, pModify->tableFName);
pDeleter->deleteTimeRange = pModify->deleteTimeRange;
pDeleter->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
if (NULL == pDeleter->sink.pInputDataBlockDesc) {
int32_t code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
&pDeleter->pAffectedRows);
if (TSDB_CODE_SUCCESS == code) {
pDeleter->sink.pInputDataBlockDesc = nodesCloneNode(pRoot->pOutputDataBlockDesc);
if (NULL == pDeleter->sink.pInputDataBlockDesc) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pSink = (SDataSinkNode*)pDeleter;
} else {
nodesDestroyNode(pDeleter);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pSink = (SDataSinkNode*)pDeleter;
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册