tmqCommon.py 25.8 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
13
import math
P
plum-lihui 已提交
14
from asyncore import loop
P
plum-lihui 已提交
15
from collections import defaultdict
P
plum-lihui 已提交
16
import subprocess
P
plum-lihui 已提交
17 18 19 20 21 22
import random
import string
import threading
import requests
import time
# import socketfrom
P
plum-lihui 已提交
23 24
import json
import toml
P
plum-lihui 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39

import taos
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *

# class actionType(Enum):
#     CREATE_DATABASE = 0
#     CREATE_STABLE   = 1
#     CREATE_CTABLE   = 2
#     INSERT_DATA     = 3

class TMQCom:
40
    def init(self, conn, logSql, replicaVar=1):
        """Initialise the helper for a test case.

        Stores ``replicaVar`` on the instance as an int and binds the shared
        ``tdSql`` helper to a fresh cursor obtained from ``conn``.
        ``logSql`` is accepted but not used by this method.
        """
        self.replicaVar = int(replicaVar)
        # The logSql variant of tdSql.init (which outputs a sql.txt file) is
        # intentionally not used here.
        tdSql.init(conn.cursor())

G
Ganlin Zhao 已提交
45
    def initConsumerTable(self,cdbName='cdb'):
        """Create database ``cdbName`` (if missing) and recreate its three tables.

        The tables ``consumeinfo``, ``consumeresult`` and ``notifyinfo`` are
        all dropped first and then created again, in that order.
        """
        tdLog.info("create consume database, and consume info table, and consume result table")
        tdSql.query("create database if not exists %s vgroups 1"%(cdbName))

        tableSchemas = (
            ("consumeinfo", "(ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"),
            ("consumeresult", "(ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"),
            ("notifyinfo", "(ts timestamp, cmdid int, consumerid int)"),
        )

        # All drops are issued before any create, as in the original sequence.
        for tableName, _ in tableSchemas:
            tdSql.query("drop table if exists %s.%s "%(cdbName, tableName))
        for tableName, schema in tableSchemas:
            tdSql.query("create table %s.%s %s"%(cdbName, tableName, schema))
P
plum-lihui 已提交
55

G
Ganlin Zhao 已提交
56
    def initConsumerInfoTable(self,cdbName='cdb'):
        """Drop and recreate only the ``consumeinfo`` table in ``cdbName``."""
        tdLog.info("drop consumeinfo table")
        consumeInfoSchema = "(ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"
        tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
        tdSql.query("create table %s.consumeinfo %s"%(cdbName, consumeInfoSchema))

G
Ganlin Zhao 已提交
61
    def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
        """Insert one row describing a consumer into ``<cdbName>.consumeinfo``.

        The row timestamp is ``now + <consumerId>s``, so the consumer id is
        used both as the timestamp offset and as the ``consumerid`` column.
        """
        rowValues = (consumerId, consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
        target = "insert into %s.consumeinfo values "%cdbName
        sql = target + "(now + %ds, %d, '%s', '%s', %d, %d, %d)"%rowValues
        tdLog.info("consume info sql: %s"%sql)
        tdSql.query(sql)

    def selectConsumeResult(self,expectRows,cdbName='cdb'):
        """Wait for ``expectRows`` rows in ``consumeresult``; return column 3 of each.

        Polls ``<cdbName>.consumeresult`` every 5 seconds until the row count
        equals ``expectRows`` exactly (there is no timeout). Judging by the
        log message, column 3 is the consumed row count.
        """
        while True:
            tdSql.query("select * from %s.consumeresult"%cdbName)
            if tdSql.getRows() == expectRows:
                break
            time.sleep(5)

        resultList = []
        for rowIdx in range(expectRows):
            consumerId = tdSql.getData(rowIdx, 1)
            msgCnt = tdSql.getData(rowIdx, 2)
            rowCnt = tdSql.getData(rowIdx, 3)
            tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(consumerId, msgCnt, rowCnt))
            resultList.append(rowCnt)

        return resultList
G
Ganlin Zhao 已提交
82

