提交 95f76406 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/bugfix/td-1089' into feature/os

...@@ -53,12 +53,18 @@ typedef struct SParsedDataColInfo { ...@@ -53,12 +53,18 @@ typedef struct SParsedDataColInfo {
bool hasVal[TSDB_MAX_COLUMNS]; bool hasVal[TSDB_MAX_COLUMNS];
} SParsedDataColInfo; } SParsedDataColInfo;
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
// and set 'pack' to 1 to avoid break existing code.
typedef struct STidTags { typedef struct STidTags {
int16_t padding;
int64_t uid; int64_t uid;
int32_t tid; int32_t tid;
int32_t vgId; int32_t vgId;
char tag[]; char tag[];
} STidTags; } STidTags;
#pragma pack(pop)
typedef struct SJoinSupporter { typedef struct SJoinSupporter {
SSubqueryState* pState; SSubqueryState* pState;
...@@ -224,6 +230,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); ...@@ -224,6 +230,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscInitQueryInfo(SQueryInfo* pQueryInfo); void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
......
...@@ -169,7 +169,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -169,7 +169,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE // (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); *bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*interBytes = *bytes; *interBytes = *bytes;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -912,7 +912,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -912,7 +912,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY;
pQueryMsg->head.contLen = htonl(msgLen); pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= pCmd->allocSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -230,6 +230,19 @@ static SArray* getTableList( SSqlObj* pSql ) { ...@@ -230,6 +230,19 @@ static SArray* getTableList( SSqlObj* pSql ) {
return result; return result;
} }
static int32_t compareTidTag(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*)p1;
const STidTags* t2 = (const STidTags*)p2;
if (t1->vgId != t2->vgId) {
return (t1->vgId > t2->vgId) ? 1 : -1;
}
if (t1->tid != t2->tid) {
return (t1->tid > t2->tid) ? 1 : -1;
}
return 0;
}
static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
SSqlObj* pSql = pSub->pSql; SSqlObj* pSql = pSub->pSql;
...@@ -270,7 +283,8 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -270,7 +283,8 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
pSub->progress = progress; pSub->progress = progress;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
taosArraySort( tables, tscCompareTidTags ); taosArraySort( tables, compareTidTag );
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscBuildVgroupTableInfo(pSql, pTableMetaInfo, tables); tscBuildVgroupTableInfo(pSql, pTableMetaInfo, tables);
} }
taosArrayDestroy(tables); taosArrayDestroy(tables);
...@@ -410,6 +424,9 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -410,6 +424,9 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
} }
} }
size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo);
size += sizeof(SQueryTableMsg) + 4096;
tscAllocPayload(&pSql->cmd, size);
for (int retry = 0; retry < 3; retry++) { for (int retry = 0; retry < 3; retry++) {
tscRemoveFromSqlList(pSql); tscRemoveFromSqlList(pSql);
......
...@@ -1556,12 +1556,22 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) { ...@@ -1556,12 +1556,22 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
} }
} }
void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables != NULL) {
for (size_t i = 0; i < taosArrayGetSize(pVgroupTables); i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
taosArrayDestroy(pInfo->itemList);
}
taosArrayDestroy(pVgroupTables);
}
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo); free(pTableMetaInfo);
} }
......
...@@ -6549,12 +6549,15 @@ static void buildTagQueryResult(SQInfo* pQInfo) { ...@@ -6549,12 +6549,15 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
int32_t i = pQInfo->tableIndex++; int32_t i = pQInfo->tableIndex++;
STableQueryInfo *item = taosArrayGetP(pa, i); STableQueryInfo *item = taosArrayGetP(pa, i);
char *output = pQuery->sdata[0]->data + i * rsize; char *output = pQuery->sdata[0]->data + count * rsize;
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output); output = varDataVal(output);
STableId* id = TSDB_TABLEID(item->pTable); STableId* id = TSDB_TABLEID(item->pTable);
*(int16_t *)output = 0;
output += sizeof(int16_t);
*(int64_t *)output = id->uid; // memory align problem, todo serialize *(int64_t *)output = id->uid; // memory align problem, todo serialize
output += sizeof(id->uid); output += sizeof(id->uid);
......
...@@ -2444,11 +2444,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2444,11 +2444,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
destroyTableMemIterator(pTableCheckInfo); destroyTableMemIterator(pTableCheckInfo);
if (pTableCheckInfo->pDataCols != NULL) { tdFreeDataCols(pTableCheckInfo->pDataCols);
taosTFree(pTableCheckInfo->pDataCols->buf); pTableCheckInfo->pDataCols = NULL;
}
taosTFree(pTableCheckInfo->pDataCols);
taosTFree(pTableCheckInfo->pCompInfo); taosTFree(pTableCheckInfo->pCompInfo);
} }
taosArrayDestroy(pQueryHandle->pTableCheckInfo); taosArrayDestroy(pQueryHandle->pTableCheckInfo);
......
...@@ -7,28 +7,31 @@ ...@@ -7,28 +7,31 @@
#include <taos.h> // include TDengine header file #include <taos.h> // include TDengine header file
#include <unistd.h> #include <unistd.h>
int nTotalRows;
void print_result(TAOS_RES* res, int blockFetch) { void print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res); int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res); TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0; int nRows = 0;
char buf[4096];
if (blockFetch) { if (blockFetch) {
nRows = taos_fetch_block(res, &row); nRows = taos_fetch_block(res, &row);
for (int i = 0; i < nRows; i++) { for (int i = 0; i < nRows; i++) {
char temp[256]; taos_print_row(buf, row + i, fields, num_fields);
taos_print_row(temp, row + i, fields, num_fields); puts(buf);
puts(temp);
} }
} else { } else {
while ((row = taos_fetch_row(res))) { while ((row = taos_fetch_row(res))) {
char temp[256]; taos_print_row(buf, row, fields, num_fields);
taos_print_row(temp, row, fields, num_fields); puts(buf);
puts(temp);
nRows++; nRows++;
} }
} }
nTotalRows += nRows;
printf("%d rows consumed.\n", nRows); printf("%d rows consumed.\n", nRows);
} }
...@@ -52,47 +55,52 @@ void check_row_count(int line, TAOS_RES* res, int expected) { ...@@ -52,47 +55,52 @@ void check_row_count(int line, TAOS_RES* res, int expected) {
} }
void do_query(TAOS* taos, const char* sql) {
TAOS_RES* res = taos_query(taos, "drop database if exists test;");
taos_free_result(res);
}
void run_test(TAOS* taos) { void run_test(TAOS* taos) {
taos_query(taos, "drop database if exists test;"); do_query(taos, "drop database if exists test;");
usleep(100000); usleep(100000);
//taos_query(taos, "create database test tables 5;"); do_query(taos, "create database test;");
taos_query(taos, "create database test;");
usleep(100000); usleep(100000);
taos_query(taos, "use test;"); do_query(taos, "use test;");
usleep(100000); usleep(100000);
taos_query(taos, "create table meters(ts timestamp, a int) tags(area int);"); do_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
taos_query(taos, "create table t0 using meters tags(0);"); do_query(taos, "create table t0 using meters tags(0);");
taos_query(taos, "create table t1 using meters tags(1);"); do_query(taos, "create table t1 using meters tags(1);");
taos_query(taos, "create table t2 using meters tags(2);"); do_query(taos, "create table t2 using meters tags(2);");
taos_query(taos, "create table t3 using meters tags(3);"); do_query(taos, "create table t3 using meters tags(3);");
taos_query(taos, "create table t4 using meters tags(4);"); do_query(taos, "create table t4 using meters tags(4);");
taos_query(taos, "create table t5 using meters tags(5);"); do_query(taos, "create table t5 using meters tags(5);");
taos_query(taos, "create table t6 using meters tags(6);"); do_query(taos, "create table t6 using meters tags(6);");
taos_query(taos, "create table t7 using meters tags(7);"); do_query(taos, "create table t7 using meters tags(7);");
taos_query(taos, "create table t8 using meters tags(8);"); do_query(taos, "create table t8 using meters tags(8);");
taos_query(taos, "create table t9 using meters tags(9);"); do_query(taos, "create table t9 using meters tags(9);");
taos_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);"); do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);"); do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);"); do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);"); do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);"); do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);"); do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);"); do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);"); do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);"); do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);"); do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
taos_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);"); do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
// super tables subscription // super tables subscription
usleep(1000000); usleep(1000000);
...@@ -104,23 +112,23 @@ void run_test(TAOS* taos) { ...@@ -104,23 +112,23 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);"); do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);"); do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 2); check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);"); do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);"); do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 2); check_row_count(__LINE__, res, 2);
taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);"); do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription // keep progress information and restart subscription
taos_unsubscribe(tsub, 1); taos_unsubscribe(tsub, 1);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);"); do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 24); check_row_count(__LINE__, res, 24);
...@@ -147,7 +155,7 @@ void run_test(TAOS* taos) { ...@@ -147,7 +155,7 @@ void run_test(TAOS* taos) {
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 0); check_row_count(__LINE__, res, 0);
taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);"); do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub); res = taos_consume(tsub);
check_row_count(__LINE__, res, 1); check_row_count(__LINE__, res, 1);
...@@ -223,7 +231,7 @@ int main(int argc, char *argv[]) { ...@@ -223,7 +231,7 @@ int main(int argc, char *argv[]) {
exit(0); exit(0);
} }
taos_query(taos, "use test;"); taos_select_db(taos, "test");
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (async) { if (async) {
// create an asynchronized subscription, the callback function will be called every 1s // create an asynchronized subscription, the callback function will be called every 1s
...@@ -251,6 +259,7 @@ int main(int argc, char *argv[]) { ...@@ -251,6 +259,7 @@ int main(int argc, char *argv[]) {
} }
} }
printf("total rows consumed: %d\n", nTotalRows);
taos_unsubscribe(tsub, keep); taos_unsubscribe(tsub, keep);
taos_close(taos); taos_close(taos);
......
###################################################################
# Copyright (c) 2020 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
import time
import random
import string
from util.log import *
from util.cases import *
from util.sql import *
from util.sub import *
class TDTestCase:
maxTables = 10000
maxCols = 50
rowsPerSecond = 1000
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdLog.notice("NOTE: this case does not stop automatically, Ctrl+C to stop")
tdSql.init(conn.cursor(), logSql)
self.conn = conn
def generateString(self, length):
chars = string.ascii_uppercase + string.ascii_lowercase
v = ""
for i in range(length):
v += random.choice(chars)
return v
def insert(self):
id = random.randint(0, self.maxTables - 1)
cola = self.generateString(40)
sql = "insert into car%d values(now, '%s', %f, %d" % (id, cola, random.random()*100, random.randint(0, 2))
for i in range(self.maxCols):
sql += ", %d" % random.randint(0, self.maxTables)
sql += ")"
tdSql.execute(sql)
def prepare(self):
tdLog.info("prepare database: test")
tdSql.execute('reset query cache')
tdSql.execute('drop database if exists test')
tdSql.execute('create database test')
tdSql.execute('use test')
def run(self):
self.prepare()
sql = "create table cars (ts timestamp, a binary(50), b float, c bool"
for i in range(self.maxCols):
sql += ", c%d int" % i
sql += ") tags(id int, category binary(30), brand binary(30));"
tdSql.execute(sql)
for i in range(self.maxTables):
tdSql.execute("create table car%d using cars tags(%d, 'category%d', 'brand%d')" % (i, i, i % 30, i // 30))
time.sleep(0.1)
total = 0
while True:
start = time.time()
for i in range(self.rowsPerSecond):
self.insert()
total = total + 1
d = time.time() - start
tdLog.info("%d rows inserted in %f seconds, total %d" % (self.rowsPerSecond, d, total))
if d < 1:
time.sleep(1 - d)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
...@@ -31,16 +31,19 @@ class TDTestCase: ...@@ -31,16 +31,19 @@ class TDTestCase:
now = int(time.time() * 1000) now = int(time.time() * 1000)
tdSql.prepare() tdSql.prepare()
tdLog.info("create a super table and 10 sub-tables, then insert 5 rows into each sub-table.") numTables = 2000
rowsPerTable = 5
totalRows = numTables * rowsPerTable
tdLog.info("create a super table and %d sub-tables, then insert %d rows into each sub-table." % (numTables, rowsPerTable))
tdSql.execute("create table meters(ts timestamp, a int, b int) tags(area int, loc binary(20));") tdSql.execute("create table meters(ts timestamp, a int, b int) tags(area int, loc binary(20));")
for i in range(0, 10): for i in range(0, numTables):
for j in range(0, 5): for j in range(0, rowsPerTable):
tdSql.execute("insert into t%d using meters tags(%d, 'area%d') values (%d, %d, %d);" % (i, i, i, now + j, j, j)) tdSql.execute("insert into t%d using meters tags(%d, 'area%d') values (%d, %d, %d);" % (i, i, i, now + j, j, j))
tdLog.info("consumption 01.") tdLog.info("consumption 01.")
tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0))
tdSub.consume() tdSub.consume()
tdSub.checkRows(50) tdSub.checkRows(totalRows)
tdLog.info("consumption 02: no new rows inserted") tdLog.info("consumption 02: no new rows inserted")
tdSub.consume() tdSub.consume()
...@@ -61,17 +64,17 @@ class TDTestCase: ...@@ -61,17 +64,17 @@ class TDTestCase:
tdSub.close(False) tdSub.close(False)
tdSub.init(self.conn.subscribe(False, topic, sqlstr, 0)) tdSub.init(self.conn.subscribe(False, topic, sqlstr, 0))
tdSub.consume() tdSub.consume()
tdSub.checkRows(51) tdSub.checkRows(totalRows + 1)
tdLog.info("consumption 06: keep progress and restart the subscription") tdLog.info("consumption 06: keep progress and restart the subscription")
tdSub.close(True) tdSub.close(True)
tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0)) tdSub.init(self.conn.subscribe(True, topic, sqlstr, 0))
tdSub.consume() tdSub.consume()
tdSub.checkRows(51) tdSub.checkRows(totalRows + 1)
tdLog.info("consumption 07: insert one row to two table then remove one table") tdLog.info("consumption 07: insert one row to two table then remove one table")
tdSql.execute("insert into t0 values (%d, 11, 11);" % (now + 11)) tdSql.execute("insert into t0 values (%d, 11, 11);" % (now + 11))
tdSql.execute("insert into t1 values (%d, 11, 11);" % (now + 11)) tdSql.execute("insert into t%d values (%d, 11, 11);" % ((numTables-1), (now + 11)))
tdSql.execute("drop table t0") tdSql.execute("drop table t0")
tdSub.consume() tdSub.consume()
tdSub.checkRows(1) tdSub.checkRows(1)
...@@ -80,7 +83,7 @@ class TDTestCase: ...@@ -80,7 +83,7 @@ class TDTestCase:
tdSub.close(False) tdSub.close(False)
tdSub.init(self.conn.subscribe(True, topic, sqlstr + " where ts > %d" % now, 0)) tdSub.init(self.conn.subscribe(True, topic, sqlstr + " where ts > %d" % now, 0))
tdSub.consume() tdSub.consume()
tdSub.checkRows(37) tdSub.checkRows((numTables-1) * (rowsPerTable-1) + 1)
tdLog.info("consumption 09: insert large timestamp to t2 then insert smaller timestamp to t1") tdLog.info("consumption 09: insert large timestamp to t2 then insert smaller timestamp to t1")
tdSql.execute("insert into t2 values (%d, 100, 100);" % (now + 100)) tdSql.execute("insert into t2 values (%d, 100, 100);" % (now + 100))
...@@ -101,10 +104,15 @@ class TDTestCase: ...@@ -101,10 +104,15 @@ class TDTestCase:
tdLog.info("consumption 11: two vnodes") tdLog.info("consumption 11: two vnodes")
tdSql.execute("insert into t2 values (%d, 102, 100);" % (now + 104)) tdSql.execute("insert into t2 values (%d, 102, 100);" % (now + 104))
tdSql.execute("insert into t9 values (%d, 102, 100);" % (now + 104)) tdSql.execute("insert into t1299 values (%d, 102, 100);" % (now + 104))
tdSub.consume() tdSub.consume()
tdSub.checkRows(2) tdSub.checkRows(2)
tdLog.info("consumption 12: create a new table")
tdSql.execute("insert into t%d using meters tags(%d, 'area%d') values (%d, 102, 100);" % (numTables, numTables, numTables, now + 105))
tdSub.consume()
tdSub.checkRows(1)
def stop(self): def stop(self):
tdSub.close(False) tdSub.close(False)
tdSql.close() tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册