dnodes.py 30.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import sys
import os
import os.path
import platform
import subprocess
from time import sleep
wafwerar's avatar
wafwerar 已提交
20 21 22 23
import base64
import json
import copy
from fabric2 import Connection
24 25 26 27 28 29 30 31
from util.log import *


class TDSimClient:
    """Simulated client ("psim") environment.

    Owns the client-side log and cfg directories under ``<path>/sim/psim``
    and writes the client ``taos.cfg`` from ``cfgDict`` plus any overrides.
    """

    def __init__(self, path):
        """Remember the simulation root ``path`` and set default client options."""
        self.testCluster = False
        self.path = path
        # Options written to taos.cfg by deploy(), in insertion order.
        self.cfgDict = {
            "fqdn": "localhost",
            "numOfLogLines": "100000000",
            "locale": "en_US.UTF-8",
            "charset": "UTF-8",
            "asyncLog": "0",
            "rpcDebugFlag": "143",
            "tmrDebugFlag": "131",
            "cDebugFlag": "143",
            "uDebugFlag": "143",
            "jniDebugFlag": "143",
            "qDebugFlag": "143",
            "supportVnodes": "1024",
            "enableQueryHb": "1",
            "telemetryReporting": "0",
        }

    def getLogDir(self):
        """Compute, cache on ``self.logDir`` and return the client log directory."""
        self.logDir = "%s/sim/psim/log" % (self.path)
        return self.logDir

    def getCfgDir(self):
        """Compute, cache on ``self.cfgDir`` and return the client cfg directory."""
        self.cfgDir = "%s/sim/psim/cfg" % (self.path)
        return self.cfgDir

    def setTestCluster(self, value):
        """Store the test-cluster flag consulted by deploy()."""
        self.testCluster = value

    def addExtraCfg(self, option, value):
        """Add or overwrite one option in ``cfgDict``."""
        self.cfgDict.update({option: value})

    def cfg(self, option, value):
        """Append ``option value`` to the file at ``self.cfgPath`` via the shell.

        Calls ``tdLog.exit`` with the command text when the command fails.
        """
        cmd = "echo %s %s >> %s" % (option, value, self.cfgPath)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

    @staticmethod
    def _clientCfgOverrides(updatecfgDict):
        """Return the ``clientCfg`` mapping nested in ``updatecfgDict``, or ``{}``.

        ``updatecfgDict`` is the ``*args`` tuple received by deploy(); the
        overrides dictionary is looked up at ``updatecfgDict[0][0]``. Any
        missing level or unusable value yields an empty dict so that client
        overrides stay best-effort, as they were before.
        """
        try:
            overrides = updatecfgDict[0][0]
            clientCfg = overrides.get('clientCfg') if overrides else None
            return dict(clientCfg) if clientCfg else {}
        except (AttributeError, IndexError, KeyError, TypeError, ValueError):
            return {}

    def deploy(self, *updatecfgDict):
        """Recreate the psim log/cfg directories and write a fresh taos.cfg.

        The optional positional argument carries an overrides structure whose
        ``clientCfg`` entry (if present) is appended after the defaults.
        """
        self.logDir = "%s/sim/psim/log" % (self.path)
        self.cfgDir = "%s/sim/psim/cfg" % (self.path)
        self.cfgPath = "%s/sim/psim/cfg/taos.cfg" % (self.path)

        # Wipe and recreate each directory so no state leaks between runs.
        for directory in (self.logDir, self.cfgDir):
            cmd = "rm -rf " + directory
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
            os.makedirs(directory)

        cmd = "touch " + self.cfgPath
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

        if self.testCluster:
            self.cfg("masterIp", "192.168.0.1")
            self.cfg("secondIp", "192.168.0.2")
        self.cfg("logDir", self.logDir)

        for key, value in self.cfgDict.items():
            self.cfg(key, value)

        for key, value in self._clientCfgOverrides(updatecfgDict).items():
            self.cfg(key, value)

        tdLog.debug("psim is deployed and configured by %s" % (self.cfgPath))


class TDDnode:
    def __init__(self, index):
        self.index = index
        self.running = 0
        self.deployed = 0
        self.testCluster = False
        self.valgrind = 0
S
Shengliang Guan 已提交
120
        self.asan = False
wafwerar's avatar
wafwerar 已提交
121
        self.remoteIP = ""
