dnodes.py 29.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import base64
import copy
import json
import os
import os.path
import platform
import subprocess
import sys
import time
from time import sleep

from fabric2 import Connection

from util.log import *


class TDSimClient:
    """Client-side simulator ("psim") whose files live under ``<path>/sim/psim``.

    ``cfgDict`` holds the default ``taos.cfg`` options; ``deploy`` recreates
    the log/cfg directories and writes those options to ``taos.cfg``.
    """

    def __init__(self, path):
        """Remember the root ``path`` and set up the default client options."""
        self.testCluster = False
        self.path = path
        self.cfgDict = {
            "numOfLogLines": "100000000",
            "locale": "en_US.UTF-8",
            "charset": "UTF-8",
            "asyncLog": "0",
            "rpcDebugFlag": "143",
            "tmrDebugFlag": "131",
            "cDebugFlag": "143",
            "uDebugFlag": "143",
            "jniDebugFlag": "143",
            "qDebugFlag": "143",
            "supportVnodes": "1024",
            "enableQueryHb": "1",
            "telemetryReporting": "0",
        }

    def getLogDir(self):
        """Set and return ``self.logDir`` (``<path>/sim/psim/log``)."""
        self.logDir = "%s/sim/psim/log" % (self.path)
        return self.logDir

    def getCfgDir(self):
        """Set and return ``self.cfgDir`` (``<path>/sim/psim/cfg``)."""
        self.cfgDir = "%s/sim/psim/cfg" % (self.path)
        return self.cfgDir

    def setTestCluster(self, value):
        """Store the cluster-test flag consulted by ``deploy``."""
        self.testCluster = value

    def addExtraCfg(self, option, value):
        """Add or override one option in ``cfgDict``."""
        self.cfgDict.update({option: value})

    def cfg(self, option, value):
        """Append ``option value`` to the config file via a shell ``echo``.

        Calls ``tdLog.exit`` with the command text if the shell command
        returns a non-zero status.
        """
        cmd = "echo %s %s >> %s" % (option, value, self.cfgPath)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

    @staticmethod
    def _clientCfgFrom(updatecfgDict):
        """Return the ``clientCfg`` mapping nested in ``updatecfgDict``, or ``{}``.

        ``updatecfgDict`` is the ``*args`` tuple received by ``deploy``; the
        overrides dict is expected at ``updatecfgDict[0][0]``. Any other shape
        (missing entries, non-mapping values) yields an empty dict, matching
        the best-effort behaviour ``deploy`` has always had.
        """
        try:
            overrides = updatecfgDict[0][0]
        except (IndexError, KeyError, TypeError):
            return {}
        if not overrides:
            return {}
        try:
            clientCfg = overrides.get('clientCfg')
            return dict(clientCfg) if clientCfg else {}
        except (AttributeError, TypeError, ValueError):
            return {}

    def deploy(self, *updatecfgDict):
        """Recreate the psim log/cfg directories and write ``taos.cfg``.

        Writes, in order: the cluster IPs (only when ``testCluster`` is set),
        ``logDir``, every entry of ``cfgDict``, then any ``clientCfg`` entries
        found in ``updatecfgDict[0][0]``.
        """
        self.getLogDir()
        self.getCfgDir()
        self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path)

        # Start from empty directories so stale logs/config never leak in.
        for workDir in (self.logDir, self.cfgDir):
            cmd = "rm -rf " + workDir
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
            os.makedirs(workDir)

        cmd = "touch " + self.cfgPath
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

        if self.testCluster:
            self.cfg("masterIp", "192.168.0.1")
            self.cfg("secondIp", "192.168.0.2")
        self.cfg("logDir", self.logDir)

        for key, value in self.cfgDict.items():
            self.cfg(key, value)

        # Malformed or absent overrides are ignored rather than swallowed by
        # a blanket ``except Exception``.
        for key, value in self._clientCfgFrom(updatecfgDict).items():
            self.cfg(key, value)

        tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath))


class TDDnode:
    def __init__(self, index):
        self.index = index
        self.running = 0
        self.deployed = 0
        self.testCluster = False
        self.valgrind = 0
