tmqCommon.py 25.7 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
13
import math
P
plum-lihui 已提交
14
from asyncore import loop
P
plum-lihui 已提交
15
from collections import defaultdict
P
plum-lihui 已提交
16
import subprocess
P
plum-lihui 已提交
17 18 19 20 21 22
import random
import string
import threading
import requests
import time
# import socketfrom
P
plum-lihui 已提交
23 24
import json
import toml
P
plum-lihui 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39

import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *

# class actionType(Enum):
#     CREATE_DATABASE = 0
#     CREATE_STABLE   = 1
#     CREATE_CTABLE   = 2
#     INSERT_DATA     = 3

class TMQCom:
40
    def init(self, conn, logSql, replicaVar=1):
41
        self.replicaVar = int(replicaVar)
P
plum-lihui 已提交
42 43 44
        tdSql.init(conn.cursor())
        # tdSql.init(conn.cursor(), logSql)  # output sql.txt file

G
Ganlin Zhao 已提交
45
    def initConsumerTable(self,cdbName='cdb'):
P
plum-lihui 已提交
46 47 48 49
        tdLog.info("create consume database, and consume info table, and consume result table")
        tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
        tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
        tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
G
Ganlin Zhao 已提交
50
        tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
P
plum-lihui 已提交
51 52 53

        tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
        tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
P
plum-lihui 已提交
54
        tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName)
P
plum-lihui 已提交
55

G
Ganlin Zhao 已提交
56
    def initConsumerInfoTable(self,cdbName='cdb'):
P
plum-lihui 已提交
57 58 59 60
        tdLog.info("drop consumeinfo table")
        tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
        tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)

G
Ganlin Zhao 已提交
61
    def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
P
plum-lihui 已提交
62
        sql = "insert into %s.consumeinfo values "%cdbName
P
Ping Xiao 已提交
63
        sql += "(now + %ds, %d, '%s', '%s', %d, %d, %d)"%(consumerId, consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
P
plum-lihui 已提交
64 65 66 67 68 69 70 71 72 73 74 75
        tdLog.info("consume info sql: %s"%sql)
        tdSql.query(sql)

    def selectConsumeResult(self,expectRows,cdbName='cdb'):
        resultList=[]
        while 1:
            tdSql.query("select * from %s.consumeresult"%cdbName)
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
            if tdSql.getRows() == expectRows:
                break
            else:
                time.sleep(5)
G
Ganlin Zhao 已提交
76

P
plum-lihui 已提交
77 78 79
        for i in range(expectRows):
            tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
            resultList.append(tdSql.getData(i , 3))
G
Ganlin Zhao 已提交
80

P
plum-lihui 已提交
81
        return resultList
G
Ganlin Zhao 已提交
82

P
plum-lihui 已提交
83 84 85 86 87 88 89 90 91
    def selectConsumeMsgResult(self,expectRows,cdbName='cdb'):
        resultList=[]
        while 1:
            tdSql.query("select * from %s.consumeresult"%cdbName)
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
            if tdSql.getRows() == expectRows:
                break
            else:
                time.sleep(5)
G
Ganlin Zhao 已提交
92

P
plum-lihui 已提交
93 94 95
        for i in range(expectRows):
            tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
            resultList.append(tdSql.getData(i , 2))
G
Ganlin Zhao 已提交
96

P
plum-lihui 已提交
97
        return resultList
P
plum-lihui 已提交
98

P
plum-lihui 已提交
99
    def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0,snapshot=0):
P
plum-lihui 已提交
100 101 102 103 104 105
        buildPath = tdCom.getBuildPath()
        cfgPath = tdCom.getClientCfgPath()
        if valgrind == 1:
            logFile = cfgPath + '/../log/valgrind-tmq.log'
            shellCmd = 'nohup valgrind --log-file=' + logFile
            shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
G
Ganlin Zhao 已提交
106

P
plum-lihui 已提交
107
        if (platform.system().lower() == 'windows'):
wafwerar's avatar
wafwerar 已提交
108 109 110 111 112 113 114
            processorName = buildPath + '\\build\\bin\\tmq_sim.exe'
            if alias != 0:
                processorNameNew = buildPath + '\\build\\bin\\tmq_sim_new.exe'
                shellCmd = 'cp %s %s'%(processorName, processorNameNew)
                os.system(shellCmd)
                processorName = processorNameNew
            shellCmd = 'mintty -h never ' + processorName + ' -c ' + cfgPath
