dataFromTsdbNWal.py 11.7 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

import taos
import sys
import time
import socket
import os
import threading
import math

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *

class TDTestCase:
    def __init__(self):
P
plum-lihui 已提交
20 21
        self.vgroups    = 4
        self.ctbNum     = 1
P
plum-lihui 已提交
22
        self.rowsPerTbl = 10000
G
Ganlin Zhao 已提交
23

24
    def init(self, conn, logSql, replicaVar=1):
25
        self.replicaVar = int(replicaVar)
P
plum-lihui 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)

    def prepareTestEnv(self):
        tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    1,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
P
plum-lihui 已提交
42
                    'ctbNum':     1,
P
plum-lihui 已提交
43
                    'rowsPerTbl': 10000,
P
plum-lihui 已提交
44
                    'batchNum':   100,
P
plum-lihui 已提交
45 46 47 48 49 50 51 52 53
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'pollDelay':  10,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   1}

        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl
G
Ganlin Zhao 已提交
54

P
plum-lihui 已提交
55 56 57 58 59 60 61
        tmqCom.initConsumerTable()
        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
        tdLog.info("create stb")
        tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
        tdLog.info("create ctb")
        tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
                             ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
G
Ganlin Zhao 已提交
62

P
plum-lihui 已提交
63 64 65 66
        tdLog.info("insert data")
        tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
                                               ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
                                               startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
G
Ganlin Zhao 已提交
67

P
plum-lihui 已提交
68
        tdLog.info("flush db to let data falls into the disk")
P
plum-lihui 已提交
69
        tdSql.query("flush database %s"%(paraDict['dbName']))
70
        tdSql.execute("alter database %s wal_retention_period 3600"%(paraDict['dbName']))
P
plum-lihui 已提交
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
        return

    def tmqCase1(self):
        tdLog.printNoPrefix("======== test case 1: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    1,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum':     10,
                    'rowsPerTbl': 10000,
P
plum-lihui 已提交
88
                    'batchNum':   100,
P
plum-lihui 已提交
89
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
P
plum-lihui 已提交
90
                    'pollDelay':  5,
P
plum-lihui 已提交
91 92 93 94 95 96 97
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   1}

        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl
G
Ganlin Zhao 已提交
98

P
plum-lihui 已提交
99 100 101
        topicNameList = ['topic1']
        expectRowsList = []
        tmqCom.initConsumerTable()
G
Ganlin Zhao 已提交
102

P
plum-lihui 已提交
103 104 105 106 107 108
        tdLog.info("create topics from stb with filter")
        queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
        # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
        sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)
G
Ganlin Zhao 已提交
109
        # tdSql.query(queryString)
P
plum-lihui 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122
        # expectRowsList.append(tdSql.getRows())

        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        consumerId   = 0
        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
        topicList    = topicNameList[0]
        ifcheckdata  = 1
        ifManualCommit = 1
        keyList      = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        # after start consume, continue insert some data
P
plum-lihui 已提交
123 124
        paraDict['batchNum']   = 100
        paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl
P
plum-lihui 已提交
125
        pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
G
Ganlin Zhao 已提交
126

P
plum-lihui 已提交
127 128
        tdLog.info("start consume processor")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
P
plum-lihui 已提交
129

P
plum-lihui 已提交
130
        pInsertThread.join()
G
Ganlin Zhao 已提交
131 132

        tdSql.query(queryString)
P
plum-lihui 已提交
133
        expectRowsList.append(tdSql.getRows())
G
Ganlin Zhao 已提交
134 135

        tdLog.info("wait the consume result")
P
plum-lihui 已提交
136 137
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
G
Ganlin Zhao 已提交
138

P
plum-lihui 已提交
139
        tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
P
plum-lihui 已提交
140 141 142
        if expectRowsList[0] != resultList[0]:
            tdLog.exit("%d tmq consume rows error!"%consumerId)

G
Ganlin Zhao 已提交
143
        tmqCom.checkFileContent(consumerId, queryString)
P
plum-lihui 已提交
144

P
plum-lihui 已提交
145 146
        tdSql.query("flush database %s"%(paraDict['dbName']))

P
plum-lihui 已提交
147
        for i in range(len(topicNameList)):