S
Shengliang Guan 已提交
119
        self.asan = False
wafwerar's avatar
wafwerar 已提交
120
        self.remoteIP = ""
121
        self.cfgDict = {
122 123 124 125 126
            "monitor": "0",
            "maxShellConns": "30000",
            "locale": "en_US.UTF-8",
            "charset": "UTF-8",
            "asyncLog": "0",
S
Shengliang Guan 已提交
127 128 129 130 131 132 133 134
            "mDebugFlag": "143",
            "dDebugFlag": "143",
            "vDebugFlag": "143",
            "tqDebugFlag": "143",
            "cDebugFlag": "143",
            "jniDebugFlag": "143",
            "qDebugFlag": "143",
            "rpcDebugFlag": "143",
135
            "tmrDebugFlag": "131",
S
Shengliang Guan 已提交
136
            "uDebugFlag": "143",
S
Shengliang Guan 已提交
137
            "sDebugFlag": "143",
S
Shengliang Guan 已提交
138 139 140
            "wDebugFlag": "143",
            "numOfLogLines": "100000000",
            "statusInterval": "1",
H
Hui Li 已提交
141
            "enableQueryHb": "1",
142
            "supportVnodes": "1024",
S
Shengliang Guan 已提交
143
            "telemetryReporting": "0"
144 145
        }

wafwerar's avatar
wafwerar 已提交
146
    def init(self, path, remoteIP = ""):
147
        self.path = path
wafwerar's avatar
wafwerar 已提交
148
        self.remoteIP = remoteIP
wafwerar's avatar
wafwerar 已提交
149 150 151 152 153 154
        if (not self.remoteIP == ""):
            try:
                self.config = eval(self.remoteIP)
                self.remote_conn = Connection(host=self.config["host"], port=self.config["port"], user=self.config["user"], connect_kwargs={'password':self.config["password"]})
            except Exception as r:
                print(r)
155 156 157 158 159 160 161

    def setTestCluster(self, value):
        """Store the cluster-test flag; ``deploy`` checks it before configuring the 192.168.0.x addresses."""
        self.testCluster = value

    def setValgrind(self, value):
        """Store the valgrind flag; the start methods wrap taosd in valgrind when it is non-zero."""
        self.valgrind = value

S
Shengliang Guan 已提交
162 163 164 165 166
    def setAsan(self, value):
        """Store the asan flag and, when it is truthy, resolve the exec.sh helper path.

        ``self.execPath`` is only assigned when ``value`` is truthy; the stop
        methods read it when ``self.asan`` is set.
        """
        self.asan = value
        if value:
            self.execPath = os.path.abspath(self.path + "/tests/script/sh/exec.sh")

167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
    def getDataSize(self):
        totalSize = 0

        if (self.deployed == 1):
            for dirpath, dirnames, filenames in os.walk(self.dataDir):
                for f in filenames:
                    fp = os.path.join(dirpath, f)

                    if not os.path.islink(fp):
                        totalSize = totalSize + os.path.getsize(fp)

        return totalSize

    def addExtraCfg(self, option, value):
        self.cfgDict.update({option: value})

wafwerar's avatar
wafwerar 已提交
183 184 185 186 187 188 189 190 191 192 193 194 195
    def remoteExec(self, updateCfgDict, execCmd):
        """Run ``execCmd`` on the remote host through ``python3 ./test.py``.

        A deep copy of ``updateCfgDict`` with the ``logDir``, ``dataDir`` and
        ``cfgDir`` entries removed is JSON-encoded, and both it and
        ``execCmd`` are passed base64-encoded on the command line (``-d`` and
        ``-e``). ``-g`` is added when ``self.valgrind == 1``.
        """
        valgrindStr = '-g' if self.valgrind == 1 else ''

        remoteCfgDict = copy.deepcopy(updateCfgDict)
        for directoryKey in ("logDir", "dataDir", "cfgDir"):
            remoteCfgDict.pop(directoryKey, None)

        cfgJson = json.dumps(remoteCfgDict)
        remoteCfgDictStr = base64.b64encode(cfgJson.encode()).decode()
        execCmdStr = base64.b64encode(execCmd.encode()).decode()

        # Remote working directory: the configured remote root plus this
        # script's directory relative to the local root, with forward slashes.
        relativeDir = sys.path[0].replace(self.path, '')
        remoteDir = (self.config["path"] + relativeDir).replace('\\', '/')
        with self.remote_conn.cd(remoteDir):
            self.remote_conn.run("python3 ./test.py %s -d %s -e %s"%(valgrindStr,remoteCfgDictStr,execCmdStr))