122
        self.cfgDict = {
123
            "fqdn": "localhost",
124 125 126 127 128
            "monitor": "0",
            "maxShellConns": "30000",
            "locale": "en_US.UTF-8",
            "charset": "UTF-8",
            "asyncLog": "0",
S
Shengliang Guan 已提交
129 130 131 132 133 134 135 136
            "mDebugFlag": "143",
            "dDebugFlag": "143",
            "vDebugFlag": "143",
            "tqDebugFlag": "143",
            "cDebugFlag": "143",
            "jniDebugFlag": "143",
            "qDebugFlag": "143",
            "rpcDebugFlag": "143",
137
            "tmrDebugFlag": "131",
S
Shengliang Guan 已提交
138
            "uDebugFlag": "143",
S
Shengliang Guan 已提交
139
            "sDebugFlag": "143",
S
Shengliang Guan 已提交
140 141 142
            "wDebugFlag": "143",
            "numOfLogLines": "100000000",
            "statusInterval": "1",
H
Hui Li 已提交
143
            "enableQueryHb": "1",
144
            "supportVnodes": "1024",
S
Shengliang Guan 已提交
145
            "telemetryReporting": "0"
146 147
        }

wafwerar's avatar
wafwerar 已提交
148
    def init(self, path, remoteIP = ""):
149
        self.path = path
wafwerar's avatar
wafwerar 已提交
150
        self.remoteIP = remoteIP
wafwerar's avatar
wafwerar 已提交
151 152 153 154 155 156
        if (not self.remoteIP == ""):
            try:
                self.config = eval(self.remoteIP)
                self.remote_conn = Connection(host=self.config["host"], port=self.config["port"], user=self.config["user"], connect_kwargs={'password':self.config["password"]})
            except Exception as r:
                print(r)
157 158 159 160 161 162 163

    def setTestCluster(self, value):
        self.testCluster = value

    def setValgrind(self, value):
        self.valgrind = value

S
Shengliang Guan 已提交
164 165 166
    def setAsan(self, value):
        self.asan = value
        if value:
S
Shengliang Guan 已提交
167 168 169 170 171
            selfPath = os.path.dirname(os.path.realpath(__file__))
            if ("community" in selfPath):
                self.execPath = os.path.abspath(self.path + "/community/tests/script/sh/exec.sh")        
            else:
                self.execPath = os.path.abspath(self.path + "/tests/script/sh/exec.sh")        
S
Shengliang Guan 已提交
172

173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
    def getDataSize(self):
        totalSize = 0

        if (self.deployed == 1):
            for dirpath, dirnames, filenames in os.walk(self.dataDir):
                for f in filenames:
                    fp = os.path.join(dirpath, f)

                    if not os.path.islink(fp):
                        totalSize = totalSize + os.path.getsize(fp)

        return totalSize

    def addExtraCfg(self, option, value):
        self.cfgDict.update({option: value})

wafwerar's avatar
wafwerar 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201
    def remoteExec(self, updateCfgDict, execCmd):
        valgrindStr = ''
        if (self.valgrind==1):
            valgrindStr = '-g'
        remoteCfgDict = copy.deepcopy(updateCfgDict)
        if ("logDir" in remoteCfgDict):
            del remoteCfgDict["logDir"]
        if ("dataDir" in remoteCfgDict):
            del remoteCfgDict["dataDir"]
        if ("cfgDir" in remoteCfgDict):
            del remoteCfgDict["cfgDir"]
        remoteCfgDictStr = base64.b64encode(json.dumps(remoteCfgDict).encode()).decode()
        execCmdStr = base64.b64encode(execCmd.encode()).decode()
wafwerar's avatar
wafwerar 已提交
202 203
        with self.remote_conn.cd((self.config["path"]+sys.path[0].replace(self.path, '')).replace('\\','/')):
            self.remote_conn.run("python3 ./test.py %s -d %s -e %s"%(valgrindStr,remoteCfgDictStr,execCmdStr))
wafwerar's avatar
wafwerar 已提交
204