P
plum-lihui 已提交
83 84 85 86 87 88 89 90 91
    def selectConsumeMsgResult(self,expectRows,cdbName='cdb'):
        """Wait for ``expectRows`` rows in ``consumeresult``; return column 2 of each.

        Polls ``<cdbName>.consumeresult`` every 5 seconds until the row count
        equals ``expectRows`` exactly (there is no timeout). Judging by the
        log message, column 2 is the consumed message count.
        """
        while True:
            tdSql.query("select * from %s.consumeresult"%cdbName)
            if tdSql.getRows() == expectRows:
                break
            time.sleep(5)

        resultList = []
        for rowIdx in range(expectRows):
            consumerId = tdSql.getData(rowIdx, 1)
            msgCnt = tdSql.getData(rowIdx, 2)
            rowCnt = tdSql.getData(rowIdx, 3)
            tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(consumerId, msgCnt, rowCnt))
            resultList.append(msgCnt)

        return resultList
P
plum-lihui 已提交
98

P
plum-lihui 已提交
99
    def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0,snapshot=0):
        """Launch the ``tmq_sim`` binary in the background.

        Parameters:
            pollDelay, dbName, showMsg, showRow, cdbName, snapshot: forwarded
                to ``tmq_sim`` as ``-y``, ``-d``, ``-g``, ``-r``, ``-w`` and
                ``-e`` respectively.
            valgrind: when 1 (non-Windows only), run the binary under valgrind
                memcheck and write its log to ``<cfgPath>/../log/valgrind-tmq.log``.
            alias: when non-zero, copy the binary to ``tmq_sim_new`` first and
                run the copy.

        Previously the valgrind prefix was assembled and then overwritten by
        the platform branch, so ``valgrind=1`` had no effect. The prefix also
        lacked a space between the log file path and ``--tool``.
        """
        buildPath = tdCom.getBuildPath()
        cfgPath = tdCom.getClientCfgPath()

        if (platform.system().lower() == 'windows'):
            processorName = buildPath + '\\build\\bin\\tmq_sim.exe'
            processorNameNew = buildPath + '\\build\\bin\\tmq_sim_new.exe'
            # valgrind is not applied on Windows; the flag is ignored here.
            launcher = 'mintty -h never '
            redirect = "> nul 2>&1 &"
        else:
            processorName = buildPath + '/build/bin/tmq_sim'
            processorNameNew = buildPath + '/build/bin/tmq_sim_new'
            if valgrind == 1:
                logFile = cfgPath + '/../log/valgrind-tmq.log'
                launcher = 'nohup valgrind --log-file=' + logFile + ' '
                launcher += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
            else:
                launcher = 'nohup '
            redirect = "> /dev/null 2>&1 &"

        if alias != 0:
            # Run a copy of the binary under a different process name.
            os.system('cp %s %s'%(processorName, processorNameNew))
            processorName = processorNameNew

        shellCmd = launcher + processorName + ' -c ' + cfgPath
        shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
        shellCmd += redirect
        tdLog.info(shellCmd)
        os.system(shellCmd)
P
plum-lihui 已提交
129

P
plum-lihui 已提交
130
    def stopTmqSimProcess(self, processorName):
        """Send SIGINT to every process matching ``processorName`` until none remain.

        The process list is re-queried every 0.2 s. On Windows the signal is
        sent only once and the loop then just waits; elsewhere it is re-sent
        on each iteration.

        ``ps`` may report several PIDs, one per line. They are collapsed onto
        a single space-separated line before being passed to ``kill``;
        interpolating the raw multi-line output split the shell command.
        """
        isWindows = platform.system().lower() == 'windows'
        psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
        if not isWindows:
            psCmd = "unset LD_PRELOAD; " + psCmd

        def _matchingPids():
            # Whitespace-normalised PID list; empty string when nothing matches.
            rawOutput = subprocess.check_output(psCmd, shell=True).decode("utf-8")
            return " ".join(rawOutput.split())

        processID = _matchingPids()
        signalSent = False
        while processID:
            if isWindows:
                if not signalSent:
                    os.system("kill -INT %s > /dev/nul 2>&1" % processID)
            else:
                os.system("unset LD_PRELOAD; kill -INT %s > /dev/null 2>&1" % processID)
            signalSent = True
            time.sleep(0.2)
            processID = _matchingPids()
        tdLog.debug("%s is stopped by kill -INT" % (processorName))

    def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb',rows=1):
        """Block until ``<cdbName>.notifyinfo`` holds a row whose column 1 is 0.

        Rows are only inspected once the table has at least ``rows`` rows.
        The table is polled every 0.02 s with no timeout.
        """
        notified = False
        while not notified:
            tdSql.query("select * from %s.notifyinfo"%cdbName)
            actRows = tdSql.getRows()
            if actRows >= rows:
                notified = any(tdSql.getData(rowIdx, 1) == 0 for rowIdx in range(actRows))
            time.sleep(0.02)
        return