wafwerar's avatar
wafwerar 已提交
198

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
    def deploy(self, *updatecfgDict):
        """Recreate this dnode's data/log/cfg directories and write its configuration.

        Overrides are read from ``updatecfgDict[0][0]``. An entry whose
        *value* is the string ``'dataDir'`` is written straight to the config
        file as ``dataDir <key>``, and the first such entry removes the
        default ``dataDir`` from ``cfgDict``. All other entries are merged
        into ``cfgDict``. ``clientCfg`` is skipped for local, non-Windows
        deployments. With a remote IP set, the deployment is delegated to
        ``remoteExec`` instead of writing ``cfgDict`` locally.
        """
        dnodeRoot = "%s/sim/dnode%d" % (self.path, self.index)
        self.logDir = dnodeRoot + "/log"
        self.dataDir = dnodeRoot + "/data"
        self.cfgDir = dnodeRoot + "/cfg"
        self.cfgPath = dnodeRoot + "/cfg/taos.cfg"

        workDirs = (self.dataDir, self.logDir, self.cfgDir)
        for workDir in workDirs:
            removeCmd = "rm -rf " + workDir
            if os.system(removeCmd) != 0:
                tdLog.exit(removeCmd)
        for workDir in workDirs:
            os.makedirs(workDir)

        touchCmd = "touch " + self.cfgPath
        if os.system(touchCmd) != 0:
            tdLog.exit(touchCmd)

        if self.testCluster:
            self.startIP()
            nodeAddress = "192.168.0.%d" % (self.index)
            self.cfg("masterIp", "192.168.0.1")
            self.cfg("secondIp", "192.168.0.2")
            for addressOption in ("publicIp", "internalIp", "privateIp"):
                self.cfg(addressOption, nodeAddress)

        self.cfgDict["dataDir"] = self.dataDir
        self.cfgDict["logDir"] = self.logDir

        defaultDataDirPending = True
        if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
            for key, value in updatecfgDict[0][0].items():
                if key == "clientCfg" and self.remoteIP == "" and platform.system().lower() != 'windows':
                    continue
                if value != 'dataDir':
                    self.addExtraCfg(key, value)
                    continue
                # Explicit data directories replace the default one.
                if defaultDataDirPending:
                    self.cfgDict.pop('dataDir')
                    defaultDataDirPending = False
                self.cfg(value, key)

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, "tdDnodes.deploy(%d,updateCfgDict)"%self.index)
        else:
            for key, value in self.cfgDict.items():
                self.cfg(key, value)

        self.deployed = 1
        tdLog.debug(
            "dnode:%d is deployed and configured by %s" %
            (self.index, self.cfgPath))

276
    def getPath(self, tool="taosd"):
277 278 279 280 281 282 283
        selfPath = os.path.dirname(os.path.realpath(__file__))

        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("community")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

284
        paths = []
285
        for root, dirs, files in os.walk(projPath):
wafwerar's avatar
wafwerar 已提交
286
            if ((tool) in files or ("%s.exe"%tool) in files):
287 288
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
289
                    paths.append(os.path.join(root, tool))
290
                    break
wafwerar's avatar
wafwerar 已提交
291 292
        if (len(paths) == 0):
                return ""
293
        return paths[0]
294

