提交 46bac2f5 编写于 作者: X Xiaoyu Wang

fix: a problem of interval distributed split

上级 a4ad2b3f
...@@ -1340,6 +1340,13 @@ typedef struct { ...@@ -1340,6 +1340,13 @@ typedef struct {
int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq); int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
typedef struct {
int32_t vgId;
} SSplitVgroupReq;
int32_t tSerializeSSplitVgroupReq(void* buf, int32_t bufLen, SSplitVgroupReq* pReq);
int32_t tDeserializeSSplitVgroupReq(void* buf, int32_t bufLen, SSplitVgroupReq* pReq);
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char spi; char spi;
...@@ -2493,15 +2500,15 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq ...@@ -2493,15 +2500,15 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq); int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
typedef struct { typedef struct {
int8_t intervalUnit; int8_t intervalUnit;
int8_t slidingUnit; int8_t slidingUnit;
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;
int64_t dstTbUid; int64_t dstTbUid;
int32_t dstVgId; // for stream int32_t dstVgId; // for stream
SEpSet epSet; SEpSet epSet;
char* expr; char* expr;
} STableIndexInfo; } STableIndexInfo;
typedef struct { typedef struct {
...@@ -2510,8 +2517,7 @@ typedef struct { ...@@ -2510,8 +2517,7 @@ typedef struct {
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp); int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
void tFreeSTableIndexInfo(void *pInfo); void tFreeSTableIndexInfo(void* pInfo);
typedef struct { typedef struct {
int8_t mqMsgType; int8_t mqMsgType;
...@@ -2753,8 +2759,8 @@ typedef struct { ...@@ -2753,8 +2759,8 @@ typedef struct {
char* msg; char* msg;
} SVDeleteReq; } SVDeleteReq;
int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq);
typedef struct { typedef struct {
int64_t affectedRows; int64_t affectedRows;
......
...@@ -157,6 +157,7 @@ enum { ...@@ -157,6 +157,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
......
...@@ -192,67 +192,68 @@ ...@@ -192,67 +192,68 @@
#define TK_VGROUP 174 #define TK_VGROUP 174
#define TK_MERGE 175 #define TK_MERGE 175
#define TK_REDISTRIBUTE 176 #define TK_REDISTRIBUTE 176
#define TK_SYNCDB 177 #define TK_SPLIT 177
#define TK_DELETE 178 #define TK_SYNCDB 178
#define TK_NULL 179 #define TK_DELETE 179
#define TK_NK_QUESTION 180 #define TK_NULL 180
#define TK_NK_ARROW 181 #define TK_NK_QUESTION 181
#define TK_ROWTS 182 #define TK_NK_ARROW 182
#define TK_TBNAME 183 #define TK_ROWTS 183
#define TK_QSTARTTS 184 #define TK_TBNAME 184
#define TK_QENDTS 185 #define TK_QSTARTTS 185
#define TK_WSTARTTS 186 #define TK_QENDTS 186
#define TK_WENDTS 187 #define TK_WSTARTTS 187
#define TK_WDURATION 188 #define TK_WENDTS 188
#define TK_CAST 189 #define TK_WDURATION 189
#define TK_NOW 190 #define TK_CAST 190
#define TK_TODAY 191 #define TK_NOW 191
#define TK_TIMEZONE 192 #define TK_TODAY 192
#define TK_COUNT 193 #define TK_TIMEZONE 193
#define TK_FIRST 194 #define TK_COUNT 194
#define TK_LAST 195 #define TK_FIRST 195
#define TK_LAST_ROW 196 #define TK_LAST 196
#define TK_BETWEEN 197 #define TK_LAST_ROW 197
#define TK_IS 198 #define TK_BETWEEN 198
#define TK_NK_LT 199 #define TK_IS 199
#define TK_NK_GT 200 #define TK_NK_LT 200
#define TK_NK_LE 201 #define TK_NK_GT 201
#define TK_NK_GE 202 #define TK_NK_LE 202
#define TK_NK_NE 203 #define TK_NK_GE 203
#define TK_MATCH 204 #define TK_NK_NE 204
#define TK_NMATCH 205 #define TK_MATCH 205
#define TK_CONTAINS 206 #define TK_NMATCH 206
#define TK_JOIN 207 #define TK_CONTAINS 207
#define TK_INNER 208 #define TK_JOIN 208
#define TK_SELECT 209 #define TK_INNER 209
#define TK_DISTINCT 210 #define TK_SELECT 210
#define TK_WHERE 211 #define TK_DISTINCT 211
#define TK_PARTITION 212 #define TK_WHERE 212
#define TK_BY 213 #define TK_PARTITION 213
#define TK_SESSION 214 #define TK_BY 214
#define TK_STATE_WINDOW 215 #define TK_SESSION 215
#define TK_SLIDING 216 #define TK_STATE_WINDOW 216
#define TK_FILL 217 #define TK_SLIDING 217
#define TK_VALUE 218 #define TK_FILL 218
#define TK_NONE 219 #define TK_VALUE 219
#define TK_PREV 220 #define TK_NONE 220
#define TK_LINEAR 221 #define TK_PREV 221
#define TK_NEXT 222 #define TK_LINEAR 222
#define TK_HAVING 223 #define TK_NEXT 223
#define TK_ORDER 224 #define TK_HAVING 224
#define TK_SLIMIT 225 #define TK_ORDER 225
#define TK_SOFFSET 226 #define TK_SLIMIT 226
#define TK_LIMIT 227 #define TK_SOFFSET 227
#define TK_OFFSET 228 #define TK_LIMIT 228
#define TK_ASC 229 #define TK_OFFSET 229
#define TK_NULLS 230 #define TK_ASC 230
#define TK_ID 231 #define TK_NULLS 231
#define TK_NK_BITNOT 232 #define TK_ID 232
#define TK_INSERT 233 #define TK_NK_BITNOT 233
#define TK_VALUES 234 #define TK_INSERT 234
#define TK_IMPORT 235 #define TK_VALUES 235
#define TK_NK_SEMI 236 #define TK_IMPORT 236
#define TK_FILE 237 #define TK_NK_SEMI 237
#define TK_FILE 238
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -2419,7 +2419,7 @@ int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pR ...@@ -2419,7 +2419,7 @@ int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pR
return 0; return 0;
} }
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) { int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo *pInfo) {
if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1; if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1;
if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1; if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1;
...@@ -2441,7 +2441,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp ...@@ -2441,7 +2441,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
if (tEncodeI32(&encoder, num) < 0) return -1; if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) { if (num > 0) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i); STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1; if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
} }
} }
...@@ -2491,12 +2491,12 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR ...@@ -2491,12 +2491,12 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
return 0; return 0;
} }
void tFreeSTableIndexInfo(void* info) { void tFreeSTableIndexInfo(void *info) {
if (NULL == info) { if (NULL == info) {
return; return;
} }
STableIndexInfo *pInfo = (STableIndexInfo*)info; STableIndexInfo *pInfo = (STableIndexInfo *)info;
taosMemoryFree(pInfo->expr); taosMemoryFree(pInfo->expr);
} }
...@@ -3448,6 +3448,31 @@ int32_t tDeserializeSRedistributeVgroupReq(void *buf, int32_t bufLen, SRedistrib ...@@ -3448,6 +3448,31 @@ int32_t tDeserializeSRedistributeVgroupReq(void *buf, int32_t bufLen, SRedistrib
return 0; return 0;
} }
int32_t tSerializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) { int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
......
...@@ -200,6 +200,21 @@ bool fmIsInvertible(int32_t funcId) { ...@@ -200,6 +200,21 @@ bool fmIsInvertible(int32_t funcId) {
return res; return res;
} }
static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[64] = {0};
if (NULL != gFunMgtService.pFuncNameHashTable) {
return fmGetFuncInfo(pFunc, msg, sizeof(msg));
}
for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) {
if (0 == strcmp(funcMgtBuiltins[i].name, pFunc->functionName)) {
pFunc->funcId = i;
pFunc->funcType = funcMgtBuiltins[pFunc->funcId].type;
return funcMgtBuiltins[pFunc->funcId].translateFunc(pFunc, msg, sizeof(msg));
}
}
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
}
static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) { static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList) {
SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION); SFunctionNode* pFunc = nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFunc) { if (NULL == pFunc) {
...@@ -207,8 +222,8 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis ...@@ -207,8 +222,8 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis
} }
strcpy(pFunc->functionName, pName); strcpy(pFunc->functionName, pName);
pFunc->pParameterList = pParameterList; pFunc->pParameterList = pParameterList;
char msg[64] = {0}; if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) {
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc, msg, sizeof(msg))) { pFunc->pParameterList = NULL;
nodesDestroyNode(pFunc); nodesDestroyNode(pFunc);
return NULL; return NULL;
} }
......
...@@ -678,6 +678,7 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) { ...@@ -678,6 +678,7 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
} }
static const char* jkMergeLogicPlanMergeKeys = "MergeKeys"; static const char* jkMergeLogicPlanMergeKeys = "MergeKeys";
static const char* jkMergeLogicPlanInputs = "Inputs";
static const char* jkMergeLogicPlanNumOfChannels = "NumOfChannels"; static const char* jkMergeLogicPlanNumOfChannels = "NumOfChannels";
static const char* jkMergeLogicPlanSrcGroupId = "SrcGroupId"; static const char* jkMergeLogicPlanSrcGroupId = "SrcGroupId";
...@@ -688,6 +689,9 @@ static int32_t logicMergeNodeToJson(const void* pObj, SJson* pJson) { ...@@ -688,6 +689,9 @@ static int32_t logicMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergeLogicPlanMergeKeys, pNode->pMergeKeys); code = nodeListToJson(pJson, jkMergeLogicPlanMergeKeys, pNode->pMergeKeys);
} }
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkMergeLogicPlanInputs, pNode->pInputs);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanNumOfChannels, pNode->numOfChannels); code = tjsonAddIntegerToObject(pJson, jkMergeLogicPlanNumOfChannels, pNode->numOfChannels);
} }
...@@ -705,6 +709,9 @@ static int32_t jsonToLogicMergeNode(const SJson* pJson, void* pObj) { ...@@ -705,6 +709,9 @@ static int32_t jsonToLogicMergeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergeLogicPlanMergeKeys, &pNode->pMergeKeys); code = jsonToNodeList(pJson, jkMergeLogicPlanMergeKeys, &pNode->pMergeKeys);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkMergeLogicPlanInputs, &pNode->pInputs);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergeLogicPlanNumOfChannels, &pNode->numOfChannels); code = tjsonGetIntValue(pJson, jkMergeLogicPlanNumOfChannels, &pNode->numOfChannels);
} }
......
...@@ -472,7 +472,7 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A). ...@@ -472,7 +472,7 @@ cmd ::= KILL TRANSACTION NK_INTEGER(A).
cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); } cmd ::= BALANCE VGROUP. { pCxt->pRootNode = createBalanceVgroupStmt(pCxt); }
cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); } cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); }
cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); } cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); }
//cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); } cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); }
%type dnode_list { SNodeList* } %type dnode_list { SNodeList* }
%destructor dnode_list { nodesDestroyList($$); } %destructor dnode_list { nodesDestroyList($$); }
......
...@@ -248,6 +248,9 @@ static int32_t collectMetaKeyFromCreateIndex(SCollectMetaKeyCxt* pCxt, SCreateIn ...@@ -248,6 +248,9 @@ static int32_t collectMetaKeyFromCreateIndex(SCollectMetaKeyCxt* pCxt, SCreateIn
code = code =
reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache); reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->tableName, pCxt->pMetaCache);
} }
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pCxt->pMetaCache);
}
} }
return code; return code;
} }
......
...@@ -174,7 +174,7 @@ static SKeyword keywordTable[] = { ...@@ -174,7 +174,7 @@ static SKeyword keywordTable[] = {
{"SNODE", TK_SNODE}, {"SNODE", TK_SNODE},
{"SNODES", TK_SNODES}, {"SNODES", TK_SNODES},
{"SOFFSET", TK_SOFFSET}, {"SOFFSET", TK_SOFFSET},
// {"SPLIT", TK_SPLIT}, {"SPLIT", TK_SPLIT},
{"STABLE", TK_STABLE}, {"STABLE", TK_STABLE},
{"STABLES", TK_STABLES}, {"STABLES", TK_STABLES},
{"STATE", TK_STATE}, {"STATE", TK_STATE},
......
...@@ -824,9 +824,9 @@ static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNo ...@@ -824,9 +824,9 @@ static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNo
} }
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
SNodeListNode* pRight = (SNodeListNode*)pOp->pRight; SNodeListNode* pRight = (SNodeListNode*)pOp->pRight;
bool first = true; bool first = true;
SDataType targetDt = {0}; SDataType targetDt = {0};
SNode* pNode = NULL; SNode* pNode = NULL;
FOREACH(pNode, pRight->pNodeList) { FOREACH(pNode, pRight->pNodeList) {
SDataType dt = ((SExprNode*)pNode)->resType; SDataType dt = ((SExprNode*)pNode)->resType;
if (first) { if (first) {
...@@ -3672,6 +3672,11 @@ static int32_t translateRedistributeVgroup(STranslateContext* pCxt, SRedistribut ...@@ -3672,6 +3672,11 @@ static int32_t translateRedistributeVgroup(STranslateContext* pCxt, SRedistribut
return code; return code;
} }
static int32_t translateSplitVgroup(STranslateContext* pCxt, SSplitVgroupStmt* pStmt) {
SSplitVgroupReq req = {.vgId = pStmt->vgId};
return buildCmdMsg(pCxt, TDMT_MND_SPLIT_VGROUP, (FSerializeFunc)tSerializeSSplitVgroupReq, &req);
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
...@@ -3803,6 +3808,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { ...@@ -3803,6 +3808,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
code = translateRedistributeVgroup(pCxt, (SRedistributeVgroupStmt*)pNode); code = translateRedistributeVgroup(pCxt, (SRedistributeVgroupStmt*)pNode);
break; break;
case QUERY_NODE_SPLIT_VGROUP_STMT:
code = translateSplitVgroup(pCxt, (SSplitVgroupStmt*)pNode);
break;
default: default:
break; break;
} }
......
此差异已折叠。
...@@ -267,10 +267,12 @@ TEST_F(ParserInitialCTest, createFunction) { ...@@ -267,10 +267,12 @@ TEST_F(ParserInitialCTest, createFunction) {
// run("CREATE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS './build/lib/libudf2.so' OUTPUTTYPE DOUBLE BUFSIZE 8"); // run("CREATE AGGREGATE FUNCTION IF NOT EXISTS udf2 AS './build/lib/libudf2.so' OUTPUTTYPE DOUBLE BUFSIZE 8");
} }
TEST_F(ParserInitialCTest, createIndexSma) { TEST_F(ParserInitialCTest, createSmaIndex) {
useDb("root", "test"); useDb("root", "test");
run("CREATE SMA INDEX index1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); run("CREATE SMA INDEX index1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
run("CREATE SMA INDEX index2 ON st1 FUNCTION(MAX(c1), MIN(tag1)) INTERVAL(10s)");
} }
TEST_F(ParserInitialCTest, createMnode) { TEST_F(ParserInitialCTest, createMnode) {
......
...@@ -19,7 +19,7 @@ using namespace std; ...@@ -19,7 +19,7 @@ using namespace std;
namespace ParserTest { namespace ParserTest {
class ParserShowToUseTest : public ParserTestBase {}; class ParserShowToUseTest : public ParserDdlTest {};
// todo SHOW accounts // todo SHOW accounts
// todo SHOW apps // todo SHOW apps
...@@ -133,7 +133,24 @@ TEST_F(ParserShowToUseTest, showVgroups) { ...@@ -133,7 +133,24 @@ TEST_F(ParserShowToUseTest, showVgroups) {
// todo SHOW vnodes // todo SHOW vnodes
// todo split vgroup TEST_F(ParserShowToUseTest, splitVgroup) {
useDb("root", "test");
SSplitVgroupReq expect = {0};
auto setSplitVgroupReqFunc = [&](int32_t vgId) { expect.vgId = vgId; };
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_SPLIT_VGROUP_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_SPLIT_VGROUP);
SSplitVgroupReq req = {0};
ASSERT_EQ(tDeserializeSSplitVgroupReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(req.vgId, expect.vgId);
});
setSplitVgroupReqFunc(15);
run("SPLIT VGROUP 15");
}
TEST_F(ParserShowToUseTest, useDatabase) { TEST_F(ParserShowToUseTest, useDatabase) {
useDb("root", "test"); useDb("root", "test");
......
...@@ -135,7 +135,8 @@ typedef struct SStableSplitInfo { ...@@ -135,7 +135,8 @@ typedef struct SStableSplitInfo {
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) { static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
SNode* pFunc = NULL; SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) { FOREACH(pFunc, pFuncs) {
if (!fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) { if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
!fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
return true; return true;
} }
} }
...@@ -314,7 +315,12 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla ...@@ -314,7 +315,12 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pMerge->pInputs = nodesCloneList(pPartChild->pTargets); pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets); // NULL == pSubplan means 'merge node' replaces 'split node'.
if (NULL == pSubplan) {
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
} else {
pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
}
if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) { if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -340,6 +346,21 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent ...@@ -340,6 +346,21 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return code; return code;
} }
static int32_t stbSplCreateMergeKeysForInterval(SNode* pWStartTs, SNodeList** pMergeKeys) {
SOrderByExprNode* pMergeKey = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pMergeKey) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMergeKey->pExpr = nodesCloneNode(pWStartTs);
if (NULL == pMergeKey->pExpr) {
nodesDestroyNode(pMergeKey);
return TSDB_CODE_OUT_OF_MEMORY;
}
pMergeKey->order = ORDER_ASC;
pMergeKey->nullOrder = NULL_ORDER_FIRST;
return nodesListMakeStrictAppend(pMergeKeys, pMergeKey);
}
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
...@@ -347,7 +368,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ...@@ -347,7 +368,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH; ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE; ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk)); code = stbSplCreateMergeKeysForInterval(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow); code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
} }
......
...@@ -58,4 +58,6 @@ TEST_F(PlanIntervalTest, stable) { ...@@ -58,4 +58,6 @@ TEST_F(PlanIntervalTest, stable) {
useDb("root", "test"); useDb("root", "test");
run("SELECT COUNT(*) FROM st1 INTERVAL(10s)"); run("SELECT COUNT(*) FROM st1 INTERVAL(10s)");
run("SELECT _WSTARTTS, COUNT(*) FROM st1 INTERVAL(10s)");
} }
...@@ -38,5 +38,5 @@ TEST_F(PlanSuperTableTest, pseudoColOnChildTable) { ...@@ -38,5 +38,5 @@ TEST_F(PlanSuperTableTest, pseudoColOnChildTable) {
TEST_F(PlanSuperTableTest, orderBy) { TEST_F(PlanSuperTableTest, orderBy) {
useDb("root", "test"); useDb("root", "test");
run("SELECT -1*c1, c1 FROM st1 ORDER BY -1*c1"); run("SELECT -1 * c1, c1 FROM st1 ORDER BY -1 * c1");
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册