P
plum-lihui 已提交
162 163 164
    def getStartCommitNotifyFromTmqsim(self,cdbName='cdb',rows=2):
        """Block until ``<cdbName>.notifyinfo`` holds a row whose column 1 is 1.

        Rows are only inspected once the table has at least ``rows`` rows.
        The table is polled every 0.10 s with no timeout.
        """
        notified = False
        while not notified:
            tdSql.query("select * from %s.notifyinfo"%cdbName)
            actRows = tdSql.getRows()
            if actRows >= rows:
                notified = any(tdSql.getData(rowIdx, 1) == 1 for rowIdx in range(actRows))
            time.sleep(0.10)
        return

P
plum-lihui 已提交
176 177 178 179 180 181 182 183
    def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
        """Create database ``dbName``, dropping any existing one first when ``dropFlag`` is 1."""
        if dropFlag == 1:
            dropSql = "drop database if exists %s"%(dbName)
            tsql.execute(dropSql)

        createSql = "create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica)
        tsql.execute(createSql)
        tdLog.debug("complete to create database %s"%(dbName))
        return

P
plum-lihui 已提交
184 185
    # self.create_stable() and self.create_ctable() and self.insert_data_interlaceByMultiTbl() : The three functions are matched
    # schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
P
plum-lihui 已提交
186
    def create_stable(self,tsql, dbName,stbName):
        """Create super table ``dbName.stbName`` with the fixed 7-column / 5-tag schema."""
        columns = "(ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp)"
        tags = "(t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))"
        fullName = "%s.%s"%(dbName, stbName)
        tsql.execute("create table if not exists %s %s tags %s"%(fullName, columns, tags))
        tdLog.debug("complete to create %s" %fullName)
        return

P
plum-lihui 已提交
192
    def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
        """Create ``ctbNum`` child tables of ``dbName.stbName`` in batches of 10.

        Tables are named ``<ctbPrefix><i + ctbStartIdx>``. The three numeric
        tags are set to the table index. Both string tags get 'shanghai' for
        even ``i``, 'changsha' for odd multiples of 3, otherwise 'beijing'.
        """
        createPrefix = "create table"
        batchSize = 10
        clauses = []
        for i in range(ctbNum):
            if i % 2 == 0:
                tagBinaryValue = 'shanghai'
            elif i % 3 == 0:
                tagBinaryValue = 'changsha'
            else:
                tagBinaryValue = 'beijing'

            tblIdx = i + ctbStartIdx
            clauses.append(" %s.%s%d using %s.%s tags(%d, %d, %d, '%s', '%s')"%(dbName,ctbPrefix,tblIdx,dbName,stbName,tblIdx,tblIdx,tblIdx,tagBinaryValue,tagBinaryValue))

            # Flush when the batch is full or this is the final table.
            if len(clauses) == batchSize or i == ctbNum - 1:
                tsql.execute(createPrefix + "".join(clauses))
                clauses = []

        tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
        return
P
plum-lihui 已提交
218