G
Ganlin Zhao 已提交
148
            tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
P
plum-lihui 已提交
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
            tdSql.query("drop topic %s"%topicNameList[i])

        tdLog.printNoPrefix("======== test case 1 end ...... ")

    def tmqCase2(self):
        tdLog.printNoPrefix("======== test case 2: ")
        paraDict = {'dbName':     'dbt',
                    'dropFlag':   1,
                    'event':      '',
                    'vgroups':    1,
                    'stbName':    'stb',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
                    'ctbPrefix':  'ctb',
                    'ctbStartIdx': 0,
                    'ctbNum':     10,
                    'rowsPerTbl': 10000,
                    'batchNum':   10,
                    'startTs':    1640966400000,  # 2022-01-01 00:00:00.000
                    'pollDelay':  3,
                    'showMsg':    1,
                    'showRow':    1,
                    'snapshot':   1}

        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl
G
Ganlin Zhao 已提交
178

P
plum-lihui 已提交
179 180 181
        topicNameList = ['topic1']
        expectRowsList = []
        tmqCom.initConsumerTable()
G
Ganlin Zhao 已提交
182

P
plum-lihui 已提交
183 184 185 186 187 188
        tdLog.info("create topics from stb with filter")
        queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
        # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
        sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
        tdLog.info("create topic sql: %s"%sqlString)
        tdSql.execute(sqlString)
G
Ganlin Zhao 已提交
189
        tdSql.query(queryString)
P
plum-lihui 已提交
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
        expectRowsList.append(tdSql.getRows())
        totalRowsInserted = expectRowsList[0]

        # init consume info, and start tmq_sim, then check consume result
        tdLog.info("insert consume info to consume processor")
        consumerId   = 1
        expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
        topicList    = topicNameList[0]
        ifcheckdata  = 1
        ifManualCommit = 1
        keyList      = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)

        tdLog.info("start consume processor 0")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
G
Ganlin Zhao 已提交
205 206
        tdLog.info("wait the consume result")

P
plum-lihui 已提交
207 208
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
P
plum-lihui 已提交
209
        actConsumeRows = resultList[0]
G
Ganlin Zhao 已提交
210

P
plum-lihui 已提交
211 212
        tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
        if not (expectrowcnt <= actConsumeRows and totalRowsInserted >= actConsumeRows):
P
plum-lihui 已提交
213
            tdLog.exit("%d tmq consume rows error!"%consumerId)
G
Ganlin Zhao 已提交
214

P
plum-lihui 已提交
215 216 217 218 219
        # reinit consume info, and start tmq_sim, then check consume result
        tmqCom.initConsumerTable()
        consumerId   = 2
        expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
G
Ganlin Zhao 已提交
220

P
plum-lihui 已提交
221 222
        tdLog.info("start consume processor 1")
        tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
G
Ganlin Zhao 已提交
223 224
        tdLog.info("wait the consume result")

P
plum-lihui 已提交
225 226
        expectRows = 1
        resultList = tmqCom.selectConsumeResult(expectRows)
G
Ganlin Zhao 已提交
227

P
plum-lihui 已提交
228
        actConsumeRows = resultList[0]
G
Ganlin Zhao 已提交
229
        tdLog.info("act consume rows: %d, expect rows: %d, act insert rows: %d"%(actConsumeRows, expectrowcnt, totalRowsInserted))
P
plum-lihui 已提交
230
        if not ((actConsumeRows >= expectrowcnt) and (totalRowsInserted > actConsumeRows)):
P
plum-lihui 已提交
231 232 233
            tdLog.exit("%d tmq consume rows error!"%consumerId)

        for i in range(len(topicNameList)):
G
Ganlin Zhao 已提交
234
            tmqCom.waitSubscriptionExit(tdSql,topicNameList[i])
P
plum-lihui 已提交
235 236 237 238 239 240 241 242
            tdSql.query("drop topic %s"%topicNameList[i])

        tdLog.printNoPrefix("======== test case 2 end ...... ")

    def run(self):
        tdSql.prepare()
        self.prepareTestEnv()
        self.tmqCase1()
P
plum-lihui 已提交
243
        self.tmqCase2()
P
plum-lihui 已提交
244 245 246 247 248 249 250 251 252

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())