haoranc's avatar
haoranc 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307
    def starttaosd(self):
        """Start taosd for this dnode, locally or on the remote host.

        Exits through ``tdLog.exit`` if the binary cannot be found, the dnode
        is not deployed, or the local start command fails. Locally, taosd is
        launched in the background (under valgrind when ``self.valgrind`` is
        non-zero); the method then waits 0.1 s, or 10 s under valgrind. With a
        remote IP set, the start is delegated to ``remoteExec``.
        """
        binPath = self.getPath()

        if (binPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found: %s" % binPath)

        if self.deployed == 0:
            tdLog.exit("dnode:%d is not deployed" % (self.index))

        if self.valgrind == 0:
            if platform.system().lower() == 'windows':
                cmd = "mintty -h never %s -c %s" % (
                    binPath, self.cfgDir)
            else:
                cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
                    binPath, self.cfgDir)
        else:
            valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\"  --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir

            if platform.system().lower() == 'windows':
                cmd = "mintty -h never %s %s -c %s" % (
                    valgrindCmdline, binPath, self.cfgDir)
            else:
                cmd = "nohup %s %s -c %s 2>&1 & " % (
                    valgrindCmdline, binPath, self.cfgDir)

            print(cmd)

        if (not self.remoteIP == ""):
            # The remote side indexes tdDnodes.dnodes from 0, hence index-1.
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.start(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))
            self.running = 1
        else:
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
            self.running = 1
            tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
            if self.valgrind == 0:
                # Unlike start(), this variant does not watch the log file;
                # it only pauses briefly before reporting success.
                time.sleep(0.1)
                tdLog.debug("the dnode:%d has been started." % (self.index))
            else:
                tdLog.debug(
                    "wait 10 seconds for the dnode:%d to start." %
                    (self.index))
                time.sleep(10)

376
    def start(self):
        """Start taosd for this dnode and wait until it reports being online.

        Exits through ``tdLog.exit`` if the binary cannot be found, the dnode
        is not deployed, the local start command fails, the log file never
        appears, or the log does not contain ``from offline to online`` within
        20 seconds. Under valgrind the method simply waits 10 seconds. With a
        remote IP set, the start is delegated to ``remoteExec``.
        """
        binPath = self.getPath()

        if (binPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found: %s" % binPath)

        if self.deployed == 0:
            tdLog.exit("dnode:%d is not deployed" % (self.index))

        if self.valgrind == 0:
            if platform.system().lower() == 'windows':
                cmd = "mintty -h never %s -c %s" % (
                    binPath, self.cfgDir)
            else:
                if self.asan:
                    # Keep stderr in a per-dnode file for later inspection.
                    asanDir = "%s/sim/asan/dnode%d.asan" % (
                        self.path, self.index)
                    cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
                        binPath, self.cfgDir, asanDir)
                else:
                    cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
                        binPath, self.cfgDir)
        else:
            valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\"  --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir

            if platform.system().lower() == 'windows':
                cmd = "mintty -h never %s %s -c %s" % (
                    valgrindCmdline, binPath, self.cfgDir)
            else:
                cmd = "nohup %s %s -c %s 2>&1 & " % (
                    valgrindCmdline, binPath, self.cfgDir)

            print(cmd)

        if (not self.remoteIP == ""):
            # The remote side indexes tdDnodes.dnodes from 0, hence index-1.
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.start(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))
            self.running = 1
        else:
            # Remove the previous log so the readiness marker found below
            # belongs to this start.
            os.system("rm -rf %s/taosdlog.0"%self.logDir)
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
            self.running = 1
            tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
            if self.valgrind == 0:
                time.sleep(0.1)
                key = 'from offline to online'
                logFile = self.logDir + "/taosdlog.0"

                # Give taosd roughly five seconds to create its log file.
                attempts = 0
                while not os.path.exists(logFile) and attempts <= 50:
                    sleep(0.1)
                    attempts += 1
                if not os.path.exists(logFile):
                    # Fail cleanly instead of raising FileNotFoundError below.
                    tdLog.exit("taosd log file %s was not created" % logFile)

                with open(logFile, errors="replace") as f:
                    timeout = time.time() + 10 * 2
                    while True:
                        line = f.readline()
                        if key in line:
                            break
                        if time.time() > timeout:
                            tdLog.exit('wait too long for taosd start')
                        if not line:
                            # At end of file: pause instead of spinning on
                            # readline() until taosd writes more output.
                            sleep(0.05)
                    tdLog.debug("the dnode:%d has been started." % (self.index))
            else:
                tdLog.debug(
                    "wait 10 seconds for the dnode:%d to start." %
                    (self.index))
                time.sleep(10)
