提交 ba0c9283 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/test/chr/TD-14699' into fix/dnode

...@@ -177,7 +177,8 @@ typedef enum ESubplanType { ...@@ -177,7 +177,8 @@ typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL, SUBPLAN_TYPE_PARTIAL,
SUBPLAN_TYPE_SCAN, SUBPLAN_TYPE_SCAN,
SUBPLAN_TYPE_MODIFY SUBPLAN_TYPE_MODIFY,
SUBPLAN_TYPE_COMPUTE
} ESubplanType; } ESubplanType;
typedef struct SSubplanId { typedef struct SSubplanId {
...@@ -196,6 +197,7 @@ typedef struct SLogicSubplan { ...@@ -196,6 +197,7 @@ typedef struct SLogicSubplan {
SVgroupsInfo* pVgroupList; SVgroupsInfo* pVgroupList;
int32_t level; int32_t level;
int32_t splitFlag; int32_t splitFlag;
int32_t numOfComputeNodes;
} SLogicSubplan; } SLogicSubplan;
typedef struct SQueryLogicPlan { typedef struct SQueryLogicPlan {
......
...@@ -1117,6 +1117,7 @@ static const char* jkLogicSubplanVgroupsSize = "VgroupsSize"; ...@@ -1117,6 +1117,7 @@ static const char* jkLogicSubplanVgroupsSize = "VgroupsSize";
static const char* jkLogicSubplanVgroups = "Vgroups"; static const char* jkLogicSubplanVgroups = "Vgroups";
static const char* jkLogicSubplanLevel = "Level"; static const char* jkLogicSubplanLevel = "Level";
static const char* jkLogicSubplanSplitFlag = "SplitFlag"; static const char* jkLogicSubplanSplitFlag = "SplitFlag";
static const char* jkLogicSubplanNumOfComputeNodes = "NumOfComputeNodes";
static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) { static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) {
const SLogicSubplan* pNode = (const SLogicSubplan*)pObj; const SLogicSubplan* pNode = (const SLogicSubplan*)pObj;
...@@ -1143,6 +1144,9 @@ static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) { ...@@ -1143,6 +1144,9 @@ static int32_t logicSubplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicSubplanSplitFlag, pNode->splitFlag); code = tjsonAddIntegerToObject(pJson, jkLogicSubplanSplitFlag, pNode->splitFlag);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkLogicSubplanNumOfComputeNodes, pNode->numOfComputeNodes);
}
return code; return code;
} }
...@@ -1159,7 +1163,6 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { ...@@ -1159,7 +1163,6 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code); tjsonGetNumberValue(pJson, jkLogicSubplanType, pNode->subplanType, code);
;
} }
int32_t objSize = 0; int32_t objSize = 0;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -1174,6 +1177,9 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) { ...@@ -1174,6 +1177,9 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkLogicSubplanSplitFlag, &pNode->splitFlag); code = tjsonGetIntValue(pJson, jkLogicSubplanSplitFlag, &pNode->splitFlag);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkLogicSubplanNumOfComputeNodes, &pNode->numOfComputeNodes);
}
return code; return code;
} }
......
...@@ -115,7 +115,45 @@ static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, ...@@ -115,7 +115,45 @@ static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan,
} }
} }
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) { static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pSubplan->numOfComputeNodes; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
SNode* pChild = NULL;
SNode* pParent = NULL;
int32_t code = TSDB_CODE_SUCCESS;
FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static bool isComputeGroup(SNodeList* pGroup) {
if (0 == LIST_LENGTH(pGroup)) {
return false;
}
return SUBPLAN_TYPE_COMPUTE == ((SLogicSubplan*)nodesListGetNode(pGroup, 0))->subplanType;
}
static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool topLevel = (0 == LIST_LENGTH(pParentsGroup)); bool topLevel = (0 == LIST_LENGTH(pParentsGroup));
SNode* pChild = NULL; SNode* pChild = NULL;
...@@ -138,6 +176,13 @@ static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurren ...@@ -138,6 +176,13 @@ static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurren
return code; return code;
} }
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
if (isComputeGroup(pParentsGroup)) {
return pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup);
}
return pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup);
}
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) { static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
SNodeList* pCurrentGroup = nodesMakeList(); SNodeList* pCurrentGroup = nodesMakeList();
if (NULL == pCurrentGroup) { if (NULL == pCurrentGroup) {
...@@ -155,6 +200,9 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32 ...@@ -155,6 +200,9 @@ static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32
case SUBPLAN_TYPE_MODIFY: case SUBPLAN_TYPE_MODIFY:
code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup); code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
break; break;
case SUBPLAN_TYPE_COMPUTE:
code = scaleOutForCompute(pCxt, pSubplan, level, pCurrentGroup);
break;
default: default:
break; break;
} }
......
...@@ -994,8 +994,20 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -994,8 +994,20 @@ static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
} }
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType); int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0)); SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
if (NULL != pScanSubplan) {
if (NULL != info.pSubplan->pVgroupList) {
info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
TSWAP(pScanSubplan->pVgroupList, info.pSubplan->pVgroupList);
} else {
info.pSubplan->numOfComputeNodes = 1;
}
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
++(pCxt->groupId); ++(pCxt->groupId);
pCxt->split = true; pCxt->split = true;
return code; return code;
...@@ -1007,8 +1019,7 @@ static const SSplitRule splitRuleSet[] = { ...@@ -1007,8 +1019,7 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
{.pName = "QnodeSplit", .splitFunc = qnodeSplit}
}; };
// clang-format on // clang-format on
...@@ -1039,7 +1050,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -1039,7 +1050,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
} }
} }
} while (split); } while (split);
return TSDB_CODE_SUCCESS; return qnodeSplit(&cxt, pSubplan);
} }
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
......
...@@ -334,22 +334,22 @@ class TDDnode: ...@@ -334,22 +334,22 @@ class TDDnode:
bkey2 = bytes(key2, encoding="utf8") bkey2 = bytes(key2, encoding="utf8")
logFile = self.logDir + "/taosdlog.0" logFile = self.logDir + "/taosdlog.0"
i = 0 i = 0
while not os.path.exists(logFile): # while not os.path.exists(logFile):
sleep(0.1) # sleep(0.1)
i += 1 # i += 1
if i > 10: # if i > 10:
break # break
tailCmdStr = 'tail -f ' # tailCmdStr = 'tail -f '
if platform.system().lower() == 'windows': # if platform.system().lower() == 'windows':
tailCmdStr = 'tail -n +0 -f ' # tailCmdStr = 'tail -n +0 -f '
popen = subprocess.Popen( # popen = subprocess.Popen(
tailCmdStr + logFile, # tailCmdStr + logFile,
stdout=subprocess.PIPE, # stdout=subprocess.PIPE,
stderr=subprocess.PIPE, # stderr=subprocess.PIPE,
shell=True) # shell=True)
pid = popen.pid # pid = popen.pid
# print('Popen.pid:' + str(pid)) # # print('Popen.pid:' + str(pid))
timeout = time.time() + 60 * 2 # timeout = time.time() + 60 * 2
# while True: # while True:
# line = popen.stdout.readline().strip() # line = popen.stdout.readline().strip()
# print(line) # print(line)
......
...@@ -156,6 +156,7 @@ class TDSql: ...@@ -156,6 +156,7 @@ class TDSql:
def checkRows(self, expectRows): def checkRows(self, expectRows):
if self.queryRows == expectRows: if self.queryRows == expectRows:
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows)) tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows))
return True
else: else:
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectRows) args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectRows)
......
...@@ -54,7 +54,9 @@ class TDTestCase: ...@@ -54,7 +54,9 @@ class TDTestCase:
valgrind = 0 valgrind = 0
hostname = socket.gethostname() hostname = socket.gethostname()
dnodes = [] dnodes = []
start_port = 6030 start_port = 6030
start_port_sec = 6130
for num in range(1, dnodes_nums+1): for num in range(1, dnodes_nums+1):
dnode = TDDnode(num) dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
...@@ -62,6 +64,8 @@ class TDTestCase: ...@@ -62,6 +64,8 @@ class TDTestCase:
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname) dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043) dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
dnodes.append(dnode) dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes) self.TDDnodes = MyDnodes(dnodes)
......
...@@ -78,6 +78,7 @@ class TDTestCase: ...@@ -78,6 +78,7 @@ class TDTestCase:
print(hostname) print(hostname)
dnodes = [] dnodes = []
start_port = 6030 start_port = 6030
start_port_sec = 6130
for num in range(1, dnodes_nums+1): for num in range(1, dnodes_nums+1):
dnode = TDDnode(num) dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
...@@ -85,6 +86,7 @@ class TDTestCase: ...@@ -85,6 +86,7 @@ class TDTestCase:
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname) dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043) dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
dnodes.append(dnode) dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes) self.TDDnodes = MyDnodes(dnodes)
...@@ -116,25 +118,28 @@ class TDTestCase: ...@@ -116,25 +118,28 @@ class TDTestCase:
while count < 10: while count < 10:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='follower':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='follower':
break print("three mnodes is ready in 10s")
elif tdSql.queryResult[0][2]=='follower' : break
if tdSql.queryResult[1][2]=='leader': elif tdSql.queryResult[0][2]=='follower' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='leader':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='follower':
break print("three mnodes is ready in 10s")
elif tdSql.queryResult[0][2]=='follower' : break
if tdSql.queryResult[1][2]=='follower': elif tdSql.queryResult[0][2]=='follower' :
if tdSql.queryResult[2][2]=='leader': if tdSql.queryResult[1][2]=='follower':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='leader':
break print("three mnodes is ready in 10s")
break
count+=1 count+=1
else: else:
print(tdSql.queryResult)
print("three mnodes is not ready in 10s ") print("three mnodes is not ready in 10s ")
return -1
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -151,18 +156,21 @@ class TDTestCase: ...@@ -151,18 +156,21 @@ class TDTestCase:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='offline' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[0][2]=='offline' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='leader':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='follower':
break print("stop mnodes on dnode 2 successfully in 10s")
elif tdSql.queryResult[1][2]=='follower': break
if tdSql.queryResult[2][2]=='leader': elif tdSql.queryResult[1][2]=='follower':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='leader':
break print("stop mnodes on dnode 2 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 2 failed in 10s ") print("stop mnodes on dnode 2 failed in 10s ")
return -1
tdSql.error("drop mnode on dnode 1;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -174,20 +182,23 @@ class TDTestCase: ...@@ -174,20 +182,23 @@ class TDTestCase:
tdSql.checkData(2,1,'%s:6230'%self.host) tdSql.checkData(2,1,'%s:6230'%self.host)
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check3mnode2drop(self): def check3mnode2off(self):
count=0 count=0
while count < 10: while count < 40:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='offline': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='offline':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='follower':
break print("stop mnodes on dnode 2 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 2 failed in 10s ") print("stop mnodes on dnode 2 failed in 10s ")
return -1
tdSql.error("drop mnode on dnode 2;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -197,7 +208,7 @@ class TDTestCase: ...@@ -197,7 +208,7 @@ class TDTestCase:
tdSql.checkData(1,1,'%s:6130'%self.host) tdSql.checkData(1,1,'%s:6130'%self.host)
tdSql.checkData(1,2,'offline') tdSql.checkData(1,2,'offline')
tdSql.checkData(1,3,'ready') tdSql.checkData(1,3,'ready')
tdSql.checkData(2,1,'%s:6230') tdSql.checkData(2,1,'%s:6230'%self.host)
tdSql.checkData(2,2,'follower') tdSql.checkData(2,2,'follower')
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
...@@ -207,15 +218,17 @@ class TDTestCase: ...@@ -207,15 +218,17 @@ class TDTestCase:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[2][2]=='offline': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='offline':
print("stop mnodes on dnode 3 successfully in 10s") if tdSql.queryResult[1][2]=='follower':
break print("stop mnodes on dnode 3 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 3 failed in 10s") print("stop mnodes on dnode 3 failed in 10s")
return -1
tdSql.error("drop mnode on dnode 3;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,1,'%s:6030'%self.host)
...@@ -262,8 +275,25 @@ class TDTestCase: ...@@ -262,8 +275,25 @@ class TDTestCase:
for i in range(1,3): for i in range(1,3):
tdLog.debug("drop mnode on dnode %d"%(i+1)) tdLog.debug("drop mnode on dnode %d"%(i+1))
tdSql.execute("drop mnode on dnode %d"%(i+1)) tdSql.execute("drop mnode on dnode %d"%(i+1))
tdSql.query("show mnodes;")
count=0
while count<10:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(2):
print("drop mnode %d successfully"%(i+1))
break
count+=1
tdLog.debug("create mnode on dnode %d"%(i+1)) tdLog.debug("create mnode on dnode %d"%(i+1))
tdSql.execute("create mnode on dnode %d"%(i+1)) tdSql.execute("create mnode on dnode %d"%(i+1))
count=0
while count<10:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(3):
print("drop mnode %d successfully"%(i+1))
break
count+=1
dropcount+=1 dropcount+=1
self.check3mnode() self.check3mnode()
......
...@@ -115,25 +115,27 @@ class TDTestCase: ...@@ -115,25 +115,27 @@ class TDTestCase:
while count < 10: while count < 10:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='follower':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='follower':
break print("three mnodes is ready in 10s")
elif tdSql.queryResult[0][2]=='follower' : break
if tdSql.queryResult[1][2]=='leader': elif tdSql.queryResult[0][2]=='follower' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='leader':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='follower':
break print("three mnodes is ready in 10s")
elif tdSql.queryResult[0][2]=='follower' : break
if tdSql.queryResult[1][2]=='follower': elif tdSql.queryResult[0][2]=='follower' :
if tdSql.queryResult[2][2]=='leader': if tdSql.queryResult[1][2]=='follower':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='leader':
break print("three mnodes is ready in 10s")
break
count+=1 count+=1
else: else:
print("three mnodes is not ready in 10s ") print("three mnodes is not ready in 10s ")
return -1
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -145,24 +147,26 @@ class TDTestCase: ...@@ -145,24 +147,26 @@ class TDTestCase:
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check3mnode1off(self): def check3mnode1off(self):
tdSql.error("drop mnode on dnode 1;")
count=0 count=0
while count < 10: while count < 10:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='offline' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[0][2]=='offline' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='leader':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='follower':
break print("stop mnodes on dnode 2 successfully in 10s")
elif tdSql.queryResult[1][2]=='follower': break
if tdSql.queryResult[2][2]=='leader': elif tdSql.queryResult[1][2]=='follower':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='leader':
break print("stop mnodes on dnode 2 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 2 failed in 10s ") print("stop mnodes on dnode 2 failed in 10s ")
return -1
tdSql.error("drop mnode on dnode 1;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -175,20 +179,22 @@ class TDTestCase: ...@@ -175,20 +179,22 @@ class TDTestCase:
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check3mnode2off(self): def check3mnode2off(self):
tdSql.error("drop mnode on dnode 2;")
count=0 count=0
while count < 10: while count < 40:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='offline': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='offline':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='follower':
break print("stop mnodes on dnode 2 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 2 failed in 10s ") print("stop mnodes on dnode 2 failed in 10s ")
return -1
tdSql.error("drop mnode on dnode 2;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -203,21 +209,22 @@ class TDTestCase: ...@@ -203,21 +209,22 @@ class TDTestCase:
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check3mnode3off(self): def check3mnode3off(self):
tdSql.error("drop mnode on dnode 3;")
count=0 count=0
while count < 10: while count < 10:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[2][2]=='offline': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='offline':
print("stop mnodes on dnode 3 successfully in 10s") if tdSql.queryResult[1][2]=='follower':
break print("stop mnodes on dnode 3 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 3 failed in 10s") print("stop mnodes on dnode 3 failed in 10s")
return -1
tdSql.error("drop mnode on dnode 3;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,1,'%s:6030'%self.host)
...@@ -264,13 +271,13 @@ class TDTestCase: ...@@ -264,13 +271,13 @@ class TDTestCase:
self.TDDnodes.stoptaosd(3) self.TDDnodes.stoptaosd(3)
self.check3mnode3off() self.check3mnode3off()
self.TDDnodes.starttaosd(2) self.TDDnodes.starttaosd(3)
self.TDDnodes.stoptaosd(1) self.TDDnodes.stoptaosd(1)
self.check3mnode1off() self.check3mnode1off()
self.TDDnodes.starttaosd(1) self.TDDnodes.starttaosd(1)
# self.check3mnode() self.check3mnode()
stopcount =0 stopcount =0
while stopcount <= 2: while stopcount <= 2:
for i in range(dnodenumber): for i in range(dnodenumber):
......
...@@ -13,13 +13,17 @@ import time ...@@ -13,13 +13,17 @@ import time
import socket import socket
import subprocess import subprocess
from multiprocessing import Process from multiprocessing import Process
import threading as thd import threading
import time
import inspect
import ctypes
class MyDnodes(TDDnodes): class MyDnodes(TDDnodes):
def __init__(self ,dnodes_lists): def __init__(self ,dnodes_lists):
super(MyDnodes,self).__init__() super(MyDnodes,self).__init__()
self.dnodes = dnodes_lists # dnode must be TDDnode instance self.dnodes = dnodes_lists # dnode must be TDDnode instance
self.simDeployed = False self.simDeployed = False
class TDTestCase: class TDTestCase:
def init(self,conn ,logSql): def init(self,conn ,logSql):
...@@ -49,11 +53,31 @@ class TDTestCase: ...@@ -49,11 +53,31 @@ class TDTestCase:
buildPath = root[:len(root) - len("/build/bin")] buildPath = root[:len(root) - len("/build/bin")]
break break
return buildPath return buildPath
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(self,thread):
self._async_raise(thread.ident, SystemExit)
def insert_data(self,countstart,countstop): def insert_data(self,countstart,countstop):
# fisrt add data : db\stable\childtable\general table # fisrt add data : db\stable\childtable\general table
for couti in range(countstart,countstop): for couti in range(countstart,countstop):
tdLog.debug("drop database if exists db%d" %couti)
tdSql.execute("drop database if exists db%d" %couti) tdSql.execute("drop database if exists db%d" %couti)
print("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti) tdSql.execute("create database if not exists db%d replica 1 duration 300" %couti)
tdSql.execute("use db%d" %couti) tdSql.execute("use db%d" %couti)
tdSql.execute( tdSql.execute(
...@@ -78,6 +102,7 @@ class TDTestCase: ...@@ -78,6 +102,7 @@ class TDTestCase:
hostname = socket.gethostname() hostname = socket.gethostname()
dnodes = [] dnodes = []
start_port = 6030 start_port = 6030
start_port_sec = 6130
for num in range(1, dnodes_nums+1): for num in range(1, dnodes_nums+1):
dnode = TDDnode(num) dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}") dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
...@@ -85,6 +110,7 @@ class TDTestCase: ...@@ -85,6 +110,7 @@ class TDTestCase:
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}") dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname) dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043) dnode.addExtraCfg("monitorPort", 7043)
dnode.addExtraCfg("secondEp", f"{hostname}:{start_port_sec}")
dnodes.append(dnode) dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes) self.TDDnodes = MyDnodes(dnodes)
...@@ -111,30 +137,58 @@ class TDTestCase: ...@@ -111,30 +137,58 @@ class TDTestCase:
time.sleep(2) time.sleep(2)
tdLog.info(" create cluster with %d dnode done! " %dnodes_nums) tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)
def checkdnodes(self,dnodenumber):
count=0
while count < 10:
time.sleep(1)
statusReadyBumber=0
tdSql.query("show dnodes;")
if tdSql.checkRows(dnodenumber) :
print("dnode is %d nodes"%dnodenumber)
for i in range(dnodenumber):
if tdSql.queryResult[i][4] !='ready' :
status=tdSql.queryResult[i][4]
print("dnode:%d status is %s "%(i,status))
break
else:
statusReadyBumber+=1
print(statusReadyBumber)
if statusReadyBumber == dnodenumber :
print("all of %d mnodes is ready in 10s "%dnodenumber)
return True
break
count+=1
else:
print("%d mnodes is not ready in 10s "%dnodenumber)
return False
def check3mnode(self): def check3mnode(self):
count=0 count=0
while count < 10: while count < 10:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='follower':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='follower':
break print("three mnodes is ready in 10s")
elif tdSql.queryResult[0][2]=='follower' : break
if tdSql.queryResult[1][2]=='leader': elif tdSql.queryResult[0][2]=='follower' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='leader':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='follower':
break print("three mnodes is ready in 10s")
elif tdSql.queryResult[0][2]=='follower' : break
if tdSql.queryResult[1][2]=='follower': elif tdSql.queryResult[0][2]=='follower' :
if tdSql.queryResult[2][2]=='leader': if tdSql.queryResult[1][2]=='follower':
print("three mnodes is ready in 10s") if tdSql.queryResult[2][2]=='leader':
break print("three mnodes is ready in 10s")
break
count+=1 count+=1
else: else:
print("three mnodes is not ready in 10s ") print("three mnodes is not ready in 10s ")
return -1
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -146,24 +200,26 @@ class TDTestCase: ...@@ -146,24 +200,26 @@ class TDTestCase:
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check3mnode1off(self): def check3mnode1off(self):
tdSql.error("drop mnode on dnode 1;")
count=0 count=0
while count < 10: while count < 10:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='offline' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='leader': if tdSql.queryResult[0][2]=='offline' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='leader':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='follower':
break print("stop mnodes on dnode 2 successfully in 10s")
elif tdSql.queryResult[1][2]=='follower': break
if tdSql.queryResult[2][2]=='leader': elif tdSql.queryResult[1][2]=='follower':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='leader':
break print("stop mnodes on dnode 2 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 2 failed in 10s ") print("stop mnodes on dnode 2 failed in 10s ")
return -1
tdSql.error("drop mnode on dnode 1;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -176,20 +232,22 @@ class TDTestCase: ...@@ -176,20 +232,22 @@ class TDTestCase:
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check3mnode2off(self): def check3mnode2off(self):
tdSql.error("drop mnode on dnode 2;")
count=0 count=0
while count < 40: while count < 40:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[1][2]=='offline': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[2][2]=='follower': if tdSql.queryResult[1][2]=='offline':
print("stop mnodes on dnode 2 successfully in 10s") if tdSql.queryResult[2][2]=='follower':
break print("stop mnodes on dnode 2 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 2 failed in 10s ") print("stop mnodes on dnode 2 failed in 10s ")
return -1
tdSql.error("drop mnode on dnode 2;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
...@@ -204,21 +262,22 @@ class TDTestCase: ...@@ -204,21 +262,22 @@ class TDTestCase:
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def check3mnode3off(self): def check3mnode3off(self):
tdSql.error("drop mnode on dnode 3;")
count=0 count=0
while count < 10: while count < 10:
time.sleep(1) time.sleep(1)
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
if tdSql.checkRows(3) : if tdSql.checkRows(3) :
if tdSql.queryResult[0][2]=='leader' : print("mnode is three nodes")
if tdSql.queryResult[2][2]=='offline': if tdSql.queryResult[0][2]=='leader' :
if tdSql.queryResult[1][2]=='follower': if tdSql.queryResult[2][2]=='offline':
print("stop mnodes on dnode 3 successfully in 10s") if tdSql.queryResult[1][2]=='follower':
break print("stop mnodes on dnode 3 successfully in 10s")
break
count+=1 count+=1
else: else:
print("stop mnodes on dnode 3 failed in 10s") print("stop mnodes on dnode 3 failed in 10s")
return -1
tdSql.error("drop mnode on dnode 3;")
tdSql.query("show mnodes;") tdSql.query("show mnodes;")
tdSql.checkRows(3) tdSql.checkRows(3)
tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,1,'%s:6030'%self.host)
...@@ -231,8 +290,6 @@ class TDTestCase: ...@@ -231,8 +290,6 @@ class TDTestCase:
tdSql.checkData(2,2,'offline') tdSql.checkData(2,2,'offline')
tdSql.checkData(2,3,'ready') tdSql.checkData(2,3,'ready')
def five_dnode_three_mnode(self,dnodenumber): def five_dnode_three_mnode(self,dnodenumber):
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host) tdSql.checkData(0,1,'%s:6030'%self.host)
...@@ -252,43 +309,29 @@ class TDTestCase: ...@@ -252,43 +309,29 @@ class TDTestCase:
# fisrt check statut ready # fisrt check statut ready
self.check3mnode() self.check3mnode()
tdSql.error("create mnode on dnode 2") tdSql.error("create mnode on dnode 2")
tdSql.query("show dnodes;") tdSql.query("show dnodes;")
print(tdSql.queryResult) print(tdSql.queryResult)
tdLog.debug("stop all of mnode ")
tdLog.debug("stop and follower of mnode")
self.TDDnodes.stoptaosd(2)
self.check3mnode2off()
self.TDDnodes.starttaosd(2)
self.TDDnodes.stoptaosd(3)
self.check3mnode3off()
self.TDDnodes.starttaosd(3)
self.TDDnodes.stoptaosd(1)
self.check3mnode1off()
self.TDDnodes.starttaosd(1)
# self.check3mnode()
stopcount =0 stopcount =0
while stopcount <= 2: while stopcount <= 2:
for i in range(dnodenumber): for i in range(dnodenumber):
threads = [] # threads=[]
threads.append(thd.Thread(target=self.insert_data, args=(i*2,i*2+2))) # threads = MyThreadFunc(self.insert_data(i*2,i*2+2))
threads[0].start() threads=threading.Thread(target=self.insert_data, args=((stopcount+i)*2,(i+stopcount)*2+2))
threads.start()
self.TDDnodes.stoptaosd(i+1) self.TDDnodes.stoptaosd(i+1)
# if i == 1 :
# self.check3mnode2off()
# elif i == 2 :
# self.check3mnode3off()
# elif i == 0:
# self.check3mnode1off()
self.TDDnodes.starttaosd(i+1) self.TDDnodes.starttaosd(i+1)
threads[0].join()
if self.checkdnodes(5):
print("123")
threads.join()
else:
print("456")
self.stop_thread(threads)
assert 1 == 2 ,"some dnode started failed"
return False
# self.check3mnode() # self.check3mnode()
stopcount+=1 stopcount+=1
self.check3mnode() self.check3mnode()
......
...@@ -59,8 +59,8 @@ class TDTestCase: ...@@ -59,8 +59,8 @@ class TDTestCase:
def initConsumerTable(self,cdbName='cdb'): def initConsumerTable(self,cdbName='cdb'):
tdLog.info("create consume database, and consume info table, and consume result table") tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
# tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
# tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
......
...@@ -101,7 +101,7 @@ python3 ./test.py -f 2-query/tail.py ...@@ -101,7 +101,7 @@ python3 ./test.py -f 2-query/tail.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode1mnode.py
python3 ./test.py -f 6-cluster/5dnode2mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册