P
plum-lihui 已提交
219 220
    def drop_ctable(self, tsql, dbname=None, count=1, default_ctbname_prefix="ctb",ctbStartIdx=0):
        """Drop ``count`` child tables named ``<prefix><idx>``, starting at ``ctbStartIdx``."""
        tblIdx = ctbStartIdx
        remaining = count
        while remaining > 0:
            drop_sql = f'drop table if exists {dbname}.{default_ctbname_prefix}{tblIdx};'
            tdLog.info("drop ctb sql: %s"%drop_sql)
            tsql.execute(drop_sql)
            tblIdx += 1
            remaining -= 1

P
plum-lihui 已提交
226
    # schema: (ts timestamp, c1 int, c2 binary(16))
P
plum-lihui 已提交
227
    def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
        """Insert ``rowsPerTbl`` rows into each of ``ctbNum`` tables ``<stbName><i>``.

        Rows follow the schema (ts timestamp, c1 int, c2 binary(16)) and are
        sent in statements of at most ``batchNum`` rows. Timestamps start at
        ``startTs`` (current time in ms when None) and increase by 1 per row.
        """
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        if startTs is None:
            startTs = int(round(time.time() * 1000))

        for tblIdx in range(ctbNum):
            tblName = "%s%d"%(stbName,tblIdx)
            sql += " %s values "%tblName
            pendingRows = 0
            lastRow = rowsPerTbl - 1
            for rowIdx in range(rowsPerTbl):
                sql += "(%d, %d, 'tmqrow_%d') "%(startTs + rowIdx, rowIdx, rowIdx)
                pendingRows += 1
                if pendingRows != batchNum and rowIdx != lastRow:
                    continue
                tsql.execute(sql)
                pendingRows = 0
                # Continue with the same table unless its rows are exhausted.
                if rowIdx == lastRow:
                    sql = "insert into "
                else:
                    sql = "insert into %s values " %tblName

        # Anything left over (only table headers when rowsPerTbl is 0) is still sent.
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return
P
plum-lihui 已提交
256

P
plum-lihui 已提交
257
    # schema: (ts timestamp, c1 int, c2 int, c3 binary(16))
P
plum-lihui 已提交
258 259 260 261 262 263 264 265 266 267
    def insert_data_1(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
        """Insert ``rowsPerTbl`` rows into each of ``ctbNum`` tables ``<ctbPrefix><i>``.

        Rows follow the schema (ts timestamp, c1 int, c2 int, c3 binary(16)).
        c2 equals the row index on even rows and its negation on odd rows.
        Statements carry at most ``batchNum`` rows.

        NOTE(review): the ``startTs`` argument is always replaced by the
        current time in milliseconds; confirm whether callers rely on that.
        """
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        startTs = int(round(time.time() * 1000))
        for tblIdx in range(ctbNum):
            tblName = "%s%d"%(ctbPrefix,tblIdx)
            sql += " %s values "%tblName
            pendingRows = 0
            lastRow = rowsPerTbl - 1
            for rowIdx in range(rowsPerTbl):
                c2 = rowIdx if rowIdx % 2 == 0 else -rowIdx
                sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + rowIdx, rowIdx, c2, rowIdx)
                pendingRows += 1
                if pendingRows != batchNum and rowIdx != lastRow:
                    continue
                tsql.execute(sql)
                pendingRows = 0
                if rowIdx == lastRow:
                    sql = "insert into "
                else:
                    sql = "insert into %s values " %tblName

        # Anything left over (only table headers when rowsPerTbl is 0) is still sent.
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

P
plum-lihui 已提交
290
    # schema: (ts timestamp, c1 int, c2 int, c3 binary(16), c4 timestamp)