G
Ganlin Zhao 已提交
115 116
            shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
            shellCmd += "> nul 2>&1 &"
P
plum-lihui 已提交
117
        else:
P
plum-lihui 已提交
118 119 120 121 122 123 124
            processorName = buildPath + '/build/bin/tmq_sim'
            if alias != 0:
                processorNameNew = buildPath + '/build/bin/tmq_sim_new'
                shellCmd = 'cp %s %s'%(processorName, processorNameNew)
                os.system(shellCmd)
                processorName = processorNameNew
            shellCmd = 'nohup ' + processorName + ' -c ' + cfgPath
G
Ganlin Zhao 已提交
125
            shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
P
plum-lihui 已提交
126 127
            shellCmd += "> /dev/null 2>&1 &"
        tdLog.info(shellCmd)
G
Ganlin Zhao 已提交
128
        os.system(shellCmd)
P
plum-lihui 已提交
129

P
plum-lihui 已提交
130
    def stopTmqSimProcess(self, processorName):
S
Shengliang Guan 已提交
131
        psCmd = "unset LD_PRELOAD; ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
X
Xuefeng Tan 已提交
132 133
        if platform.system().lower() == 'windows':
            psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
P
plum-lihui 已提交
134
        processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
wafwerar's avatar
wafwerar 已提交
135
        onlyKillOnceWindows = 0
P
plum-lihui 已提交
136
        while(processID):
wafwerar's avatar
wafwerar 已提交
137
            if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
X
Xuefeng Tan 已提交
138 139 140 141
                if platform.system().lower() == 'windows':
                    killCmd = "kill -INT %s > /dev/nul 2>&1" % processID
                else:
                    killCmd = "unset LD_PRELOAD; kill -INT %s > /dev/null 2>&1" % processID
wafwerar's avatar
wafwerar 已提交
142 143
                os.system(killCmd)
                onlyKillOnceWindows = 1
P
plum-lihui 已提交
144 145 146 147
            time.sleep(0.2)
            processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
        tdLog.debug("%s is stopped by kill -INT" % (processorName))

wmmhello's avatar
wmmhello 已提交
148
    def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
P
plum-lihui 已提交
149 150
        loopFlag = 1
        while loopFlag:
P
plum-lihui 已提交
151 152
            tdSql.query("select * from %s.notifyinfo"%cdbName)
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
P
plum-lihui 已提交
153
            actRows = tdSql.getRows()
wmmhello's avatar
wmmhello 已提交
154 155 156 157
            for i in range(actRows):
                if tdSql.getData(i, 1) == 0:
                    loopFlag = 0
                    break
H
Hui Li 已提交
158
            time.sleep(0.02)
P
plum-lihui 已提交
159 160
        return

wmmhello's avatar
wmmhello 已提交
161
    def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
P
plum-lihui 已提交
162 163
        loopFlag = 1
        while loopFlag:
P
plum-lihui 已提交
164 165
            tdSql.query("select * from %s.notifyinfo"%cdbName)
            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
P
plum-lihui 已提交
166
            actRows = tdSql.getRows()
wmmhello's avatar
wmmhello 已提交
167 168 169 170
            for i in range(actRows):
                if tdSql.getData(i, 1) == 1:
                    loopFlag = 0
                    break
H
Hui Li 已提交
171
            time.sleep(0.02)
P
plum-lihui 已提交
172 173
        return

P
plum-lihui 已提交
174 175 176 177 178 179 180 181
    def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
        if dropFlag == 1:
            tsql.execute("drop database if exists %s"%(dbName))

        tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
        tdLog.debug("complete to create database %s"%(dbName))
        return

P
plum-lihui 已提交
182 183
    # self.create_stable() and self.create_ctable() and self.insert_data_interlaceByMultiTbl() : The three functions are matched
    # schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
P
plum-lihui 已提交
184
    def create_stable(self,tsql, dbName,stbName):
P
plum-lihui 已提交
185 186
        schemaString = "(ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))"
        tsql.execute("create table if not exists %s.%s %s"%(dbName, stbName, schemaString))
P
plum-lihui 已提交
187 188 189
        tdLog.debug("complete to create %s.%s" %(dbName, stbName))
        return

P
plum-lihui 已提交
190
    def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
