dnodes.py 32.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import sys
import os
import os.path
import platform
sangshuduo's avatar
sangshuduo 已提交
18
import distro
19 20
import subprocess
from time import sleep
wafwerar's avatar
wafwerar 已提交
21 22 23 24
import base64
import json
import copy
from fabric2 import Connection
25
from util.log import *
sangshuduo's avatar
sangshuduo 已提交
26
from shutil import which
27 28 29 30 31 32 33


class TDSimClient:
    """Client-side ("psim") configuration for the simulated cluster.

    Holds the default client options and writes them, together with any
    caller-supplied overrides, into ``<path>/sim/psim/cfg/taos.cfg``.
    """

    def __init__(self, path):
        """Remember the simulation root ``path`` and the default options."""
        self.testCluster = False
        self.path = path
        # Written to taos.cfg by deploy() in insertion order.
        self.cfgDict = {
            "fqdn": "localhost",
            "numOfLogLines": "100000000",
            "locale": "en_US.UTF-8",
            "charset": "UTF-8",
            "asyncLog": "0",
            "rpcDebugFlag": "143",
            "tmrDebugFlag": "131",
            "cDebugFlag": "143",
            "uDebugFlag": "143",
            "jniDebugFlag": "143",
            "qDebugFlag": "143",
            "supportVnodes": "1024",
            "enableQueryHb": "1",
            "telemetryReporting": "0",
            "tqDebugflag": "135",
            "wDebugflag":"135",
        }

    def getLogDir(self):
        """Return (and cache on ``self.logDir``) the psim log directory."""
        self.logDir = os.path.join(self.path,"sim","psim","log")
        return self.logDir

    def getCfgDir(self):
        """Return (and cache on ``self.cfgDir``) the psim cfg directory."""
        self.cfgDir = os.path.join(self.path,"sim","psim","cfg")
        return self.cfgDir

    def setTestCluster(self, value):
        """Enable or disable the cluster-specific options in deploy()."""
        self.testCluster = value

    def addExtraCfg(self, option, value):
        """Add or override one option written by the next deploy()."""
        self.cfgDict[option] = value

    def cfg(self, option, value):
        """Append ``option value`` to the config file via the shell.

        Requires ``self.cfgPath``, which deploy() sets. Calls
        ``tdLog.exit`` with the command when the shell reports failure.
        """
        cmd = "echo %s %s >> %s" % (option, value, self.cfgPath)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

    @staticmethod
    def _extractClientCfg(updatecfgDict):
        """Return the ``clientCfg`` overrides found in deploy()'s varargs.

        The overrides dict is looked up at ``updatecfgDict[0][0]``. Any
        missing level, a missing/empty ``clientCfg`` entry, or a value
        that cannot be turned into a dict yields ``{}`` so that deploy()
        stays best-effort without a blanket ``except``.
        """
        if not updatecfgDict or not updatecfgDict[0]:
            return {}
        try:
            overrides = updatecfgDict[0][0]
        except (IndexError, KeyError, TypeError):
            return {}
        if not isinstance(overrides, dict):
            return {}
        clientCfg = overrides.get('clientCfg')
        if not clientCfg:
            return {}
        try:
            return dict(clientCfg)
        except (TypeError, ValueError):
            return {}

    def deploy(self, *updatecfgDict):
        """Recreate the psim log/cfg directories and write taos.cfg.

        ``updatecfgDict`` is optional; when present, the ``clientCfg``
        mapping found at ``updatecfgDict[0][0]`` is appended after the
        defaults. Any failing shell command ends in ``tdLog.exit``.
        """
        self.logDir = os.path.join(self.path,"sim","psim","log")
        self.cfgDir = os.path.join(self.path,"sim","psim","cfg")
        self.cfgPath = os.path.join(self.path,"sim","psim","cfg","taos.cfg")

        # Start from empty directories: remove, then recreate each one.
        for directory in (self.logDir, self.cfgDir):
            cmd = "rm -rf " + directory
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
            os.makedirs(directory)

        cmd = "touch " + self.cfgPath
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

        if self.testCluster:
            self.cfg("masterIp", "192.168.0.1")
            self.cfg("secondIp", "192.168.0.2")
        self.cfg("logDir", self.logDir)

        for key, value in self.cfgDict.items():
            self.cfg(key, value)

        # Caller overrides go last, after the defaults written above.
        for key, value in self._extractClientCfg(updatecfgDict).items():
            self.cfg(key, value)

        tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath))


