# TD-16821.py (8.2 KB)
# Exported from a web blame view; last committed by plum-lihui.

import taos
import sys
import time
import socket
import os
import threading

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    """TMQ test case that consumes three topics and checks the results.

    The case creates one database with a super table and one child table,
    inserts rows, then creates three topics (one on the super table and two
    on scalar-function queries).  Each topic is consumed through the
    consume processor started by ``tmqCom.startTmqSimProcess`` and the
    consumed row count is compared with the row count of a direct query.
    """

    def init(self, conn, logSql, replicaVar=1):
        """Store the replica setting and bind ``tdSql`` to a cursor of *conn*.

        Args:
            conn: connection object; only ``conn.cursor()`` is used here.
            logSql: accepted for interface compatibility; not used by this
                method (see the commented-out call below).
            replicaVar: replica count; converted with ``int()``.
        """
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor())
        #tdSql.init(conn.cursor(), logSql)  # output sql.txt file

    def checkFileContent(self, consumerId, queryString):
        """Compare the rows consumed by *consumerId* with a direct query.

        The ``taos`` shell under the build path runs *queryString* and
        appends its output to ``<cfg>/../log/dstrows_<consumerId>.txt``.
        That file is then compared line by line with
        ``<cfg>/../log/consumerid_<consumerId>.txt``.

        Args:
            consumerId: integer consumer id used to build both file names.
            queryString: SQL text passed to the ``taos`` shell.

        NOTE(review): the comparison stops at the end of the query output,
        so extra trailing lines in the consumer file are not detected here.
        """
        buildPath = tdCom.getBuildPath()
        cfgPath = tdCom.getClientCfgPath()
        dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
        cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
        tdLog.info(cmdStr)
        os.system(cmdStr)

        consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
        tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))

        # Context managers make sure both handles are closed, including when
        # tdLog.exit() is reached on a mismatch.
        with open(consumeRowsFile, mode='r') as consumeFile:
            with open(dstFile, mode='r') as queryFile:
                # skip first line for it is schema
                queryFile.readline()

                for dst in queryFile:
                    src = consumeFile.readline()
                    if dst != src:
                        tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
        return

    def _createTopicAndCountRows(self, sqlString, queryString):
        """Create a topic with *sqlString* and return the row count of *queryString*."""
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)
        tdSql.query(queryString)
        return tdSql.getRows()

    def _consumeTopic(self, paraDict, consumerId, expectrowcnt, topicList, keyList):
        """Register one consumer, start the consume processor and return its result list."""
        ifcheckdata = 1
        ifManualCommit = 1
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])

        tdLog.info("wait the consume result")
        expectRows = 1
        return tmqCom.selectConsumeResult(expectRows)

    def _checkConsumedRows(self, index, expectRows, actRows):
        """Report through ``tdLog.exit`` when the consumed row count differs from the expected one."""
        if expectRows != actRows:
            tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRows, actRows))
            tdLog.exit("%d tmq consume rows error!"%index)

    def tmqCase1(self):
        """Create three topics, consume each one and verify the first two.

        Topic 1 is created on the super table, topics 2 and 3 on queries
        using ``log``/``cos`` and ``log``/``atan`` with a filter.  For
        topics 1 and 2 both the row count and the file content are checked;
        for topic 3 those checks are commented out.  All topics are dropped
        at the end.
        """
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'db1',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbNum':     1,
                    'rowsPerTbl': 10000,
                    'batchNum':   10,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'pollDelay':  10,
                    'showMsg':    1,
                    'showRow':    1}

        dbName = paraDict['dbName']
        stbName = paraDict['stbName']
        topicNameList = ['topic1', 'topic2', 'topic3']
        expectRowsList = []

        tmqCom.initConsumerTable()
        tdCom.create_database(tdSql, dbName, paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=1)
        tdSql.execute("alter database %s wal_retention_period 3600" % (dbName))
        tdLog.info("create stb")
        tdCom.create_stable(tdSql, dbname=dbName, stbname=stbName, column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
        tdLog.info("create ctb")
        tdCom.create_ctable(tdSql, dbname=dbName, stbname=stbName, tag_elm_list=paraDict['tagSchema'], count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
        tdLog.info("insert data")
        tmqCom.insert_data(tdSql, dbName, paraDict["ctbPrefix"], paraDict["ctbNum"], paraDict["rowsPerTbl"], paraDict["batchNum"], paraDict["startTs"])

        # ---- topic 1: the whole super table ----
        tdLog.info("create topics from stb with filter")
        queryString = "select * from %s.%s" %(dbName, stbName)
        sqlString = "create topic %s as stable %s.%s" %(topicNameList[0], dbName, stbName)
        expectRowsList.append(self._createTopicAndCountRows(sqlString, queryString))

        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        consumerId = 0
        # The same expected count is passed for every consumer below.
        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
        keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
        resultList = self._consumeTopic(paraDict, consumerId, expectrowcnt, topicNameList[0], keyList)
        self._checkConsumedRows(0, expectRowsList[0], resultList[0])
        self.checkFileContent(consumerId, queryString)

        # ---- topic 2: log/cos query with a filter on c1 ----
        # reinit consume info, and start tmq_sim, then check consume result
        tmqCom.initConsumerTable()

        queryString = "select ts, log(c1), cos(c1) from %s.%s where c1 > 3169" %(dbName, stbName)
        sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
        expectRowsList.append(self._createTopicAndCountRows(sqlString, queryString))

        consumerId = 1
        resultList = self._consumeTopic(paraDict, consumerId, expectrowcnt, topicNameList[1], keyList)
        self._checkConsumedRows(1, expectRowsList[1], resultList[0])
        self.checkFileContent(consumerId, queryString)

        # ---- topic 3: log/atan query with a filter on ts ----
        # reinit consume info, and start tmq_sim, then check consume result
        tmqCom.initConsumerTable()

        queryString = "select ts, log(c1), atan(c1) from %s.%s where ts >= %d" %(dbName, stbName, paraDict["startTs"]+6137)
        sqlString = "create topic %s as %s" %(topicNameList[2], queryString)
        expectRowsList.append(self._createTopicAndCountRows(sqlString, queryString))

        consumerId = 2
        resultList = self._consumeTopic(paraDict, consumerId, expectrowcnt, topicNameList[2], keyList)
        # NOTE: the row-count and file-content checks are disabled for this topic.
        # self._checkConsumedRows(2, expectRowsList[2], resultList[0])
        # self.checkFileContent(consumerId, queryString)

        time.sleep(10)
        for topicName in topicNameList:
            tdSql.query("drop topic %s"%topicName)

        tdLog.printNoPrefix("======== test case 1 end ...... ")

    def run(self):
        """Prepare ``tdSql`` and execute test case 1."""
        tdSql.prepare()
        self.tmqCase1()

    def stop(self):
        """Close ``tdSql`` and log that the file executed successfully."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

# Module-level event object; it is not referenced anywhere else in the
# visible part of this file.
event = threading.Event()

# Register this test file with the case runner for both platforms, using a
# separate TDTestCase instance for each registration.
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())