P
plum-lihui 已提交
191
        # tsql.execute("use %s" %dbName)
P
plum-lihui 已提交
192 193 194
        pre_create = "create table"
        sql = pre_create
        #tdLog.debug("doing create one  stable %s and %d  child table in %s  ..." %(stbname, count ,dbname))
P
plum-lihui 已提交
195 196
        batchNum = 10
        tblBatched  = 0
P
plum-lihui 已提交
197
        for i in range(ctbNum):
P
plum-lihui 已提交
198
            tagBinaryValue = 'beijing'
P
plum-lihui 已提交
199
            if (i % 2 == 0):
P
plum-lihui 已提交
200
                tagBinaryValue = 'shanghai'
P
plum-lihui 已提交
201
            elif (i % 3 == 0):
P
plum-lihui 已提交
202
                tagBinaryValue = 'changsha'
G
Ganlin Zhao 已提交
203

P
plum-lihui 已提交
204 205 206
            sql += " %s.%s%d using %s.%s tags(%d, %d, %d, '%s', '%s')"%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
            tblBatched += 1
            if (i == ctbNum-1 ) or (tblBatched == batchNum):
P
plum-lihui 已提交
207
                tsql.execute(sql)
P
plum-lihui 已提交
208
                tblBatched = 0
P
plum-lihui 已提交
209
                sql = pre_create
P
plum-lihui 已提交
210

P
plum-lihui 已提交
211 212
        if sql != pre_create:
            tsql.execute(sql)
G
Ganlin Zhao 已提交
213

P
plum-lihui 已提交
214
        tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
G
Ganlin Zhao 已提交
215
        return
P
plum-lihui 已提交
216

P
plum-lihui 已提交
217 218
    def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0):
        for _ in range(count):
P
plum-lihui 已提交
219
            create_ctable_sql = f'drop table if exists {dbname}.{default_ctbname_prefix}{ctbStartIdx};'
P
plum-lihui 已提交
220 221 222 223
            ctbStartIdx += 1
            tdLog.info("drop ctb sql: %s"%create_ctable_sql)
            tsql.execute(create_ctable_sql)

P
plum-lihui 已提交
224
    # schema: (ts timestamp, c1 int, c2 binary(16))
P
plum-lihui 已提交
225
    def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
P
plum-lihui 已提交
226 227 228 229 230
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

P
plum-lihui 已提交
231 232 233
        if startTs is None:
            t = time.time()
            startTs = int(round(t * 1000))
P
plum-lihui 已提交
234 235
        #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
        for i in range(ctbNum):
P
plum-lihui 已提交
236
            rowsBatched = 0
P
plum-lihui 已提交
237 238 239
            sql += " %s%d values "%(stbName,i)
            for j in range(rowsPerTbl):
                sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
P
plum-lihui 已提交
240 241
                rowsBatched += 1
                if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
P
plum-lihui 已提交
242
                    tsql.execute(sql)
P
plum-lihui 已提交
243
                    rowsBatched = 0
P
plum-lihui 已提交
244 245 246 247 248 249 250 251 252
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s%d values " %(stbName,i)
                    else:
                        sql = "insert into "
        #end sql
        if sql != pre_insert:
            #print("insert sql:%s"%sql)
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
G
Ganlin Zhao 已提交
253
        return
P
plum-lihui 已提交
254

P
plum-lihui 已提交
255
    # schema: (ts timestamp, c1 int, c2 int, c3 binary(16))
P
plum-lihui 已提交
256 257 258 259 260 261 262 263 264 265
    def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        t = time.time()
        startTs = int(round(t * 1000))
        #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
        for i in range(ctbNum):
P
plum-lihui 已提交
266
            rowsBatched = 0
P
plum-lihui 已提交
267 268 269 270 271 272
            sql += " %s%d values "%(ctbPrefix,i)
            for j in range(rowsPerTbl):
                if (j % 2 == 0):
                    sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j)
                else:
                    sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, -j, j)
P
plum-lihui 已提交
273 274
                rowsBatched += 1
                if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
P
plum-lihui 已提交
275
                    tsql.execute(sql)
P
plum-lihui 已提交
276
                    rowsBatched = 0
P
plum-lihui 已提交
277 278 279 280 281 282 283 284 285 286 287
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s%d values " %(ctbPrefix,i)
                    else:
                        sql = "insert into "
        #end sql
        if sql != pre_insert:
            #print("insert sql:%s"%sql)
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