P
plum-lihui 已提交
291
    def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,ctbStartIdx=0):
        """Insert ``rowsPerTbl`` rows into tables ``<ctbPrefix><i + ctbStartIdx>``.

        Rows follow the schema (ts timestamp, c1 int, c2 int, c3 binary(16),
        c4 timestamp), with c4 set to ``now``. c2 equals the row index on even
        rows and its negation on odd rows. Statements carry at most
        ``batchNum`` rows.

        NOTE(review): the ``startTs`` argument is always replaced by the
        current time in milliseconds; confirm whether callers rely on that.
        """
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        startTs = int(round(time.time() * 1000))
        for i in range(ctbNum):
            tblName = "%s%d"%(ctbPrefix,i+ctbStartIdx)
            sql += " %s values "%tblName
            pendingRows = 0
            lastRow = rowsPerTbl - 1
            for rowIdx in range(rowsPerTbl):
                c2 = rowIdx if rowIdx % 2 == 0 else -rowIdx
                sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + rowIdx, rowIdx, c2, rowIdx)
                pendingRows += 1
                if pendingRows != batchNum and rowIdx != lastRow:
                    continue
                tsql.execute(sql)
                pendingRows = 0
                if rowIdx == lastRow:
                    sql = "insert into "
                else:
                    sql = "insert into %s values " %tblName

        # Anything left over (only table headers when rowsPerTbl is 0) is still sent.
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

P
plum-lihui 已提交
323 324
    # schema: (ts timestamp, c1 int, c2 bigint, c3 double, c4 binary(32), c5 nchar(32), c6 timestamp) tags (t1 int, t2 bigint, t3 double, t4 binary(32), t5 nchar(32))
    def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
        """Insert rows round-robin across the child tables, one batch per table per pass.

        On each pass every table ``<dbName>.<ctbPrefix><i + ctbStartIdx>``
        receives one statement of up to ``batchNum`` rows. Passes repeat until
        table 0 has ``rowsPerTbl`` rows. Within a batch, rows at odd position
        ``k`` get negated c1 and c3 values. ``startTs == 0`` means "use the
        current time in milliseconds".
        """
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        if startTs == 0:
            startTs = int(round(time.time() * 1000))

        # Number of rows written so far, keyed by table position.
        rowsWritten = dict.fromkeys(range(ctbNum), 0)

        rowsOfCtb = 0
        while rowsOfCtb < rowsPerTbl:
            for i in range(ctbNum):
                tblIdx = i + ctbStartIdx
                sql += " %s.%s%d values "%(dbName,ctbPrefix,tblIdx)
                rowsBatched = 0
                for k in range(batchNum):
                    seq = rowsWritten[i]
                    signedSeq = seq if k % 2 == 0 else -seq
                    sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+seq, signedSeq, seq, signedSeq, tblIdx, k)

                    rowsBatched += 1
                    rowsWritten[i] = seq + 1
                    if (rowsBatched == batchNum) or (rowsWritten[i] == rowsPerTbl):
                        tsql.execute(sql)
                        rowsBatched = 0
                        sql = "insert into "
                        break
            # Table 0 drives termination; all tables advance in lock-step.
            rowsOfCtb = rowsWritten[0]

        tdLog.debug("insert data ............ [OK]")
        return