205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
    def deploy(self, *updatecfgDict):
        """Recreate this dnode's data/log/cfg directories and write its config.

        The optional positional argument carries an overrides structure whose
        dictionary is read from ``updatecfgDict[0][0]``. Entries whose *value*
        is the string ``'dataDir'`` are written directly as ``dataDir <key>``
        lines (the key holds the directory spec); every other entry is merged
        into ``self.cfgDict``. In local mode ``cfgDict`` is then appended to
        taos.cfg; in remote mode it is forwarded through remoteExec().
        """
        self.logDir = "%s/sim/dnode%d/log" % (self.path, self.index)
        self.dataDir = "%s/sim/dnode%d/data" % (self.path, self.index)
        self.cfgDir = "%s/sim/dnode%d/cfg" % (self.path, self.index)
        self.cfgPath = "%s/sim/dnode%d/cfg/taos.cfg" % (
            self.path, self.index)

        workDirs = (self.dataDir, self.logDir, self.cfgDir)

        # Remove everything first, then recreate, so a run starts clean.
        for directory in workDirs:
            cmd = "rm -rf " + directory
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
        for directory in workDirs:
            os.makedirs(directory)

        cmd = "touch " + self.cfgPath
        if os.system(cmd) != 0:
            tdLog.exit(cmd)

        if self.testCluster:
            self.startIP()
            self.cfg("masterIp", "192.168.0.1")
            self.cfg("secondIp", "192.168.0.2")
            self.cfg("publicIp", "192.168.0.%d" % (self.index))
            self.cfg("internalIp", "192.168.0.%d" % (self.index))
            self.cfg("privateIp", "192.168.0.%d" % (self.index))
        self.cfgDict["dataDir"] = self.dataDir
        self.cfgDict["logDir"] = self.logDir

        isFirstDir = 1
        if bool(updatecfgDict) and updatecfgDict[0] and updatecfgDict[0][0]:
            for key, value in updatecfgDict[0][0].items():
                if key == "clientCfg" and self.remoteIP == "" and not platform.system().lower() == 'windows':
                    continue
                if value == 'dataDir':
                    if isFirstDir:
                        # The first explicit dataDir replaces the default one.
                        self.cfgDict.pop('dataDir')
                        isFirstDir = 0
                    # Key and value are intentionally swapped: the key is the
                    # directory spec and the value is the option name.
                    self.cfg(value, key)
                else:
                    self.addExtraCfg(key, value)

        if (self.remoteIP == ""):
            for key, value in self.cfgDict.items():
                self.cfg(key, value)
        else:
            self.remoteExec(self.cfgDict, "tdDnodes.deploy(%d,updateCfgDict)"%self.index)

        self.deployed = 1
        tdLog.debug(
            "dnode:%d is deployed and configured by %s" %
            (self.index, self.cfgPath))