P
plum-lihui 已提交
288
    # schema: (ts timestamp, c1 int, c2 int, c3 binary(16), c4 timestamp)
P
plum-lihui 已提交
289
    def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,ctbStartIdx=0):
P
plum-lihui 已提交
290 291 292 293 294 295 296 297 298
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        t = time.time()
        startTs = int(round(t * 1000))
        #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
        for i in range(ctbNum):
P
plum-lihui 已提交
299
            rowsBatched = 0
P
plum-lihui 已提交
300
            sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx)
P
plum-lihui 已提交
301 302 303 304 305
            for j in range(rowsPerTbl):
                if (j % 2 == 0):
                    sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
                else:
                    sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j)
P
plum-lihui 已提交
306 307
                rowsBatched += 1
                if (rowsBatched == batchNum) or (j == rowsPerTbl - 1):
P
plum-lihui 已提交
308
                    tsql.execute(sql)
P
plum-lihui 已提交
309
                    rowsBatched = 0
P
plum-lihui 已提交
310
                    if j < rowsPerTbl - 1:
P
plum-lihui 已提交
311
                        sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx)
P
plum-lihui 已提交
312 313 314 315 316 317 318 319 320
                    else:
                        sql = "insert into "
        #end sql
        if sql != pre_insert:
            #print("insert sql:%s"%sql)
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

P
plum-lihui 已提交
321 322
    # schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
    def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
P
plum-lihui 已提交
323 324 325 326 327 328 329 330 331 332 333 334 335 336
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        if startTs == 0:
            t = time.time()
            startTs = int(round(t * 1000))

        ctbDict = {}
        for i in range(ctbNum):
            ctbDict[i] = 0

        #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
P
plum-lihui 已提交
337
        rowsOfCtb = 0
P
plum-lihui 已提交
338 339
        while rowsOfCtb < rowsPerTbl:
            for i in range(ctbNum):
P
plum-lihui 已提交
340 341
                sql += " %s.%s%d values "%(dbName,ctbPrefix,i+ctbStartIdx)
                rowsBatched = 0
P
plum-lihui 已提交
342
                for k in range(batchNum):
P
plum-lihui 已提交
343
                    if (k % 2 == 0):
P
plum-lihui 已提交
344
                        sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i], ctbDict[i],ctbDict[i], ctbDict[i],i+ctbStartIdx,k)
P
plum-lihui 已提交
345
                    else:
P
plum-lihui 已提交
346
                        sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+ctbDict[i],-ctbDict[i],ctbDict[i],-ctbDict[i],i+ctbStartIdx,k)
P
plum-lihui 已提交
347 348

                    rowsBatched += 1
P
plum-lihui 已提交
349
                    ctbDict[i] += 1
P
plum-lihui 已提交
350
                    if (rowsBatched == batchNum) or (ctbDict[i] == rowsPerTbl):
P
plum-lihui 已提交
351
                        tsql.execute(sql)
P
plum-lihui 已提交
352
                        rowsBatched = 0
P
plum-lihui 已提交
353 354 355 356 357 358 359
                        sql = "insert into "
                        break
            rowsOfCtb = ctbDict[0]

        tdLog.debug("insert data ............ [OK]")
        return

P
plum-lihui 已提交
360 361 362 363 364 365 366 367 368 369 370 371
    def threadFunctionForInsertByInterlace(self, **paraDict):
        # create new connector for new tdSql instance in my thread
        newTdSql = tdCom.newTdSql()
        self.insert_data_interlaceByMultiTbl(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
        return

    def asyncInsertDataByInterlace(self, paraDict):
        pThread = threading.Thread(target=self.threadFunctionForInsertByInterlace, kwargs=paraDict)
        pThread.start()
        return pThread

    def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
P
plum-lihui 已提交
372
        tdLog.debug("start to insert data with auto create child table ............")
P
plum-lihui 已提交
373 374 375 376 377 378 379
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        if startTs == 0:
            t = time.time()
            startTs = int(round(t * 1000))
G
Ganlin Zhao 已提交
380

P
plum-lihui 已提交
381
        #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
G
Ganlin Zhao 已提交
382
        rowsBatched = 0
P
plum-lihui 已提交
383
        for i in range(ctbNum):
P
plum-lihui 已提交
384 385 386 387 388
            tagBinaryValue = 'beijing'
            if (i % 2 == 0):
                tagBinaryValue = 'shanghai'
            elif (i % 3 == 0):
                tagBinaryValue = 'changsha'
G
Ganlin Zhao 已提交
389

P
plum-lihui 已提交
390
            sql += " %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
P
plum-lihui 已提交
391
            for j in range(rowsPerTbl):
P
plum-lihui 已提交
392
                sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched)