446

447
    def startWithoutSleep(self):
        """Start taosd for this dnode and return without waiting for it to come up.

        Exits through ``tdLog.exit`` if the binary cannot be found, the dnode
        is not deployed, or the local start command fails. With a remote IP
        set, the start is delegated to ``remoteExec``.
        """
        binPath = self.getPath()
        if binPath != "":
            tdLog.info("taosd found: %s" % binPath)
        else:
            tdLog.exit("taosd not found!")

        if self.deployed == 0:
            tdLog.exit("dnode:%d is not deployed" % (self.index))

        if self.valgrind != 0:
            valgrindCmdline = "valgrind  --log-file=\"%s/../log/valgrind.log\"  --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir
            cmd = "nohup %s %s -c %s 2>&1 & " % (
                valgrindCmdline, binPath, self.cfgDir)
            print(cmd)
        elif self.asan:
            # Keep stderr in a per-dnode file for later inspection.
            asanDir = "%s/sim/asan/dnode%d.asan" % (self.path, self.index)
            cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
                binPath, self.cfgDir, asanDir)
        else:
            cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
                binPath, self.cfgDir)

        if self.remoteIP != "":
            # The remote side indexes tdDnodes.dnodes from 0, hence index-1.
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.startWithoutSleep(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))
        elif os.system(cmd) != 0:
            tdLog.exit(cmd)

        self.running = 1
        tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))

    def stop(self):
        """Stop taosd with SIGINT and wait until no matching process remains.

        In asan mode the exec.sh helper is invoked instead; with a remote IP
        set the stop is delegated to ``remoteExec``. Otherwise, when the dnode
        is marked running, every process matching ``taosd`` (or
        ``valgrind.bin`` under valgrind) is signalled. On Windows the signal
        is sent once; elsewhere it is re-sent every second until the process
        list is empty. On non-Windows hosts, ports 6030-6040 are then freed
        with ``fuser -k``.
        """
        if self.asan:
            stopCmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
            tdLog.info("execute script: " + stopCmd)
            os.system(stopCmd)
            return

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
            tdLog.info("stop dnode%d"%self.index)
            return

        toBeKilled = "taosd" if self.valgrind == 0 else "valgrind.bin"
        if self.running == 0:
            return

        onWindows = platform.system().lower() == 'windows'
        psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs" % toBeKilled

        def findProcessIDs():
            # Space-separated PIDs of the matching processes ("" when none).
            return subprocess.check_output(
                psCmd, shell=True).decode("utf-8").strip()

        signalSent = False
        processID = findProcessIDs()
        while processID:
            if not onWindows or not signalSent:
                if onWindows:
                    killCmd = "kill -INT %s > nul 2>&1" % processID
                else:
                    killCmd = "kill -INT %s > /dev/null 2>&1" % processID
                os.system(killCmd)
                signalSent = True
            time.sleep(1)
            processID = findProcessIDs()

        if not onWindows:
            for port in range(6030, 6041):
                os.system("fuser -k -n tcp %d > /dev/null" % port)
        if self.valgrind:
            time.sleep(2)

        self.running = 0
        tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
525