class TDDnode:
    """One simulated taosd data node, identified by a 1-based ``index``.

    The node keeps its files under ``<path>/sim/dnode<index>/`` and is
    started/stopped through shell commands, or through a remote
    connection when ``remoteIP`` is set.
    """

    def __init__(self, index):
        """Create an undeployed, stopped node with default options."""
        self.index = index
        self.running = 0
        self.deployed = 0
        self.testCluster = False
        self.valgrind = 0
        self.asan = False
        self.remoteIP = ""
        # Written to taos.cfg by deploy() in insertion order.
        self.cfgDict = {
            "fqdn": "localhost",
            "monitor": "0",
            "maxShellConns": "30000",
            "locale": "en_US.UTF-8",
            "charset": "UTF-8",
            "asyncLog": "0",
            "mDebugFlag": "143",
            "dDebugFlag": "143",
            "vDebugFlag": "143",
            "tqDebugFlag": "143",
            "cDebugFlag": "143",
            "jniDebugFlag": "143",
            "qDebugFlag": "143",
            "rpcDebugFlag": "143",
            "tmrDebugFlag": "131",
            "uDebugFlag": "143",
            "sDebugFlag": "143",
            "wDebugFlag": "143",
            "numOfLogLines": "100000000",
            "statusInterval": "1",
            "enableQueryHb": "1",
            "supportVnodes": "1024",
            "telemetryReporting": "0"
        }

    def init(self, path, remoteIP = ""):
        """Set the simulation root and, optionally, open a remote connection.

        ``remoteIP`` is either ``""`` (local mode) or a string that
        evaluates to a dict with ``host``, ``port``, ``user`` and
        ``password`` keys (``path`` is read later by remoteExec).
        Connection errors are printed, not raised.
        """
        self.path = path
        self.remoteIP = remoteIP
        if self.remoteIP != "":
            try:
                # SECURITY NOTE(review): eval() runs arbitrary code from the
                # remoteIP string. Only pass trusted input here; consider
                # ast.literal_eval/json.loads once callers are confirmed to
                # pass plain literals.
                self.config = eval(self.remoteIP)
                self.remote_conn = Connection(host=self.config["host"], port=self.config["port"], user=self.config["user"], connect_kwargs={'password':self.config["password"]})
            except Exception as r:
                # Best effort: on failure remote_conn stays unset.
                print(r)

    def setTestCluster(self, value):
        """Enable or disable the cluster-specific setup in deploy()."""
        self.testCluster = value

    def setValgrind(self, value):
        """Set the valgrind flag (0 means run taosd directly)."""
        self.valgrind = value

    def setAsan(self, value):
        """Set the asan flag and, when true, locate the exec.sh stop script."""
        self.asan = value
        if value:
            selfPath = os.path.dirname(os.path.realpath(__file__))
            if ("community" in selfPath):
                self.execPath = os.path.abspath(self.path + "/community/tests/script/sh/exec.sh")
            else:
                self.execPath = os.path.abspath(self.path + "/tests/script/sh/exec.sh")

    def getDataSize(self):
        """Return the total size in bytes of non-symlink files in dataDir.

        Returns 0 when the node has not been deployed.
        """
        if self.deployed != 1:
            return 0

        totalSize = 0
        for dirpath, _dirnames, filenames in os.walk(self.dataDir):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                if not os.path.islink(fp):
                    totalSize += os.path.getsize(fp)
        return totalSize

    def addExtraCfg(self, option, value):
        """Add or override one option written by the next deploy()."""
        self.cfgDict[option] = value

    def remoteExec(self, updateCfgDict, execCmd):
        """Run ``execCmd`` on the remote host through ``test.py``.

        The config dict (minus the local-only directory entries) and the
        command are both passed base64-encoded on the command line.
        """
        valgrindStr = '-g' if self.valgrind == 1 else ''

        remoteCfgDict = copy.deepcopy(updateCfgDict)
        # These paths describe the local machine and are not sent.
        for localOnly in ("logDir", "dataDir", "cfgDir"):
            remoteCfgDict.pop(localOnly, None)

        remoteCfgDictStr = base64.b64encode(json.dumps(remoteCfgDict).encode()).decode()
        execCmdStr = base64.b64encode(execCmd.encode()).decode()
        with self.remote_conn.cd((self.config["path"]+sys.path[0].replace(self.path, '')).replace('\\','/')):
            self.remote_conn.run("python3 ./test.py %s -d %s -e %s"%(valgrindStr,remoteCfgDictStr,execCmdStr))

    def deploy(self, *updatecfgDict):
        """Recreate the node's data/log/cfg directories and write taos.cfg.

        ``updatecfgDict`` is optional; the overrides dict is read from
        ``updatecfgDict[0][0]``. Entries whose *value* is ``'dataDir'``
        are written immediately as ``dataDir <key>`` lines, and the first
        one drops the default ``dataDir`` entry. A ``clientCfg`` entry is
        skipped for local runs outside Windows. All other entries are
        merged into ``cfgDict``. In remote mode the merged dict is sent
        to the remote host instead of being written locally.
        """
        nodeDir = os.path.join(self.path, "sim", "dnode%d" % self.index)
        self.logDir = os.path.join(nodeDir, "log")
        self.dataDir = os.path.join(nodeDir, "data")
        self.cfgDir = os.path.join(nodeDir, "cfg")
        self.cfgPath = os.path.join(nodeDir, "cfg", "taos.cfg")

        for directory in (self.dataDir, self.logDir, self.cfgDir):
            cmd = "rm -rf " + directory
            if os.system(cmd) != 0:
                tdLog.exit(cmd)

        for directory in (self.dataDir, self.logDir, self.cfgDir):
            os.makedirs(directory)

        cmd = "touch " + self.cfgPath
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

        if self.testCluster:
            self.startIP()
            self.cfg("masterIp", "192.168.0.1")
            self.cfg("secondIp", "192.168.0.2")
            self.cfg("publicIp", "192.168.0.%d" % (self.index))
            self.cfg("internalIp", "192.168.0.%d" % (self.index))
            self.cfg("privateIp", "192.168.0.%d" % (self.index))
        self.cfgDict["dataDir"] = self.dataDir
        self.cfgDict["logDir"] = self.logDir

        defaultDataDirDropped = False
        if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
            for key, value in updatecfgDict[0][0].items():
                if key == "clientCfg" and self.remoteIP == "" and not platform.system().lower() == 'windows':
                    continue
                if value == 'dataDir':
                    # Here the dict key is the directory spec and the value
                    # is the literal option name, hence cfg(value, key).
                    if not defaultDataDirDropped:
                        self.cfgDict.pop('dataDir')
                        defaultDataDirDropped = True
                    self.cfg(value, key)
                else:
                    self.addExtraCfg(key, value)

        if (self.remoteIP == ""):
            for key, value in self.cfgDict.items():
                self.cfg(key, value)
        else:
            self.remoteExec(self.cfgDict, "tdDnodes.deploy(%d,updateCfgDict)"%self.index)

        self.deployed = 1
        tdLog.debug(
            "dnode:%d is deployed and configured by %s" %
            (self.index, self.cfgPath))

    def getPath(self, tool="taosd"):
        """Search the project tree for ``tool`` and return its path.

        The project root is the part of this file's directory before
        ``community`` (or before ``tests``). The first directory that
        contains ``tool`` or ``tool.exe`` and whose parent path does not
        contain ``packaging`` wins. Returns ``""`` when nothing matches.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))

        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("community")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        for root, _dirs, files in os.walk(projPath):
            if tool in files or ("%s.exe" % tool) in files:
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if "packaging" not in rootRealPath:
                    return os.path.join(root, tool)
        return ""

    def _buildStartCmd(self, binPath):
        """Return the shell command that launches taosd for this node.

        Covers the plain, asan (stderr to a per-node file) and valgrind
        variants, each with a Windows (mintty) and a nohup form. The
        valgrind command is also printed.
        """
        onWindows = platform.system().lower() == 'windows'

        if self.valgrind == 0:
            if onWindows:
                return "mintty -h never %s -c %s" % (binPath, self.cfgDir)
            if self.asan:
                asanDir = "%s/sim/asan/dnode%d.asan" % (
                    self.path, self.index)
                return "nohup %s -c %s > /dev/null 2> %s & " % (
                    binPath, self.cfgDir, asanDir)
            return "nohup %s -c %s > /dev/null 2>&1 & " % (
                binPath, self.cfgDir)

        valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\"  --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir
        if onWindows:
            cmd = "mintty -h never %s %s -c %s" % (
                valgrindCmdline, binPath, self.cfgDir)
        else:
            cmd = "nohup %s %s -c %s 2>&1 & " % (
                valgrindCmdline, binPath, self.cfgDir)
        print(cmd)
        return cmd

    def _prepareStart(self):
        """Check taosd exists and the node is deployed; return the start command."""
        binPath = self.getPath()

        if (binPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found: %s" % binPath)

        if self.deployed == 0:
            tdLog.exit("dnode:%d is not deployed" % (self.index))

        return self._buildStartCmd(binPath)

    def _remoteStartScript(self, entry):
        """Return the Python snippet sent to the remote host to start this node.

        The snippet marks the remote node as deployed, sets its log and
        cfg directories, then calls ``tdDnodes.<entry>(<index>)``. The
        remote dnodes list is addressed with the 0-based ``index - 1``.
        """
        slot = self.index - 1
        return "\n".join([
            "tdDnodes.dnodes[%d].deployed=1" % slot,
            "tdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)" % (slot, slot, self.index),
            "tdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)" % (slot, slot, self.index),
            "tdDnodes.%s(%d)" % (entry, self.index),
        ])

    def _waitForValgrindStart(self):
        """Log and sleep the fixed 10 seconds allowed for valgrind startup."""
        tdLog.debug(
            "wait 10 seconds for the dnode:%d to start." %
            (self.index))
        sleep(10)

    def _waitForOnline(self):
        """Block until taosdlog.0 reports the node went online.

        Waits for the log file to appear (about 5 seconds at most), then
        follows it for up to 20 seconds looking for the
        'from offline to online' marker. Calls ``tdLog.exit`` on either
        timeout.
        """
        # Imported here because the file's visible imports only bring in
        # ``sleep``, not the ``time`` module itself.
        import time

        key = 'from offline to online'
        logFile = self.logDir + "/taosdlog.0"

        attempts = 0
        while not os.path.exists(logFile) and attempts <= 50:
            sleep(0.1)
            attempts += 1
        if not os.path.exists(logFile):
            tdLog.exit("dnode:%d log file %s was not created" % (self.index, logFile))

        with open(logFile, errors="replace") as f:
            deadline = time.time() + 10 * 2
            while True:
                line = f.readline()
                if key in line:
                    break
                if time.time() > deadline:
                    tdLog.exit('wait too long for taosd start')
                if not line:
                    # At end of file: yield briefly instead of spinning.
                    sleep(0.01)
        tdLog.debug("the dnode:%d has been started." % (self.index))

    def starttaosd(self):
        """Start taosd without waiting for the 'online' log marker.

        In remote mode the remote ``tdDnodes.start`` is invoked instead.
        """
        cmd = self._prepareStart()

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, self._remoteStartScript("start"))
            self.running = 1
            return

        if os.system(cmd) != 0:
            tdLog.exit(cmd)
        self.running = 1
        tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
        if self.valgrind == 0:
            sleep(0.1)
            tdLog.debug("the dnode:%d has been started." % (self.index))
        else:
            self._waitForValgrindStart()

    def start(self):
        """Start taosd and wait until its log reports it is online.

        The previous taosdlog.0 is removed first so that only the new
        run's output is examined. In remote mode the remote
        ``tdDnodes.start`` is invoked instead.
        """
        cmd = self._prepareStart()

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, self._remoteStartScript("start"))
            self.running = 1
            return

        os.system("rm -rf %s/taosdlog.0"%self.logDir)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)
        self.running = 1
        tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
        if self.valgrind == 0:
            sleep(0.1)
            self._waitForOnline()
        else:
            self._waitForValgrindStart()

    def startWithoutSleep(self):
        """Start taosd (locally or remotely) and return immediately."""
        cmd = self._prepareStart()

        if (self.remoteIP == ""):
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
        else:
            self.remoteExec(self.cfgDict, self._remoteStartScript("startWithoutSleep"))

        self.running = 1
        tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))

    def _asanStopCmd(self, signalName=None):
        """Return the exec.sh command that stops this node in asan mode."""
        cmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
        if signalName:
            cmd += " -x %s" % signalName
        return cmd

    def _stopViaAsanScript(self, signalName=None):
        """Log and run the asan stop script."""
        stopCmd = self._asanStopCmd(signalName)
        tdLog.info("execute script: " + stopCmd)
        os.system(stopCmd)

    def _processName(self):
        """Return the process name to look for when killing this node."""
        return "taosd" if self.valgrind == 0 else "valgrind.bin"

    def _killUntilGone(self, psCmd, signalName):
        """Send ``signalName`` to the PIDs listed by ``psCmd`` until none remain.

        ``psCmd`` is re-run every second. On Windows the signal is sent
        only once; elsewhere it is re-sent on every round.
        """
        onWindows = platform.system().lower() == 'windows'
        nullDevice = "nul" if onWindows else "/dev/null"
        signalSent = False

        processID = subprocess.check_output(
            psCmd, shell=True).decode("utf-8").strip()
        while processID:
            if not onWindows or not signalSent:
                killCmd = "kill -%s %s > %s 2>&1" % (
                    signalName, processID, nullDevice)
                os.system(killCmd)
                signalSent = True
            sleep(1)
            processID = subprocess.check_output(
                psCmd, shell=True).decode("utf-8").strip()

    def _releasePorts(self, quiet):
        """Kill whatever still holds TCP ports 6030-6040 (skipped on Windows)."""
        if platform.system().lower() == 'windows':
            return
        for port in range(6030, 6041):
            fuserCmd = "fuser -k -n tcp %d" % port
            if quiet:
                fuserCmd += " > /dev/null"
            os.system(fuserCmd)

    def stop(self):
        """Stop the node with SIGINT and free its ports.

        The process lookup is not filtered by dnode index, so every
        matching taosd (or valgrind.bin) process is signalled.
        """
        if self.asan:
            self._stopViaAsanScript()
            return

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
            tdLog.info("stop dnode%d"%self.index)
            return

        if self.running == 0:
            return

        psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs" % self._processName()
        self._killUntilGone(psCmd, "INT")
        self._releasePorts(quiet=True)
        if self.valgrind:
            sleep(2)

        self.running = 0
        tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))

    def stoptaosd(self):
        """Stop only this node's process with SIGINT.

        Unlike stop(), the process lookup is filtered by ``dnode<index>``
        and no ports are released.
        """
        if self.asan:
            self._stopViaAsanScript()
            return

        if self.remoteIP != "":
            # NOTE(review): the remote side is asked to run stop(), not
            # stoptaosd(); kept as in the original, confirm this is intended.
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
            tdLog.info("stop dnode%d"%self.index)
            return

        if self.running == 0:
            return

        if platform.system().lower() == 'windows':
            psCmd = "for /f %%a in ('wmic process where \"name='taosd.exe' and CommandLine like '%%dnode%d%%'\" get processId ^| xargs echo ^| awk ^'{print $2}^' ^&^& echo aa') do @(ps | grep %%a | awk '{print $1}' | xargs)" % (self.index)
        else:
            psCmd = "ps -ef|grep -w %s| grep dnode%d|grep -v grep | awk '{print $2}' | xargs" % (self._processName(),self.index)
        self._killUntilGone(psCmd, "INT")
        if self.valgrind:
            sleep(2)

        self.running = 0
        tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))

    def forcestop(self):
        """Stop the node with SIGKILL and free its ports.

        The process lookup is not filtered by dnode index.
        """
        if self.asan:
            self._stopViaAsanScript("SIGKILL")
            return

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].forcestop()"%(self.index-1,self.index-1))
            return

        if self.running == 0:
            return

        psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs" % self._processName()
        self._killUntilGone(psCmd, "KILL")
        self._releasePorts(quiet=False)
        if self.valgrind:
            sleep(2)

        self.running = 0
        tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index))

    def startIP(self):
        """Bring up the loopback alias 192.168.0.<index> for this node."""
        cmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

    def stopIP(self):
        """Take down the loopback alias 192.168.0.<index> for this node."""
        cmd = "sudo ifconfig lo:%d 192.168.0.%d down" % (
            self.index, self.index)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

    def cfg(self, option, value):
        """Append ``option value`` to the config file via the shell.

        Requires ``self.cfgPath``, which deploy() sets. Calls
        ``tdLog.exit`` with the command when the shell reports failure.
        """
        cmd = "echo %s %s >> %s" % (option, value, self.cfgPath)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

    def getDnodeRootDir(self, index):
        """Return ``<path>/sim/psim/dnode<index>``."""
        dnodeRootDir = os.path.join(self.path,"sim","psim","dnode%d" % index)
        return dnodeRootDir

    def getDnodesRootDir(self):
        """Return ``<path>/sim/psim``."""
        dnodesRootDir = os.path.join(self.path,"sim","psim")
        return dnodesRootDir


class TDDnodes:
    """Front end over a list of ``TDDnode`` objects addressed by 1-based index.

    Most methods validate the index with ``check`` and then forward to the
    matching ``TDDnode``.  The instance also owns the ``TDSimClient`` created
    by ``init`` and the flags (test cluster, valgrind, asan) that are pushed
    down to a dnode when it is deployed.
    """

    def __init__(self):
        # Ten dnodes, numbered 1..10.
        self.dnodes = [TDDnode(number) for number in range(1, 11)]
        self.simDeployed = False
        self.testCluster = False
        self.valgrind = 0
        self.asan = False
        self.killValgrind = 1

    def init(self, path, remoteIP = ""):
        """Resolve the working path, initialise every dnode and the sim client.

        Args:
            path: root directory to use; when it is the empty string the
                directory is derived from the first dnode's ``getPath()``.
            remoteIP: forwarded unchanged to every ``TDDnode.init``.
        """
        binPath = self.dnodes[0].getPath() + "/../../../"
        binPath = os.path.realpath(binPath)

        if path == "":
            # NOTE(review): realpath() drops the trailing separator, so this
            # concatenation yields "<binPath>../../", which abspath()
            # normalises to the parent of binPath.  Kept as-is because
            # callers rely on the resulting directory -- confirm the intent.
            self.path = os.path.abspath(binPath + "../../")
        else:
            self.path = os.path.realpath(path)

        for dnode in self.dnodes:
            dnode.init(self.path, remoteIP)
        self.sim = TDSimClient(self.path)

    def setTestCluster(self, value):
        """Store the test-cluster flag applied to dnodes on ``deploy``."""
        self.testCluster = value

    def setValgrind(self, value):
        """Store the valgrind flag applied to dnodes on ``deploy``."""
        self.valgrind = value

    def setAsan(self, value):
        """Store the asan flag and, when truthy, locate the stop scripts.

        The script paths are built from ``self.path``, so ``init`` must have
        been called before enabling asan.
        """
        self.asan = value
        if value:
            selfPath = os.path.dirname(os.path.realpath(__file__))
            # The scripts sit under "community/" when this file itself lives
            # in a path containing "community".
            if "community" in selfPath:
                scriptDir = "/community/tests/script/sh/"
            else:
                scriptDir = "/tests/script/sh/"
            self.stopDnodesPath = os.path.abspath(
                self.path + scriptDir + "stop_dnodes.sh")
            self.stopDnodesSigintPath = os.path.abspath(
                self.path + scriptDir + "sigint_stop_dnodes.sh")
            tdLog.info("run in address sanitizer mode")

    def setKillValgrind(self, value):
        """Store whether ``stopAll`` should also kill valgrind (1 = yes)."""
        self.killValgrind = value

    def _dnode(self, index):
        """Validate the 1-based ``index`` and return the matching dnode."""
        self.check(index)
        return self.dnodes[index - 1]

    def deploy(self, index, *updatecfgDict):
        """Deploy the sim client (first call only) and then dnode ``index``.

        ``updatecfgDict`` is collected as a tuple and passed on as-is to both
        ``TDSimClient.deploy`` and ``TDDnode.deploy``.
        """
        self.sim.setTestCluster(self.testCluster)

        if not self.simDeployed:
            self.sim.deploy(updatecfgDict)
            self.simDeployed = True

        dnode = self._dnode(index)
        dnode.setTestCluster(self.testCluster)
        dnode.setValgrind(self.valgrind)
        dnode.setAsan(self.asan)
        dnode.deploy(updatecfgDict)

    def cfg(self, index, option, value):
        """Forward a config option to dnode ``index``."""
        self._dnode(index).cfg(option, value)

    def starttaosd(self, index):
        """Forward ``starttaosd`` to dnode ``index``."""
        self._dnode(index).starttaosd()

    def stoptaosd(self, index):
        """Forward ``stoptaosd`` to dnode ``index``."""
        self._dnode(index).stoptaosd()

    def start(self, index):
        """Forward ``start`` to dnode ``index``."""
        self._dnode(index).start()

    def startWithoutSleep(self, index):
        """Forward ``startWithoutSleep`` to dnode ``index``."""
        self._dnode(index).startWithoutSleep()

    def stop(self, index):
        """Forward ``stop`` to dnode ``index``."""
        self._dnode(index).stop()

    def getDataSize(self, index):
        """Return whatever ``getDataSize`` of dnode ``index`` returns."""
        return self._dnode(index).getDataSize()

    def forcestop(self, index):
        """Forward ``forcestop`` to dnode ``index``."""
        self._dnode(index).forcestop()

    def startIP(self, index):
        """Call ``startIP`` on dnode ``index`` when this object is in test-cluster mode."""
        dnode = self._dnode(index)

        if self.testCluster:
            dnode.startIP()

    def stopIP(self, index):
        """Call ``stopIP`` on dnode ``index`` when that dnode is in test-cluster mode.

        NOTE(review): this reads the dnode's own ``testCluster`` flag while
        ``startIP`` reads the flag on this object -- confirm the asymmetry
        is intended.
        """
        dnode = self._dnode(index)

        if dnode.testCluster:
            dnode.stopIP()

    def check(self, index):
        """Call ``tdLog.exit`` unless ``index`` is within 1..len(self.dnodes)."""
        count = len(self.dnodes)
        if index < 1 or index > count:
            tdLog.exit("index:%d should on a scale of [1, %d]" % (index, count))

    def StopAllSigint(self):
        """Run the sigint stop script when asan is enabled; otherwise do nothing."""
        tdLog.info("stop all dnodes sigint, asan:%d" % self.asan)
        if self.asan:
            tdLog.info("execute script: %s" % self.stopDnodesSigintPath)
            os.system(self.stopDnodesSigintPath)
            tdLog.info("execute finished")
            return

    @staticmethod
    def _listPids(psCmd):
        """Run ``psCmd`` in a shell and return its stripped, decoded stdout."""
        return subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()

    def _killUntilGone(self, psCmd, killTemplate, echo=False):
        """Kill the pids reported by ``psCmd`` until it reports none.

        ``killTemplate`` is a ``%s`` template receiving the pid string.  The
        process list is polled again one second after each kill.  With
        ``echo`` the pid string is printed before each kill.
        """
        processID = self._listPids(psCmd)
        while processID:
            if echo:
                print(processID)
            os.system(killTemplate % processID)
            time.sleep(1)
            processID = self._listPids(psCmd)

    def stopAll(self):
        """Stop every dnode, then kill leftover taosd / valgrind processes.

        Order of operations:
          * asan enabled and distro is not alpine: run the stop script and return;
          * first dnode has a non-empty ``remoteIP``: delegate through its
            ``remoteExec`` and return;
          * otherwise call ``stop`` on each dnode, kill remaining ``taosd``
            processes with a platform-specific command, and, when
            ``killValgrind`` is 1, send TERM to ``valgrind.bin`` processes.
        """
        tdLog.info("stop all dnodes, asan:%d" % self.asan)
        distro_id = distro.id()
        if self.asan and distro_id != "alpine":
            tdLog.info("execute script: %s" % self.stopDnodesPath)
            os.system(self.stopDnodesPath)
            tdLog.info("execute finished")
            return

        if (not self.dnodes[0].remoteIP == ""):
            self.dnodes[0].remoteExec(self.dnodes[0].cfgDict, "for i in range(len(tdDnodes.dnodes)):\n    tdDnodes.dnodes[i].running=1\ntdDnodes.stopAll()")
            return
        for dnode in self.dnodes:
            dnode.stop()

        isWindows = platform.system().lower() == 'windows'
        rootTaosdPs = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"

        if (distro_id == "alpine"):
            print(distro_id)
            self._killUntilGone(rootTaosdPs, "kill -9 %s > /dev/null 2>&1", echo=True)
        elif isWindows:
            winTaosdPs = "for /f %a in ('wmic process where \"name='taosd.exe'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
            self._killUntilGone(winTaosdPs, "kill -9 %s > nul 2>&1", echo=True)
        else:
            # Stop the systemd service first when a root-owned taosd shows up
            # in the process list, then kill whatever taosd is left.
            if self._listPids(rootTaosdPs):
                os.system("sudo systemctl stop taosd")
            anyTaosdPs = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
            self._killUntilGone(anyTaosdPs, "kill -9 %s > /dev/null 2>&1")

        if self.killValgrind == 1:
            valgrindPs = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs"
            if isWindows:
                termTemplate = "kill -TERM %s > nul 2>&1"
            else:
                termTemplate = "kill -TERM %s > /dev/null 2>&1"
            self._killUntilGone(valgrindPs, termTemplate)

    def getDnodesRootDir(self):
        """Return ``<self.path>/sim``."""
        dnodesRootDir = "%s/sim" % (self.path)
        return dnodesRootDir

    def getSimCfgPath(self):
        """Return the sim client's config directory."""
        return self.sim.getCfgDir()

    def getSimLogPath(self):
        """Return the sim client's log directory."""
        return self.sim.getLogDir()

    def addSimExtraCfg(self, option, value):
        """Forward an extra config option to the sim client."""
        self.sim.addExtraCfg(option, value)

    def getAsan(self):
        """Return the stored asan flag."""
        return self.asan

862
# Module-level TDDnodes instance, created when this module is imported.
tdDnodes = TDDnodes()