P
plum-lihui 已提交
362 363 364 365 366 367 368 369 370 371 372 373
    def threadFunctionForInsertByInterlace(self, **paraDict):
        """Thread body: run ``insert_data_interlaceByMultiTbl`` on a new tdSql instance.

        Requires the keys dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum,
        startTs and ctbStartIdx in ``paraDict``.
        """
        # Each thread gets its own tdSql instance from tdCom.newTdSql().
        threadSql = tdCom.newTdSql()
        argKeys = ("dbName", "ctbPrefix", "ctbNum", "rowsPerTbl", "batchNum", "startTs", "ctbStartIdx")
        insertArgs = [paraDict[key] for key in argKeys]
        self.insert_data_interlaceByMultiTbl(threadSql, *insertArgs)
        return

    def asyncInsertDataByInterlace(self, paraDict):
        """Start ``threadFunctionForInsertByInterlace(**paraDict)`` in a new thread.

        Returns the started ``threading.Thread`` so the caller can join it.
        """
        worker = threading.Thread(
            target=self.threadFunctionForInsertByInterlace,
            kwargs=paraDict,
        )
        worker.start()
        return worker

    def insert_data_with_autoCreateTbl(self,tsql,dbName,stbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0,ctbStartIdx=0):
        """Insert rows using ``insert ... using <stb> tags (...)`` so child tables are auto-created.

        Table naming and tag values follow the same rule as ``create_ctable``:
        'shanghai' for even ``i``, 'changsha' for odd multiples of 3,
        otherwise 'beijing'. Statements carry at most ``batchNum`` rows.
        ``startTs == 0`` means "use the current time in milliseconds".
        """
        tdLog.debug("start to insert data with auto create child table ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        if startTs == 0:
            startTs = int(round(time.time() * 1000))

        rowsBatched = 0
        for i in range(ctbNum):
            if i % 2 == 0:
                tagBinaryValue = 'shanghai'
            elif i % 3 == 0:
                tagBinaryValue = 'changsha'
            else:
                tagBinaryValue = 'beijing'

            tblIdx = i + ctbStartIdx
            tblClause = "%s.%s%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,tblIdx,dbName,stbName,tblIdx,tblIdx,tblIdx,tagBinaryValue,tagBinaryValue)
            sql += " " + tblClause
            lastRow = rowsPerTbl - 1
            for j in range(rowsPerTbl):
                # The nchar value carries the row's position within the current batch.
                sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,tblIdx,rowsBatched)
                rowsBatched += 1
                if rowsBatched != batchNum and j != lastRow:
                    continue
                tsql.execute(sql)
                rowsBatched = 0
                if j < lastRow:
                    sql = pre_insert + tblClause
                else:
                    sql = "insert into "

        # Anything left over (only table headers when rowsPerTbl is 0) is still sent.
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

P
plum-lihui 已提交
410
    def syncCreateDbStbCtbInsertData(self, tsql, paraDict):
        """Create database, super table and child tables via ``tdCom``, then insert rows.

        If ``paraDict["event"]`` is a ``threading.Event`` it is set once the
        schema objects exist and before any rows are inserted. Rows are then
        inserted into every table ``<ctbPrefix><i>`` for ``i < ctbNum``.
        """
        dbName = paraDict["dbName"]
        tdCom.create_database(tsql, dbName,paraDict["dropFlag"])
        tdCom.create_stable(tsql, dbname=dbName,stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
        tdCom.create_ctable(tsql, dbname=dbName,stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])

        # Signal only when the caller supplied an actual Event object.
        if "event" in paraDict:
            readyEvent = paraDict['event']
            if type(readyEvent) == type(threading.Event()):
                readyEvent.set()

        ctbPrefix = paraDict['ctbPrefix']
        for tblIdx in range(paraDict["ctbNum"]):
            tdCom.insert_rows(tsql,dbname=paraDict["dbName"],tbname='%s%s'%(ctbPrefix,tblIdx),start_ts_value=paraDict['startTs'],count=paraDict['rowsPerTbl'])
        return
P
plum-lihui 已提交
423 424 425 426 427 428 429 430 431 432 433 434

    def threadFunction(self, **paraDict):
        """Thread body: run ``syncCreateDbStbCtbInsertData`` on a new tdSql instance.

        The previous call passed ``self`` explicitly to the already-bound
        method, which raised TypeError because of the extra positional
        argument.
        """
        # create new connector for new tdSql instance in my thread
        newTdSql = tdCom.newTdSql()
        self.syncCreateDbStbCtbInsertData(newTdSql, paraDict)
        return

    def asyncCreateDbStbCtbInsertData(self, paraDict):
        """Start ``threadFunction(**paraDict)`` in a new thread and return that thread."""
        worker = threading.Thread(
            target=self.threadFunction,
            kwargs=paraDict,
        )
        worker.start()
        return worker