P
plum-lihui 已提交
393 394
                rowsBatched += 1
                if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
P
plum-lihui 已提交
395
                    tsql.execute(sql)
P
plum-lihui 已提交
396
                    rowsBatched = 0
P
plum-lihui 已提交
397
                    if j < rowsPerTbl - 1:
P
plum-lihui 已提交
398
                        sql = "insert into %s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
P
plum-lihui 已提交
399 400 401 402 403 404 405
                    else:
                        sql = "insert into "
        #end sql
        if sql != pre_insert:
            #print("insert sql:%s"%sql)
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
P
plum-lihui 已提交
406 407
        return

P
plum-lihui 已提交
408
    def syncCreateDbStbCtbInsertData(self, tsql, paraDict):
P
plum-lihui 已提交
409 410 411
        tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"])
        tdCom.create_stable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
        tdCom.create_ctable(tsql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
P
plum-lihui 已提交
412 413
        if "event" in paraDict and type(paraDict['event']) == type(threading.Event()):
            paraDict["event"].set()
P
plum-lihui 已提交
414 415 416 417 418 419

        ctbPrefix = paraDict['ctbPrefix']
        ctbNum = paraDict["ctbNum"]
        for i in range(ctbNum):
            tbName = '%s%s'%(ctbPrefix,i)
            tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname=tbName,start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl'])
G
Ganlin Zhao 已提交
420
        return
P
plum-lihui 已提交
421 422 423 424 425 426 427 428 429 430 431 432

    def threadFunction(self, **paraDict):
        # create new connector for new tdSql instance in my thread
        newTdSql = tdCom.newTdSql()
        self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict)
        return

    def asyncCreateDbStbCtbInsertData(self, paraDict):
        pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict)
        pThread.start()
        return pThread

P
plum-lihui 已提交
433 434 435
    def threadFunctionForInsert(self, **paraDict):
        # create new connector for new tdSql instance in my thread
        newTdSql = tdCom.newTdSql()
P
plum-lihui 已提交
436 437 438 439
        if 'ctbStartIdx' in paraDict.keys():
            self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
        else:
            self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
P
plum-lihui 已提交
440 441 442 443 444 445 446
        return

    def asyncInsertData(self, paraDict):
        pThread = threading.Thread(target=self.threadFunctionForInsert, kwargs=paraDict)
        pThread.start()
        return pThread

P
plum-lihui 已提交
447
    def checkFileContent(self, consumerId, queryString, skipRowsOfCons=0):
P
plum-lihui 已提交
448 449 450 451 452 453
        buildPath = tdCom.getBuildPath()
        cfgPath = tdCom.getClientCfgPath()
        dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
        cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
        tdLog.info(cmdStr)
        os.system(cmdStr)
G
Ganlin Zhao 已提交
454

P
plum-lihui 已提交
455 456 457 458 459
        consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
        tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))

        consumeFile = open(consumeRowsFile, mode='r')
        queryFile = open(dstFile, mode='r')
G
Ganlin Zhao 已提交
460

P
plum-lihui 已提交
461 462
        # skip first line for it is schema
        queryFile.readline()
G
Ganlin Zhao 已提交
463

P
plum-lihui 已提交
464 465
        # skip offset for consumer
        for i in range(0,skipRowsOfCons):
G
Ganlin Zhao 已提交
466 467
            consumeFile.readline()

P
plum-lihui 已提交
468 469 470
        while True:
            dst = queryFile.readline()
            src = consumeFile.readline()
471 472 473 474
            dstSplit = dst.split(',')
            srcSplit = src.split(',')

            if not dst or not src:
P
plum-lihui 已提交
475
                break
476 477 478 479
            if len(dstSplit) != len(srcSplit):
                tdLog.exit("consumerId %d consume rows len is not match the rows by direct query,len(dstSplit):%d != len(srcSplit):%d, dst:%s, src:%s"
                           %(consumerId, len(dstSplit), len(srcSplit), dst, src))