haoranc's avatar
haoranc 已提交
526 527

    def stoptaosd(self):
        """Stop only this dnode's taosd with SIGINT and wait until it is gone.

        In asan mode the exec.sh helper is invoked instead; with a remote IP
        set the stop is delegated to ``remoteExec``. Otherwise, when the dnode
        is marked running, processes whose command line mentions
        ``dnode<index>`` are signalled. On Windows the signal is sent once;
        elsewhere it is re-sent every second until no match remains.
        """
        if self.asan:
            stopCmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
            tdLog.info("execute script: " + stopCmd)
            os.system(stopCmd)
            return

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
            tdLog.info("stop dnode%d"%self.index)
            return

        toBeKilled = "taosd" if self.valgrind == 0 else "valgrind.bin"
        if self.running == 0:
            return

        onWindows = platform.system().lower() == 'windows'
        if onWindows:
            psCmd = "for /f %%a in ('wmic process where \"name='taosd.exe' and CommandLine like '%%dnode%d%%'\" get processId ^| xargs echo ^| awk ^'{print $2}^' ^&^& echo aa') do @(ps | grep %%a | awk '{print $1}' | xargs)" % (self.index)
        else:
            psCmd = "ps -ef|grep -w %s| grep dnode%d|grep -v grep | awk '{print $2}' | xargs" % (toBeKilled,self.index)

        def findProcessIDs():
            # Space-separated PIDs of the matching processes ("" when none).
            return subprocess.check_output(
                psCmd, shell=True).decode("utf-8").strip()

        signalSent = False
        processID = findProcessIDs()
        while processID:
            if not onWindows or not signalSent:
                os.system("kill -INT %s > /dev/null 2>&1" % processID)
                signalSent = True
            time.sleep(1)
            processID = findProcessIDs()

        if self.valgrind:
            time.sleep(2)

        self.running = 0
        tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
haoranc's avatar
haoranc 已提交
565

566
    def forcestop(self):
S
Shengliang Guan 已提交
567 568 569 570 571 572 573
        if self.asan:
            stopCmd = "%s -s stop -n dnode%d -x SIGKILL" + \
                (self.execPath, self.index)
            tdLog.info("execute script: " + stopCmd)
            os.system(stopCmd)
            return

wafwerar's avatar
wafwerar 已提交
574
        if (not self.remoteIP == ""):
575
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].forcestop()"%(self.index-1,self.index-1))
wafwerar's avatar
wafwerar 已提交
576
            return
577 578 579 580 581 582
        if self.valgrind == 0:
            toBeKilled = "taosd"
        else:
            toBeKilled = "valgrind.bin"

        if self.running != 0:
wafwerar's avatar
wafwerar 已提交
583
            psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs" % toBeKilled
584
            processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
585
                psCmd, shell=True).decode("utf-8").strip()
586

wafwerar's avatar
wafwerar 已提交
587
            onlyKillOnceWindows = 0
588
            while(processID):
wafwerar's avatar
wafwerar 已提交
589 590 591 592
                if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
                    killCmd = "kill -KILL %s > /dev/null 2>&1" % processID
                    os.system(killCmd)
                    onlyKillOnceWindows = 1
593 594
                time.sleep(1)
                processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
595
                    psCmd, shell=True).decode("utf-8").strip()
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643
            for port in range(6030, 6041):
                fuserCmd = "fuser -k -n tcp %d" % port
                os.system(fuserCmd)
            if self.valgrind:
                time.sleep(2)

            self.running = 0
            tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index))

    def startIP(self):
        """Bring up the loopback alias ``lo:<index>`` at ``192.168.0.<index>`` via sudo ifconfig.

        Calls ``tdLog.exit`` with the command text if the command fails.
        """
        upCmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index)
        status = os.system(upCmd)
        if status == 0:
            return
        tdLog.exit(upCmd)

    def stopIP(self):
        """Take down the loopback alias ``lo:<index>`` at ``192.168.0.<index>`` via sudo ifconfig.

        Calls ``tdLog.exit`` with the command text if the command fails.
        """
        downCmd = "sudo ifconfig lo:%d 192.168.0.%d down" % (self.index, self.index)
        status = os.system(downCmd)
        if status == 0:
            return
        tdLog.exit(downCmd)

    def cfg(self, option, value):
        cmd = "echo %s %s >> %s" % (option, value, self.cfgPath)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

    def getDnodeRootDir(self, index):
        dnodeRootDir = "%s/sim/psim/dnode%d" % (self.path, index)
        return dnodeRootDir

    def getDnodesRootDir(self):
        dnodesRootDir = "%s/sim/psim" % (self.path)
        return dnodesRootDir