P
plum-lihui 已提交
435 436 437
    def threadFunctionForInsert(self, **paraDict):
        """Thread body: run ``insert_data_2`` on a new tdSql instance.

        Requires the keys dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum and
        startTs. ``ctbStartIdx`` is forwarded only when present.
        """
        # Each thread gets its own tdSql instance from tdCom.newTdSql().
        threadSql = tdCom.newTdSql()
        hasStartIdx = 'ctbStartIdx' in paraDict.keys()
        requiredKeys = ("dbName", "ctbPrefix", "ctbNum", "rowsPerTbl", "batchNum", "startTs")
        insertArgs = [paraDict[key] for key in requiredKeys]
        if hasStartIdx:
            insertArgs.append(paraDict["ctbStartIdx"])
        self.insert_data_2(threadSql, *insertArgs)
        return

    def asyncInsertData(self, paraDict):
        """Start ``threadFunctionForInsert(**paraDict)`` in a new thread and return that thread."""
        worker = threading.Thread(
            target=self.threadFunctionForInsert,
            kwargs=paraDict,
        )
        worker.start()
        return worker

P
plum-lihui 已提交
449
    def checkFileContent(self, consumerId, queryString, skipRowsOfCons=0):
        """Compare rows written by a consumer with the output of a direct query.

        Runs ``queryString`` through the ``taos`` shell, appending the result
        to ``<cfgPath>/../log/dstrows_<consumerId>.txt``, then compares it line
        by line with ``<cfgPath>/../log/consumerid_<consumerId>.txt``. The
        first line of the query output (schema) is skipped, as are the first
        ``skipRowsOfCons`` lines of the consumer file. Comparison stops as
        soon as either file is exhausted.

        Fields are compared as text first. Fields that differ textually are
        accepted when both parse as floats within 1e-9 of each other. Any
        other difference is reported through ``tdLog.exit``.

        Both files are now opened in a ``with`` block so they are always
        closed. A non-numeric mismatch is reported as a mismatch instead of
        raising ValueError from ``float()``.
        """
        buildPath = tdCom.getBuildPath()
        cfgPath = tdCom.getClientCfgPath()
        dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
        cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
        tdLog.info(cmdStr)
        os.system(cmdStr)

        consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
        tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))

        with open(consumeRowsFile, mode='r') as consumeFile, open(dstFile, mode='r') as queryFile:
            # skip first line for it is schema
            queryFile.readline()

            # skip offset for consumer
            for _ in range(skipRowsOfCons):
                consumeFile.readline()

            while True:
                dst = queryFile.readline()
                src = consumeFile.readline()
                if not dst or not src:
                    break

                dstSplit = dst.split(',')
                srcSplit = src.split(',')
                if len(dstSplit) != len(srcSplit):
                    tdLog.exit("consumerId %d consume rows len is not match the rows by direct query,len(dstSplit):%d != len(srcSplit):%d, dst:%s, src:%s"
                               %(consumerId, len(dstSplit), len(srcSplit), dst, src))

                for srcField, dstField in zip(srcSplit, dstSplit):
                    if srcField == dstField:
                        continue
                    try:
                        isSame = math.isclose(float(srcField), float(dstField), abs_tol=1e-9)
                    except ValueError:
                        # Textually different and not numeric: a genuine mismatch.
                        isSame = False
                    if not isSame:
                        tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
        return
P
plum-lihui 已提交
489 490 491 492 493 494 495 496 497

    def getResultFileByTaosShell(self, consumerId, queryString):
        """Run ``queryString`` in the ``taos`` shell and return the output file path.

        Output is appended (``>>``) to ``<cfgPath>/../log/dstrows_<consumerId>.txt``.
        """
        cfgPath = tdCom.getClientCfgPath()
        taosBinary = '%s/build/bin/taos'%tdCom.getBuildPath()
        dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
        cmdStr = '%s -c %s -s "%s >> %s"'%(taosBinary, cfgPath, queryString, dstFile)
        tdLog.info(cmdStr)
        os.system(cmdStr)
        return dstFile