282
    def getPath(self, tool="taosd"):
        """Search the project tree for ``tool`` and return its path, or ``""``.

        The project root is the part of this file's real directory that
        precedes "community" (if present in the path) or otherwise "tests".
        The first directory containing ``tool`` or ``tool.exe`` whose parent
        path does not contain "packaging" wins. The returned path is always
        ``<dir>/<tool>``, without an ``.exe`` suffix.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))

        marker = "community" if "community" in selfPath else "tests"
        projPath = selfPath[:selfPath.find(marker)]

        wantedNames = (tool, "%s.exe" % tool)
        for root, dirs, files in os.walk(projPath):
            if any(name in files for name in wantedNames):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if "packaging" not in rootRealPath:
                    return os.path.join(root, tool)
        return ""
300

haoranc's avatar
haoranc 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313
    def starttaosd(self):
        """Start this dnode's taosd without waiting for its log to report online.

        Builds the launch command (plain, asan or valgrind; ``mintty`` on
        Windows). In remote mode the start is delegated through remoteExec();
        otherwise the command is run locally, followed by a fixed pause of
        0.1 s (10 s under valgrind).
        """
        binPath = self.getPath()

        if (binPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found: %s" % binPath)

        if self.deployed == 0:
            tdLog.exit("dnode:%d is not deployed" % (self.index))

        isWindows = platform.system().lower() == 'windows'
        if self.valgrind == 0:
            if isWindows:
                cmd = "mintty -h never %s -c %s" % (
                    binPath, self.cfgDir)
            elif self.asan:
                # Sanitizer output goes to a per-dnode file via stderr.
                asanDir = "%s/sim/asan/dnode%d.asan" % (
                    self.path, self.index)
                cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
                    binPath, self.cfgDir, asanDir)
            else:
                cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
                    binPath, self.cfgDir)
        else:
            valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\"  --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir

            if isWindows:
                cmd = "mintty -h never %s %s -c %s" % (
                    valgrindCmdline, binPath, self.cfgDir)
            else:
                cmd = "nohup %s %s -c %s 2>&1 & " % (
                    valgrindCmdline, binPath, self.cfgDir)

            print(cmd)

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.start(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))
            self.running = 1
            return

        if os.system(cmd) != 0:
            tdLog.exit(cmd)
        self.running = 1
        tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
        if self.valgrind == 0:
            time.sleep(0.1)
            tdLog.debug("the dnode:%d has been started." % (self.index))
        else:
            tdLog.debug(
                "wait 10 seconds for the dnode:%d to start." %
                (self.index))
            time.sleep(10)

388
    def start(self):
        """Start this dnode's taosd and wait until its log reports it online.

        Builds the launch command (plain, asan or valgrind; ``mintty`` on
        Windows). In remote mode the start is delegated through remoteExec().
        In local mode the old ``taosdlog.0`` is removed, the command is run,
        and (without valgrind) the new log is polled for the text
        ``from offline to online`` for up to 20 seconds; on timeout
        ``tdLog.exit`` is called. Under valgrind a fixed 10 s pause is used.
        """
        binPath = self.getPath()

        if (binPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found: %s" % binPath)

        if self.deployed == 0:
            tdLog.exit("dnode:%d is not deployed" % (self.index))

        isWindows = platform.system().lower() == 'windows'
        if self.valgrind == 0:
            if isWindows:
                cmd = "mintty -h never %s -c %s" % (
                    binPath, self.cfgDir)
            elif self.asan:
                # Sanitizer output goes to a per-dnode file via stderr.
                asanDir = "%s/sim/asan/dnode%d.asan" % (
                    self.path, self.index)
                cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
                    binPath, self.cfgDir, asanDir)
            else:
                cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
                    binPath, self.cfgDir)
        else:
            valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\"  --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir

            if isWindows:
                cmd = "mintty -h never %s %s -c %s" % (
                    valgrindCmdline, binPath, self.cfgDir)
            else:
                cmd = "nohup %s %s -c %s 2>&1 & " % (
                    valgrindCmdline, binPath, self.cfgDir)

            print(cmd)

        if self.remoteIP != "":
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.start(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))
            self.running = 1
            return

        # Remove the previous log so the readiness marker found below can
        # only come from the process started now.
        os.system("rm -rf %s/taosdlog.0"%self.logDir)
        if os.system(cmd) != 0:
            tdLog.exit(cmd)
        self.running = 1
        tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))

        if self.valgrind != 0:
            tdLog.debug(
                "wait 10 seconds for the dnode:%d to start." %
                (self.index))
            time.sleep(10)
            return

        time.sleep(0.1)
        key = 'from offline to online'
        logFile = self.logDir + "/taosdlog.0"

        # Give the server a few seconds to create its log file.
        i = 0
        while not os.path.exists(logFile):
            sleep(0.1)
            i += 1
            if i > 50:
                break

        with open(logFile) as f:
            timeout = time.time() + 10 * 2
            while True:
                line = f.readline()
                if key in line:
                    break
                if time.time() > timeout:
                    tdLog.exit('wait too long for taosd start')
                if not line:
                    # At end of file: pause briefly instead of spinning
                    # while the server writes more log output.
                    sleep(0.05)
            tdLog.debug("the dnode:%d has been started." % (self.index))
458

459
    def startWithoutSleep(self):
        """Start this dnode's taosd and return immediately, without waiting.

        Builds the ``nohup`` launch command (plain, asan or valgrind), runs it
        locally or delegates through remoteExec() in remote mode, then marks
        the dnode as running.
        """
        binPath = self.getPath()

        if (binPath == ""):
            tdLog.exit("taosd not found!")
        else:
            tdLog.info("taosd found: %s" % binPath)

        if self.deployed == 0:
            tdLog.exit("dnode:%d is not deployed" % (self.index))

        if self.valgrind == 0:
            if self.asan:
                # Sanitizer output goes to a per-dnode file via stderr.
                asanDir = "%s/sim/asan/dnode%d.asan" % (
                    self.path, self.index)
                cmd = "nohup %s -c %s > /dev/null 2> %s & " % (
                    binPath, self.cfgDir, asanDir)
            else:
                cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
                    binPath, self.cfgDir)
        else:
            valgrindCmdline = "valgrind  --log-file=\"%s/../log/valgrind.log\"  --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir

            cmd = "nohup %s %s -c %s 2>&1 & " % (
                valgrindCmdline, binPath, self.cfgDir)

            print(cmd)

        if (self.remoteIP == ""):
            if os.system(cmd) != 0:
                tdLog.exit(cmd)
        else:
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].deployed=1\ntdDnodes.dnodes[%d].logDir=\"%%s/sim/dnode%%d/log\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.dnodes[%d].cfgDir=\"%%s/sim/dnode%%d/cfg\"%%(tdDnodes.dnodes[%d].path,%d)\ntdDnodes.startWithoutSleep(%d)"%(self.index-1,self.index-1,self.index-1,self.index,self.index-1,self.index-1,self.index,self.index))

        self.running = 1
        tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))

    def stop(self):
S
Shengliang Guan 已提交
497 498 499 500 501 502
        if self.asan:
            stopCmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
            tdLog.info("execute script: " + stopCmd)
            os.system(stopCmd)
            return

wafwerar's avatar
wafwerar 已提交
503
        if (not self.remoteIP == ""):
504
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
wafwerar's avatar
wafwerar 已提交
505
            tdLog.info("stop dnode%d"%self.index)
wafwerar's avatar
wafwerar 已提交
506
            return
507 508 509 510 511 512
        if self.valgrind == 0:
            toBeKilled = "taosd"
        else:
            toBeKilled = "valgrind.bin"

        if self.running != 0:
wafwerar's avatar
wafwerar 已提交
513
            psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs" % toBeKilled
514
            processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
515
                psCmd, shell=True).decode("utf-8").strip()
C
cpwu 已提交
516

wafwerar's avatar
wafwerar 已提交
517
            onlyKillOnceWindows = 0
518
            while(processID):
wafwerar's avatar
wafwerar 已提交
519
                if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
haoranc's avatar
haoranc 已提交
520
                    killCmd = "kill -INT %s > /dev/null 2>&1" % processID
wafwerar's avatar
wafwerar 已提交
521 522
                    if platform.system().lower() == 'windows':
                        killCmd = "kill -INT %s > nul 2>&1" % processID
wafwerar's avatar
wafwerar 已提交
523 524
                    os.system(killCmd)
                    onlyKillOnceWindows = 1
525 526
                time.sleep(1)
                processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
527
                    psCmd, shell=True).decode("utf-8").strip()
wafwerar's avatar
wafwerar 已提交
528 529
            if not platform.system().lower() == 'windows':
                for port in range(6030, 6041):
wafwerar's avatar
wafwerar 已提交
530
                    fuserCmd = "fuser -k -n tcp %d > /dev/null" % port
wafwerar's avatar
wafwerar 已提交
531
                    os.system(fuserCmd)
532 533 534 535
            if self.valgrind:
                time.sleep(2)

            self.running = 0
haoranc's avatar
haoranc 已提交
536
            tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
537

haoranc's avatar
haoranc 已提交
538 539

    def stoptaosd(self):
S
Shengliang Guan 已提交
540 541 542 543 544 545
        if self.asan:
            stopCmd = "%s -s stop -n dnode%d" % (self.execPath, self.index)
            tdLog.info("execute script: " + stopCmd)
            os.system(stopCmd)
            return

haoranc's avatar
haoranc 已提交
546 547 548 549 550 551 552 553 554 555
        if (not self.remoteIP == ""):
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].stop()"%(self.index-1,self.index-1))
            tdLog.info("stop dnode%d"%self.index)
            return
        if self.valgrind == 0:
            toBeKilled = "taosd"
        else:
            toBeKilled = "valgrind.bin"

        if self.running != 0:
wafwerar's avatar
wafwerar 已提交
556
            if platform.system().lower() == 'windows':
wafwerar's avatar
wafwerar 已提交
557
                psCmd = "for /f %%a in ('wmic process where \"name='taosd.exe' and CommandLine like '%%dnode%d%%'\" get processId ^| xargs echo ^| awk ^'{print $2}^' ^&^& echo aa') do @(ps | grep %%a | awk '{print $1}' | xargs)" % (self.index)
wafwerar's avatar
wafwerar 已提交
558
            else:
wafwerar's avatar
wafwerar 已提交
559
                psCmd = "ps -ef|grep -w %s| grep dnode%d|grep -v grep | awk '{print $2}' | xargs" % (toBeKilled,self.index)
wafwerar's avatar
wafwerar 已提交
560
            processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
561
                psCmd, shell=True).decode("utf-8").strip()
wafwerar's avatar
wafwerar 已提交
562

wafwerar's avatar
wafwerar 已提交
563
            onlyKillOnceWindows = 0
wafwerar's avatar
wafwerar 已提交
564
            while(processID):
wafwerar's avatar
wafwerar 已提交
565
                if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
haoranc's avatar
haoranc 已提交
566
                    killCmd = "kill -INT %s > /dev/null 2>&1" % processID
wafwerar's avatar
wafwerar 已提交
567 568
                    os.system(killCmd)
                    onlyKillOnceWindows = 1
wafwerar's avatar
wafwerar 已提交
569
                time.sleep(1)
haoranc's avatar
haoranc 已提交
570
                processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
571
                    psCmd, shell=True).decode("utf-8").strip()
wafwerar's avatar
wafwerar 已提交
572 573
            if self.valgrind:
                time.sleep(2)
haoranc's avatar
haoranc 已提交
574 575

            self.running = 0
haoranc's avatar
haoranc 已提交
576
            tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
haoranc's avatar
haoranc 已提交
577

578
    def forcestop(self):
S
Shengliang Guan 已提交
579 580 581 582 583 584 585
        if self.asan:
            stopCmd = "%s -s stop -n dnode%d -x SIGKILL" + \
                (self.execPath, self.index)
            tdLog.info("execute script: " + stopCmd)
            os.system(stopCmd)
            return

wafwerar's avatar
wafwerar 已提交
586
        if (not self.remoteIP == ""):
587
            self.remoteExec(self.cfgDict, "tdDnodes.dnodes[%d].running=1\ntdDnodes.dnodes[%d].forcestop()"%(self.index-1,self.index-1))
wafwerar's avatar
wafwerar 已提交
588
            return
589 590 591 592 593 594
        if self.valgrind == 0:
            toBeKilled = "taosd"
        else:
            toBeKilled = "valgrind.bin"

        if self.running != 0:
wafwerar's avatar
wafwerar 已提交
595
            psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs" % toBeKilled
596
            processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
597
                psCmd, shell=True).decode("utf-8").strip()
598

wafwerar's avatar
wafwerar 已提交
599
            onlyKillOnceWindows = 0
600
            while(processID):
wafwerar's avatar
wafwerar 已提交
601 602 603 604
                if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
                    killCmd = "kill -KILL %s > /dev/null 2>&1" % processID
                    os.system(killCmd)
                    onlyKillOnceWindows = 1
605 606
                time.sleep(1)
                processID = subprocess.check_output(
wafwerar's avatar
wafwerar 已提交
607
                    psCmd, shell=True).decode("utf-8").strip()
608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
            for port in range(6030, 6041):
                fuserCmd = "fuser -k -n tcp %d" % port
                os.system(fuserCmd)
            if self.valgrind:
                time.sleep(2)

            self.running = 0
            tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index))

    def startIP(self):
        """Bring up the loopback alias ``lo:<index>`` at ``192.168.0.<index>``.

        Runs ``ifconfig`` through ``sudo``; calls ``tdLog.exit`` with the
        command text when the command fails.
        """
        ifaceCmd = "sudo ifconfig lo:%d 192.168.0.%d up" % (self.index, self.index)
        status = os.system(ifaceCmd)
        if status != 0:
            tdLog.exit(ifaceCmd)

    def stopIP(self):
        """Take down the loopback alias ``lo:<index>`` at ``192.168.0.<index>``.

        Runs ``ifconfig`` through ``sudo``; calls ``tdLog.exit`` with the
        command text when the command fails.
        """
        ifaceCmd = "sudo ifconfig lo:%d 192.168.0.%d down" % (self.index, self.index)
        status = os.system(ifaceCmd)
        if status != 0:
            tdLog.exit(ifaceCmd)

    def cfg(self, option, value):
        """Append ``option value`` to the file at ``self.cfgPath`` via the shell.

        Calls ``tdLog.exit`` with the command text when the command fails.
        """
        appendCmd = "echo %s %s >> %s" % (option, value, self.cfgPath)
        status = os.system(appendCmd)
        if status != 0:
            tdLog.exit(appendCmd)

    def getDnodeRootDir(self, index):
        dnodeRootDir = "%s/sim/psim/dnode%d" % (self.path, index)
        return dnodeRootDir

    def getDnodesRootDir(self):
        dnodesRootDir = "%s/sim/psim" % (self.path)
        return dnodesRootDir


class TDDnodes:
    """Facade over a fixed pool of ``TDDnode`` objects plus one ``TDSimClient``.

    Public methods take a 1-based dnode ``index``; it is validated with
    ``check`` and the call is forwarded to ``self.dnodes[index - 1]``.
    """

    def __init__(self):
        # Ten dnodes, numbered 1..10; self.dnodes[i] holds TDDnode(i + 1).
        self.dnodes = [TDDnode(i) for i in range(1, 11)]
        self.simDeployed = False
        self.testCluster = False
        self.valgrind = 0
        self.asan = False
        self.killValgrind = 1

    def init(self, path, remoteIP = ""):
        """Resolve the working root and initialise every dnode and the sim client.

        Args:
            path: root directory to use; ``""`` means derive it from the
                location reported by the first dnode's ``getPath()``.
            remoteIP: forwarded unchanged to each dnode's ``init``.
        """
        binPath = os.path.realpath(self.dnodes[0].getPath() + "/../../../")

        if path == "":
            # NOTE: binPath has no trailing separator after realpath(), so
            # this plain concatenation produces "<binPath>../../"; abspath()
            # then treats "<last component>.." as ONE path component and the
            # result is the *parent* of binPath.  Replacing this with
            # os.path.join(binPath, "../../") would climb one level further
            # and change self.path, so it is intentionally left as-is.
            self.path = os.path.abspath(binPath + "../../")
        else:
            self.path = os.path.realpath(path)

        for dnode in self.dnodes:
            dnode.init(self.path, remoteIP)
        self.sim = TDSimClient(self.path)

    def setTestCluster(self, value):
        """Store the test-cluster flag applied to dnodes on ``deploy``."""
        self.testCluster = value

    def setValgrind(self, value):
        """Store the valgrind setting applied to dnodes on ``deploy``."""
        self.valgrind = value

    def setAsan(self, value):
        """Store the address-sanitizer flag and, when truthy, locate the stop scripts.

        Reads ``self.path``, so ``init`` must have been called first when
        *value* is truthy.
        """
        self.asan = value
        if value:
            selfPath = os.path.dirname(os.path.realpath(__file__))
            # The scripts live under "community/" when this file itself is
            # located inside a path containing "community".
            if "community" in selfPath:
                scriptDir = "/community/tests/script/sh"
            else:
                scriptDir = "/tests/script/sh"
            self.stopDnodesPath = os.path.abspath(
                self.path + scriptDir + "/stop_dnodes.sh")
            self.stopDnodesSigintPath = os.path.abspath(
                self.path + scriptDir + "/sigint_stop_dnodes.sh")
            tdLog.info("run in address sanitizer mode")

    def setKillValgrind(self, value):
        """Store whether ``stopAll`` should also kill ``valgrind.bin`` (1 = yes)."""
        self.killValgrind = value

    def _dnode(self, index):
        """Validate the 1-based *index* via ``check`` and return that dnode."""
        self.check(index)
        return self.dnodes[index - 1]

    def deploy(self, index, *updatecfgDict):
        """Deploy the sim client (first call only) and then dnode *index*.

        The ``updatecfgDict`` tuple is passed on as a single positional
        argument, exactly as received, to both ``deploy`` calls.
        """
        self.sim.setTestCluster(self.testCluster)

        if not self.simDeployed:
            self.sim.deploy(updatecfgDict)
            self.simDeployed = True

        dnode = self._dnode(index)
        dnode.setTestCluster(self.testCluster)
        dnode.setValgrind(self.valgrind)
        dnode.setAsan(self.asan)
        dnode.deploy(updatecfgDict)

    def cfg(self, index, option, value):
        """Forward a config option/value pair to dnode *index*."""
        self._dnode(index).cfg(option, value)

    def starttaosd(self, index):
        """Forward ``starttaosd`` to dnode *index*."""
        self._dnode(index).starttaosd()

    def stoptaosd(self, index):
        """Forward ``stoptaosd`` to dnode *index*."""
        self._dnode(index).stoptaosd()

    def start(self, index):
        """Forward ``start`` to dnode *index*."""
        self._dnode(index).start()

    def startWithoutSleep(self, index):
        """Forward ``startWithoutSleep`` to dnode *index*."""
        self._dnode(index).startWithoutSleep()

    def stop(self, index):
        """Forward ``stop`` to dnode *index*."""
        self._dnode(index).stop()

    def getDataSize(self, index):
        """Return whatever dnode *index* reports from ``getDataSize``."""
        return self._dnode(index).getDataSize()

    def forcestop(self, index):
        """Forward ``forcestop`` to dnode *index*."""
        self._dnode(index).forcestop()

    def startIP(self, index):
        """Call ``startIP`` on dnode *index* when this manager is in test-cluster mode."""
        dnode = self._dnode(index)
        if self.testCluster:
            dnode.startIP()

    def stopIP(self, index):
        """Call ``stopIP`` on dnode *index* when that dnode is in test-cluster mode.

        NOTE(review): this tests the dnode's own ``testCluster`` flag while
        ``startIP`` tests ``self.testCluster``; kept as in the original,
        confirm whether the asymmetry is intended.
        """
        dnode = self._dnode(index)
        if dnode.testCluster:
            dnode.stopIP()

    def check(self, index):
        """Report via ``tdLog.exit`` when *index* is outside ``[1, len(self.dnodes)]``."""
        upper = len(self.dnodes)
        if index < 1 or index > upper:
            tdLog.exit("index:%d should on a scale of [1, %d]" % (index, upper))

    def StopAllSigint(self):
        """In asan mode run the sigint stop script; otherwise do nothing."""
        tdLog.info("stop all dnodes sigint, asan:%d" % self.asan)
        if self.asan:
            tdLog.info("execute script: %s" % self.stopDnodesSigintPath)
            os.system(self.stopDnodesSigintPath)
            tdLog.info("execute finished")
            return

    @staticmethod
    def _findPids(psCmd):
        """Run the shell pipeline *psCmd* and return its stripped stdout text."""
        return subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()

    def _killUntilGone(self, psCmd, signalName):
        """Send ``kill -<signalName>`` to the pids *psCmd* lists until it lists none.

        Waits one second after each kill before listing again.
        """
        # Output of kill is discarded; the null device name differs per platform.
        nullDev = "nul" if platform.system().lower() == 'windows' else "/dev/null"
        processID = self._findPids(psCmd)
        while processID:
            os.system("kill -%s %s > %s 2>&1" % (signalName, processID, nullDev))
            sleep(1)
            processID = self._findPids(psCmd)

    def stopAll(self):
        """Stop every dnode and kill any remaining taosd (and valgrind) processes.

        Order of operations:
          1. asan mode: run the stop script and return.
          2. remote mode (first dnode has a non-empty ``remoteIP``): run the
             equivalent ``stopAll`` remotely and return.
          3. otherwise: ``stop()`` each dnode, stop the ``taosd`` systemd
             service if a root-owned taosd shows up in ``ps``, ``kill -9``
             remaining taosd processes, and, when ``killValgrind == 1``,
             ``kill -TERM`` remaining ``valgrind.bin`` processes.
        """
        tdLog.info("stop all dnodes, asan:%d" % self.asan)
        if self.asan:
            tdLog.info("execute script: %s" % self.stopDnodesPath)
            os.system(self.stopDnodesPath)
            tdLog.info("execute finished")
            return

        firstDnode = self.dnodes[0]
        if firstDnode.remoteIP != "":
            firstDnode.remoteExec(firstDnode.cfgDict, "for i in range(len(tdDnodes.dnodes)):\n    tdDnodes.dnodes[i].running=1\ntdDnodes.stopAll()")
            return

        for dnode in self.dnodes:
            dnode.stop()

        rootTaosdCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
        if self._findPids(rootTaosdCmd):
            # Best effort: the exit status of systemctl is deliberately ignored.
            os.system("sudo systemctl stop taosd")

        self._killUntilGone(
            "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}' | xargs",
            "9")

        if self.killValgrind == 1:
            self._killUntilGone(
                "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs",
                "TERM")

    def getDnodesRootDir(self):
        """Return ``<self.path>/sim``."""
        return "%s/sim" % (self.path)

    def getSimCfgPath(self):
        """Return the sim client's config directory."""
        return self.sim.getCfgDir()

    def getSimLogPath(self):
        """Return the sim client's log directory."""
        return self.sim.getLogDir()

    def addSimExtraCfg(self, option, value):
        """Add an extra option/value pair to the sim client's configuration."""
        self.sim.addExtraCfg(option, value)

    def getAsan(self):
        """Return the stored address-sanitizer flag."""
        return self.asan

831 832

# Module-level TDDnodes instance, created when this module is imported.
tdDnodes = TDDnodes()