class TDDnodes:
    """Manage the pool of simulated dnodes used by the test framework.

    The pool holds ten ``TDDnode`` objects that are addressed through a
    1-based index. The cluster / valgrind / asan switches stored on this
    object are copied onto a dnode when that dnode is deployed.
    """

    def __init__(self):
        # dnodes[i] is constructed with index i + 1, so public indexes are 1-based.
        self.dnodes = [TDDnode(i) for i in range(1, 11)]
        self.simDeployed = False
        self.testCluster = False
        self.valgrind = 0
        self.asan = False
        self.killValgrind = 1

    def init(self, path, remoteIP=""):
        """Resolve the working root, then initialise every dnode and the sim client.

        path: root directory to use. An empty string derives the root from
              the location returned by the first dnode's getPath().
        remoteIP: forwarded unchanged to each dnode's init().
        """
        binPath = os.path.realpath(self.dnodes[0].getPath() + "/../../../")

        if path == "":
            # NOTE(review): realpath() normally returns a path without a
            # trailing separator, so this plain concatenation fuses ".." onto
            # the last component of binPath instead of adding two parent
            # steps. Kept unchanged because callers may rely on the resulting
            # directory - confirm before switching to os.path.join.
            self.path = os.path.abspath(binPath + "../../")
        else:
            self.path = os.path.realpath(path)

        for dnode in self.dnodes:
            dnode.init(self.path, remoteIP)
        self.sim = TDSimClient(self.path)

    def setTestCluster(self, value):
        """Store the cluster-mode flag applied to dnodes at deploy time."""
        self.testCluster = value

    def setValgrind(self, value):
        """Store the valgrind setting applied to dnodes at deploy time."""
        self.valgrind = value

    def setAsan(self, value):
        """Store the address-sanitizer flag.

        When value is truthy, also record the absolute paths of the two
        stop scripts under ``<self.path>/tests/script/sh`` that stopAll and
        StopAllSigint execute in asan mode. Requires self.path to be set.
        """
        self.asan = value
        if value:
            scriptDir = self.path + "/tests/script/sh/"
            self.stopDnodesPath = os.path.abspath(scriptDir + "stop_dnodes.sh")
            self.stopDnodesSigintPath = os.path.abspath(scriptDir + "sigint_stop_dnodes.sh")
            tdLog.info("run in address sanitizer mode")

    def setKillValgrind(self, value):
        """Store whether stopAll should also terminate valgrind.bin processes (1 = yes)."""
        self.killValgrind = value

    def deploy(self, index, *updatecfgDict):
        """Deploy the sim client (first call only) and the dnode at ``index``.

        index: 1-based dnode index.
        updatecfgDict: extra positional arguments; the whole tuple is passed
                       as a single argument to the sim's and the dnode's
                       deploy().
        """
        self.sim.setTestCluster(self.testCluster)

        # The sim client is deployed once, before the index is validated.
        if not self.simDeployed:
            self.sim.deploy(updatecfgDict)
            self.simDeployed = True

        dnode = self._dnodeAt(index)
        dnode.setTestCluster(self.testCluster)
        dnode.setValgrind(self.valgrind)
        dnode.setAsan(self.asan)
        dnode.deploy(updatecfgDict)

    def cfg(self, index, option, value):
        """Forward a config option/value pair to the dnode at ``index``."""
        self._dnodeAt(index).cfg(option, value)

    def starttaosd(self, index):
        """Call starttaosd() on the dnode at ``index``."""
        self._dnodeAt(index).starttaosd()

    def stoptaosd(self, index):
        """Call stoptaosd() on the dnode at ``index``."""
        self._dnodeAt(index).stoptaosd()

    def start(self, index):
        """Call start() on the dnode at ``index``."""
        self._dnodeAt(index).start()

    def startWithoutSleep(self, index):
        """Call startWithoutSleep() on the dnode at ``index``."""
        self._dnodeAt(index).startWithoutSleep()

    def stop(self, index):
        """Call stop() on the dnode at ``index``."""
        self._dnodeAt(index).stop()

    def getDataSize(self, index):
        """Return whatever getDataSize() of the dnode at ``index`` returns."""
        return self._dnodeAt(index).getDataSize()

    def forcestop(self, index):
        """Call forcestop() on the dnode at ``index``."""
        self._dnodeAt(index).forcestop()

    def startIP(self, index):
        """Call startIP() on the dnode at ``index`` when this pool is in cluster mode."""
        dnode = self._dnodeAt(index)

        if self.testCluster:
            dnode.startIP()

    def stopIP(self, index):
        """Call stopIP() on the dnode at ``index`` when that dnode is in cluster mode."""
        dnode = self._dnodeAt(index)

        # NOTE(review): startIP consults the pool-level flag while this reads
        # the dnode's own flag (copied in deploy()). Kept as in the original;
        # confirm whether the asymmetry is intended.
        if dnode.testCluster:
            dnode.stopIP()

    def check(self, index):
        """Validate a 1-based dnode index; out-of-range values go to tdLog.exit."""
        count = len(self.dnodes)
        if index < 1 or index > count:
            tdLog.exit("index:%d should on a scale of [1, %d]" % (index, count))

    def StopAllSigint(self):
        """Run the SIGINT stop script; does nothing unless asan mode is enabled."""
        tdLog.info("stop all dnodes sigint")
        if not self.asan:
            return
        tdLog.info("execute script: %s" % self.stopDnodesSigintPath)
        os.system(self.stopDnodesSigintPath)
        tdLog.info("execute finished")

    def stopAll(self):
        """Stop every dnode and kill any taosd process that is still alive.

        Order of operations:
        1. asan mode: run the stop script and return.
        2. remote mode (first dnode has a remoteIP): delegate to the remote
           side and return.
        3. otherwise: stop each dnode, stop the taosd systemd service if a
           root-owned taosd is found, ``kill -9`` remaining taosd processes
           and, when killValgrind == 1, ``kill -TERM`` valgrind.bin
           processes.

        The kill steps repeat once per second until the process listing is
        empty; there is no upper bound on the number of attempts.
        """
        tdLog.info("stop all dnodes")
        if self.asan:
            tdLog.info("execute script: %s" % self.stopDnodesPath)
            os.system(self.stopDnodesPath)
            tdLog.info("execute finished")
            return

        firstDnode = self.dnodes[0]
        if firstDnode.remoteIP != "":
            firstDnode.remoteExec(firstDnode.cfgDict, "for i in range(len(tdDnodes.dnodes)):\n    tdDnodes.dnodes[i].running=1\ntdDnodes.stopAll()")
            return

        for dnode in self.dnodes:
            dnode.stop()

        rootTaosdPsCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
        if self._findPids(rootTaosdPsCmd):
            # Best effort: the exit status of systemctl is not checked.
            os.system("sudo systemctl stop taosd")

        taosdPsCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
        self._killUntilGone(taosdPsCmd, "-9")

        if self.killValgrind == 1:
            valgrindPsCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs"
            self._killUntilGone(valgrindPsCmd, "-TERM")

    def getDnodesRootDir(self):
        """Return ``<self.path>/sim``."""
        return "%s/sim" % (self.path)

    def getSimCfgPath(self):
        """Return the sim client's configuration directory."""
        return self.sim.getCfgDir()

    def getSimLogPath(self):
        """Return the sim client's log directory."""
        return self.sim.getLogDir()

    def addSimExtraCfg(self, option, value):
        """Register an extra option/value pair on the sim client."""
        self.sim.addExtraCfg(option, value)

    def _dnodeAt(self, index):
        """Validate the 1-based ``index`` and return the matching dnode."""
        self.check(index)
        return self.dnodes[index - 1]

    def _findPids(self, psCmd):
        """Run ``psCmd`` through the shell and return its stripped output (PIDs separated by spaces)."""
        return subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()

    def _killUntilGone(self, psCmd, signalArg):
        """Send ``kill <signalArg>`` to the PIDs listed by ``psCmd`` until none are listed."""
        nullDevice = "nul" if platform.system().lower() == 'windows' else "/dev/null"
        pids = self._findPids(psCmd)
        while pids:
            os.system("kill %s %s > %s 2>&1" % (signalArg, pids, nullDevice))
            # Give the processes a moment to exit before listing them again.
            sleep(1)
            pids = self._findPids(psCmd)


# Module-level TDDnodes instance, created when this module is imported.
# The remote command string built in TDDnodes.stopAll refers to it by this name.
tdDnodes = TDDnodes()