480 481 482 483 484 485
            for i in range(len(dstSplit)):
                if srcSplit[i] != dstSplit[i]:
                    srcFloat = float(srcSplit[i])
                    dstFloat = float(dstSplit[i])
                    if not math.isclose(srcFloat, dstFloat, abs_tol=1e-9):
                        tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
G
Ganlin Zhao 已提交
486
        return
P
plum-lihui 已提交
487 488 489 490 491 492 493 494 495

    def getResultFileByTaosShell(self, consumerId, queryString):
        buildPath = tdCom.getBuildPath()
        cfgPath = tdCom.getClientCfgPath()
        dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
        cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
        tdLog.info(cmdStr)
        os.system(cmdStr)
        return dstFile
G
Ganlin Zhao 已提交
496 497 498

    def checkTmqConsumeFileContent(self, consumerId, dstFile):
        cfgPath = tdCom.getClientCfgPath()
P
plum-lihui 已提交
499 500 501 502 503
        consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
        tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))

        consumeFile = open(consumeRowsFile, mode='r')
        queryFile = open(dstFile, mode='r')
G
Ganlin Zhao 已提交
504

P
plum-lihui 已提交
505 506 507 508 509 510 511 512 513 514 515 516 517 518
        # skip first line for it is schema
        queryFile.readline()
        lines = 0
        while True:
            dst = queryFile.readline()
            src = consumeFile.readline()
            lines += 1
            if dst:
                if dst != src:
                    tdLog.info("src row: %s"%src)
                    tdLog.info("dst row: %s"%dst)
                    tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
            else:
                break
G
Ganlin Zhao 已提交
519
        return
P
plum-lihui 已提交
520

P
plum-lihui 已提交
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
    def create_ntable(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=None, colPrefix='c', tblNum=1, **kwargs):
        tb_params = ""
        if len(kwargs) > 0:
            for param, value in kwargs.items():
                tb_params += f'{param} "{value}" '
        column_type_str = tdCom.gen_column_type_str(colPrefix, column_elm_list)

        for _ in range(tblNum):
            create_table_sql = f'create table {dbname}.{tbname_prefix}{tbname_index_start_num} ({column_type_str}) {tb_params};'
            tbname_index_start_num += 1
            tsql.execute(create_table_sql)

    def insert_rows_into_ntbl(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=None, startTs=None, tblNum=1, rows=1):
        if startTs is None:
            startTs = tdCom.genTs()[0]

        for tblIdx in range(tblNum):
            for rowIdx in range(rows):
                column_value_list = tdCom.gen_column_value_list(column_ele_list, f'{startTs}+{rowIdx}s')
                column_value_str = ''
                idx = 0
                for column_value in column_value_list:
                    if isinstance(column_value, str) and idx != 0:
                        column_value_str += f'"{column_value}", '
                    else:
                        column_value_str += f'{column_value}, '
                        idx += 1
                column_value_str = column_value_str.rstrip()[:-1]
                insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
                tsql.execute(insert_sql)
G
Ganlin Zhao 已提交
551

P
plum-lihui 已提交
552
    def waitSubscriptionExit(self, tsql, topicName):
P
plum-lihui 已提交
553
        wait_cnt = 0
P
plum-lihui 已提交
554 555
        while True:
            exit_flag = 1
P
plum-lihui 已提交
556
            tsql.query("show subscriptions")
P
plum-lihui 已提交
557 558 559 560
            rows = tsql.getRows()
            for idx in range (rows):
                if tsql.getData(idx, 0) != topicName:
                    continue
G
Ganlin Zhao 已提交
561

P
plum-lihui 已提交
562 563 564 565 566 567 568
                if tsql.getData(idx, 3) == None:
                    continue
                else:
                    time.sleep(0.5)
                    wait_cnt += 1
                    exit_flag = 0
                    break
G
Ganlin Zhao 已提交
569

P
plum-lihui 已提交
570 571
            if exit_flag == 1:
                break
G
Ganlin Zhao 已提交
572

P
plum-lihui 已提交
573 574 575
        tsql.query("show subscriptions")
        tdLog.info("show subscriptions:")
        tdLog.info(tsql.queryResult)
P
plum-lihui 已提交
576
        tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
P
plum-lihui 已提交
577

P
plum-lihui 已提交
578 579 580 581
    def close(self):
        self.cursor.close()

tmqCom = TMQCom()