G
Ganlin Zhao 已提交
498 499 500

    def checkTmqConsumeFileContent(self, consumerId, dstFile):
        """Verify that the consumer's row file matches ``dstFile`` line for line.

        The first line of ``dstFile`` (schema) is skipped. Every remaining
        line must equal the corresponding line of
        ``<cfgPath>/../log/consumerid_<consumerId>.txt``. The first difference
        is logged and reported through ``tdLog.exit``. Comparison ends when
        ``dstFile`` is exhausted; extra lines in the consumer file are not
        checked.

        Both files are now opened in a ``with`` block so they are always
        closed.
        """
        cfgPath = tdCom.getClientCfgPath()
        consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
        tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))

        with open(consumeRowsFile, mode='r') as consumeFile, open(dstFile, mode='r') as queryFile:
            # skip first line for it is schema
            queryFile.readline()
            lines = 0
            while True:
                dst = queryFile.readline()
                src = consumeFile.readline()
                lines += 1
                if not dst:
                    break
                if dst != src:
                    tdLog.info("src row: %s"%src)
                    tdLog.info("dst row: %s"%dst)
                    tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
        return
P
plum-lihui 已提交
522

P
plum-lihui 已提交
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
    def create_ntable(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=None, colPrefix='c', tblNum=1, **kwargs):
        """Create ``tblNum`` normal tables named ``<tbname_prefix><index>``.

        Indexes start at ``tbname_index_start_num``. The column list comes
        from ``tdCom.gen_column_type_str``. Extra keyword arguments are
        appended to the statement as ``name "value"`` table options.
        """
        tb_params = "".join(f'{param} "{value}" ' for param, value in kwargs.items())
        column_type_str = tdCom.gen_column_type_str(colPrefix, column_elm_list)

        table_index = tbname_index_start_num
        for _ in range(tblNum):
            table_name = f'{dbname}.{tbname_prefix}{table_index}'
            table_index += 1
            tsql.execute(f'create table {table_name} ({column_type_str}) {tb_params};')

    def insert_rows_into_ntbl(self, tsql, dbname=None, tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=None, startTs=None, tblNum=1, rows=1):
        """Insert ``rows`` generated rows into each of ``tblNum`` normal tables.

        Values come from ``tdCom.gen_column_value_list`` with the timestamp
        expression ``<startTs>+<rowIdx>s``. String values are double-quoted,
        except that values are written verbatim until the first unquoted
        value has been emitted, so a leading timestamp expression stays bare.
        """
        if startTs is None:
            startTs = tdCom.genTs()[0]

        for tblIdx in range(tblNum):
            for rowIdx in range(rows):
                column_value_list = tdCom.gen_column_value_list(column_ele_list, f'{startTs}+{rowIdx}s')
                rendered = []
                bare_value_seen = False
                for column_value in column_value_list:
                    if bare_value_seen and isinstance(column_value, str):
                        rendered.append(f'"{column_value}"')
                    else:
                        rendered.append(f'{column_value}')
                        bare_value_seen = True
                column_value_str = ", ".join(rendered)
                insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
                tsql.execute(insert_sql)
G
Ganlin Zhao 已提交
553

P
plum-lihui 已提交
554
    def waitSubscriptionExit(self, tsql, topicName):
        """Block until no subscription row for ``topicName`` has a value in column 3.

        ``show subscriptions`` is polled every 0.5 s. A row counts as still
        active when column 0 equals ``topicName`` and column 3 is not None.
        Column 3 presumably holds the consumer id; confirm against the
        server's result schema. There is no timeout.

        The final log line now reports elapsed seconds. It previously printed
        the number of 0.5 s polling intervals labelled as seconds.
        """
        pollInterval = 0.5
        wait_cnt = 0
        while True:
            tsql.query("show subscriptions")
            stillActive = any(
                tsql.getData(idx, 0) == topicName and tsql.getData(idx, 3) is not None
                for idx in range(tsql.getRows())
            )
            if not stillActive:
                break
            time.sleep(pollInterval)
            wait_cnt += 1

        tsql.query("show subscriptions")
        tdLog.info("show subscriptions:")
        tdLog.info(tsql.queryResult)
        tdLog.info("wait subscriptions exit for %.1f s"%(wait_cnt * pollInterval))
P
plum-lihui 已提交
579

P
plum-lihui 已提交
580 581 582 583
    def close(self):
        self.cursor.close()

# Module-level TMQCom instance created at import time.
tmqCom = TMQCom()