tmqShow.py 7.5 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17

import taos
import sys
import time
import socket
import os
import threading

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
18
    def init(self, conn, logSql, replicaVar=1):
P
plum-lihui 已提交
19 20 21
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor())
        #tdSql.init(conn.cursor(), logSql)  # output sql.txt file
P
plum-lihui 已提交
22 23 24 25 26
    def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,offset=1,cdbName='cdb'):
        sql = "insert into %s.consumeinfo values "%cdbName
        sql += "(now+%ds, %d, '%s', '%s', %d, %d, %d)"%(offset,consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
        tdLog.info("consume info sql: %s"%sql)
        tdSql.query(sql)
P
plum-lihui 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39

    def tmqCase1(self):
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'db1',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    4,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
                    'ctbPrefix':  'ctb',
P
plum-lihui 已提交
40
                    'ctbNum':     100,
P
plum-lihui 已提交
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
                    'rowsPerTbl': 4000,
                    'batchNum':   15,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'pollDelay':  20,
                    'showMsg':    1,
                    'showRow':    1}

        topicNameList = ['topic1', 'topic2', 'topic3', 'topic4']
        consumeGroupIdList = ['cgrp1', 'cgrp1', 'cgrp3', 'cgrp4']
        consumerIdList = [0, 1, 2, 3]
        tmqCom.initConsumerTable()
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict['vgroups'],replica=1)
        tdLog.info("create stb")
        tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
        tdLog.info("create ctb")
        tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
        # tdLog.info("insert data")
        # tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
C
cpwu 已提交
59

P
plum-lihui 已提交
60 61 62 63
        tdLog.info("create 4 topics")
        sqlString = "create topic %s as database %s" %(topicNameList[0], paraDict['dbName'])
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)
C
cpwu 已提交
64

P
plum-lihui 已提交
65 66
        sqlString = "create topic %s as stable %s.%s" %(topicNameList[1], paraDict['dbName'], paraDict['stbName'])
        tdLog.info("create topic sql: %s"%sqlString)
C
cpwu 已提交
67
        tdSql.execute(sqlString)
P
plum-lihui 已提交
68 69 70 71 72

        queryString = "select * from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
        sqlString = "create topic %s as %s" %(topicNameList[2], queryString)
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)
C
cpwu 已提交
73

P
plum-lihui 已提交
74 75 76 77 78 79
        queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
        sqlString = "create topic %s as %s " %(topicNameList[3], queryString)
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)

        tdSql.query("show topics")
C
cpwu 已提交
80
        tdLog.debug(tdSql.queryResult)
P
plum-lihui 已提交
81 82 83
        rows = tdSql.getRows()
        if rows != len(consumerIdList):
            tdLog.exit("topic rows error")
C
cpwu 已提交
84

P
plum-lihui 已提交
85 86 87 88 89 90 91 92 93 94
        for i in range (rows):
            topicName = tdSql.getData(i,0)
            matchFlag = 0
            while matchFlag == 0:
                for j in range(len(topicNameList)):
                    if topicName == topicNameList[j]:
                        matchFlag = 1
                        break
                if matchFlag == 0:
                    tdLog.exit("topic name: %s is error", topicName)
C
cpwu 已提交
95

P
plum-lihui 已提交
96 97 98 99 100
        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
        topicList    = topicNameList[0]
        ifcheckdata  = 0
C
cpwu 已提交
101
        ifManualCommit = 0
P
plum-lihui 已提交
102
        keyList      = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[0]
P
plum-lihui 已提交
103 104
        tsOffset=1
        self.insertConsumerInfo(consumerIdList[0], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit,tsOffset)
C
cpwu 已提交
105

P
plum-lihui 已提交
106 107
        topicList    = topicNameList[1]
        keyList      = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[1]
P
plum-lihui 已提交
108 109
        tsOffset=2
        self.insertConsumerInfo(consumerIdList[1], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit,tsOffset)
C
cpwu 已提交
110

P
plum-lihui 已提交
111 112
        topicList    = topicNameList[2]
        keyList      = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[2]
P
plum-lihui 已提交
113 114
        tsOffset=3
        self.insertConsumerInfo(consumerIdList[2], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit,tsOffset)
C
cpwu 已提交
115

P
plum-lihui 已提交
116 117
        topicList    = topicNameList[3]
        keyList      = 'group.id:%s, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'%consumeGroupIdList[3]
P
plum-lihui 已提交
118 119
        tsOffset=4
        self.insertConsumerInfo(consumerIdList[3], expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit,tsOffset)
P
plum-lihui 已提交
120 121 122 123 124 125 126

        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])

        tdLog.info("async insert data")
        pThread = tmqCom.asyncInsertData(paraDict)

P
plum-lihui 已提交
127 128
        tmqCom.getStartConsumeNotifyFromTmqsim();
        #time.sleep(5)
P
plum-lihui 已提交
129 130
        tdLog.info("check show consumers")
        tdSql.query("show consumers")
C
cpwu 已提交
131
        # tdLog.info(tdSql.queryResult)
P
plum-lihui 已提交
132 133 134 135
        rows = tdSql.getRows()
        tdLog.info("show consumers rows: %d"%rows)
        if rows != len(topicNameList):
            tdLog.exit("show consumers rows error")
C
cpwu 已提交
136 137

        tdLog.info("check show subscriptions")
P
plum-lihui 已提交
138
        tdSql.query("show subscriptions")
C
cpwu 已提交
139
        # tdLog.debug(tdSql.queryResult)
P
plum-lihui 已提交
140 141 142 143 144 145
        rows = tdSql.getRows()
        tdLog.info("show subscriptions rows: %d"%rows)
        if rows != paraDict['vgroups'] * len(topicNameList):
            tdLog.exit("show subscriptions rows error")

        pThread.join()
C
cpwu 已提交
146

P
plum-lihui 已提交
147 148 149
        tdLog.info("insert process end, and start to check consume result")
        expectRows = len(consumerIdList)
        _ = tmqCom.selectConsumeResult(expectRows)
C
cpwu 已提交
150 151

        time.sleep(10)
P
plum-lihui 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
        for i in range(len(topicNameList)):
            tdSql.query("drop topic %s"%topicNameList[i])

        tdLog.printNoPrefix("======== test case 1 end ...... ")

    def run(self):
        tdSql.prepare()
        self.tmqCase1()

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())