提交 98487594 编写于 作者: S Shengliang Guan

Merge from master into develop

......@@ -607,6 +607,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/taosd' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >> ${taosd_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}"
......@@ -630,6 +631,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
......@@ -655,6 +657,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'PIDFile=/usr/local/nginxd/logs/nginx.pid' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/local/nginxd/sbin/nginx' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStop=/usr/local/nginxd/sbin/nginx -s stop' >> ${nginx_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${nginx_service_config}"
......
......@@ -205,6 +205,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
......
......@@ -205,6 +205,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
......
......@@ -577,6 +577,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${powerd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/powerd' >> ${powerd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/power/bin/startPre.sh' >> ${powerd_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${powerd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${powerd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${powerd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${powerd_service_config}"
......@@ -599,6 +600,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo '[Service]' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'Type=simple' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/tarbitrator' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${tarbitratord_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${tarbitratord_service_config}"
......@@ -624,6 +626,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'PIDFile=/usr/local/nginxd/logs/nginx.pid' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/local/nginxd/sbin/nginx' >> ${nginx_service_config}"
${csudo} bash -c "echo 'ExecStop=/usr/local/nginxd/sbin/nginx -s stop' >> ${nginx_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${nginx_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${nginx_service_config}"
......
......@@ -333,6 +333,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/taosd' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >> ${taosd_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}"
......
......@@ -405,6 +405,7 @@ function install_service_on_systemd() {
${csudo} bash -c "echo 'Type=simple' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStart=/usr/bin/taosd' >> ${taosd_service_config}"
${csudo} bash -c "echo 'ExecStartPre=/usr/local/taos/bin/startPre.sh' >> ${taosd_service_config}"
${csudo} bash -c "echo 'TimeoutStopSec=1000000s' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNOFILE=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitNPROC=infinity' >> ${taosd_service_config}"
${csudo} bash -c "echo 'LimitCORE=infinity' >> ${taosd_service_config}"
......
......@@ -2861,7 +2861,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tscDebug("0x%"PRIx64" sub:%p retrieve numOfRows:%d totalNumOfRows:%" PRIu64 " from ep:%s, orderOfSub:%d",
pParentSql->self, pSql, pRes->numOfRows, pState->numOfRetrievedRows, pSql->epSet.fqdn[pSql->epSet.inUse], idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0) && !(tscGetQueryInfoDetail(&pParentSql->cmd, 0)->distinctTag)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pParentSql->self, pSql->self, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
......
......@@ -87,7 +87,9 @@ tExprNode* exprTreeFromBinary(const void* data, size_t size);
tExprNode* exprTreeFromTableName(const char* tbnameCond);
tExprNode* exprdup(tExprNode* pTree);
bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
......
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
......@@ -122,6 +122,7 @@
<exclude>**/FailOverTest.java</exclude>
<exclude>**/InvalidResultSetPointerTest.java</exclude>
<exclude>**/RestfulConnectionTest.java</exclude>
<exclude>**/TD4144Test.java</exclude>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
......
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.TSDBConnection;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBResultSet;
import com.taosdata.jdbc.TSDBSubscribe;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class TD4144Test {
private static TSDBConnection connection;
private static final String host = "127.0.0.1";
private static final String topic = "topic-meter-current-bg-10";
private static final String sql = "select * from meters where current > 10";
private static final String sql2 = "select * from meters where ts >= '2020-08-15 12:20:00.000'";
@Test
public void test() throws SQLException {
TSDBSubscribe subscribe = null;
TSDBResultSet res = null;
boolean hasNext = false;
try {
subscribe = connection.subscribe(topic, sql, false);
int count = 0;
while (true) {
// 等待1秒,避免频繁调用 consume,给服务端造成压力
TimeUnit.SECONDS.sleep(1);
if (res == null) {
// 消费数据
res = subscribe.consume();
hasNext = res.next();
}
if (res == null) {
continue;
}
ResultSetMetaData metaData = res.getMetaData();
int number = 0;
while (hasNext) {
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
System.out.print(metaData.getColumnLabel(i) + ": " + res.getString(i) + "\t");
}
System.out.println();
count++;
number++;
hasNext = res.next();
if (!hasNext) {
res.close();
res = null;
System.out.println("rows: " + count);
}
if (hasNext == true && number >= 10) {
System.out.println("batch" + number);
break;
}
}
}
} catch (SQLException | InterruptedException throwables) {
throwables.printStackTrace();
} finally {
if (subscribe != null)
subscribe.close(true);
}
}
@BeforeClass
public static void beforeClass() throws SQLException {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
String url = "jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata";
connection = (DriverManager.getConnection(url, properties)).unwrap(TSDBConnection.class);
try (Statement stmt = connection.createStatement()) {
stmt.execute("drop database if exists power");
stmt.execute("create database if not exists power");
stmt.execute("use power");
stmt.execute("create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int)");
stmt.execute("create table d1001 using meters tags(\"Beijing.Chaoyang\", 2)");
stmt.execute("create table d1002 using meters tags(\"Beijing.Haidian\", 2)");
stmt.execute("insert into d1001 values(\"2020-08-15 12:00:00.000\", 12, 220, 1),(\"2020-08-15 12:10:00.000\", 12.3, 220, 2),(\"2020-08-15 12:20:00.000\", 12.2, 220, 1)");
stmt.execute("insert into d1002 values(\"2020-08-15 12:00:00.000\", 9.9, 220, 1),(\"2020-08-15 12:10:00.000\", 10.3, 220, 1),(\"2020-08-15 12:20:00.000\", 11.2, 220, 1)");
}
}
@AfterClass
public static void afterClass() throws SQLException {
if (connection != null)
connection.close();
}
}
from .connection import TDengineConnection
from .cursor import TDengineCursor
from .error import Error
# Globals
threadsafety = 0
paramstyle = 'pyformat'
__all__ = ['connection', 'cursor']
def connect(*args, **kwargs):
""" Function to return a TDengine connector object
Current supporting keyword parameters:
@dsn: Data source name as string
@user: Username as string(optional)
@password: Password as string(optional)
@host: Hostname(optional)
@database: Database name(optional)
@rtype: TDengineConnector
"""
return TDengineConnection(*args, **kwargs)
......@@ -592,14 +592,14 @@ void tSetDbName(SStrToken *pCpxName, SStrToken *pDb) {
void tSetColumnInfo(TAOS_FIELD *pField, SStrToken *pName, TAOS_FIELD *pType) {
int32_t maxLen = sizeof(pField->name) / sizeof(pField->name[0]);
// truncate the column name
if ((int32_t)pName->n >= maxLen) {
pName->n = maxLen - 1;
}
strncpy(pField->name, pName->z, pName->n);
pField->name[pName->n] = 0;
// column name is too long, set the it to be invalid.
if ((int32_t) pName->n >= maxLen) {
pName->n = -1;
} else {
strncpy(pField->name, pName->z, pName->n);
pField->name[pName->n] = 0;
}
pField->type = pType->type;
if(!isValidDataType(pField->type)){
......
......@@ -96,6 +96,7 @@ typedef struct tSkipListState {
} tSkipListState;
typedef struct SSkipList {
unsigned int seed;
__compar_fn_t comparFn;
__sl_key_fn_t keyFn;
pthread_rwlock_t *lock;
......
......@@ -50,6 +50,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
pSkipList->len = keyLen;
pSkipList->flags = flags;
pSkipList->keyFn = fn;
pSkipList->seed = rand();
if (comparFn == NULL) {
pSkipList->comparFn = getKeyComparFunc(keyType);
} else {
......@@ -545,7 +546,12 @@ static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) {
const uint32_t factor = 4;
int32_t n = 1;
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) {
#else
while ((rand_r(&(pSkipList->seed)) % factor) == 0 && n <= pSkipList->maxLevel) {
#endif
n++;
}
......
# Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645
from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess
from __future__ import annotations
import argparse
gConfig: argparse.Namespace
def init():
global gConfig
gConfig = []
\ No newline at end of file
from __future__ import annotations
import argparse
from typing import Optional
from .misc import CrashGenError
# from crash_gen.misc import CrashGenError
# gConfig: Optional[argparse.Namespace]
class Config:
_config = None # type Optional[argparse.Namespace]
@classmethod
def init(cls, parser: argparse.ArgumentParser):
if cls._config is not None:
raise CrashGenError("Config can only be initialized once")
cls._config = parser.parse_args()
# print(cls._config)
@classmethod
def setConfig(cls, config: argparse.Namespace):
cls._config = config
@classmethod
# TODO: check items instead of exposing everything
def getConfig(cls) -> argparse.Namespace:
if cls._config is None:
raise CrashGenError("invalid state")
return cls._config
@classmethod
def clearConfig(cls):
cls._config = None
@classmethod
def isSet(cls, cfgKey):
cfg = cls.getConfig()
if cfgKey not in cfg:
return False
return cfg.__getattribute__(cfgKey)
\ No newline at end of file
from __future__ import annotations
import sys
import os
import datetime
import time
import threading
import requests
from requests.auth import HTTPBasicAuth
import taos
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from .misc import Logging, CrashGenError, Helper, Dice
import os
import datetime
import traceback
# from .service_manager import TdeInstance
import crash_gen.settings
from .config import Config
from .misc import Logging, CrashGenError, Helper
from .types import QueryResult
class DbConn:
TYPE_NATIVE = "native-c"
......@@ -79,7 +81,7 @@ class DbConn:
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1:
raise taos.error.ProgrammingError(
raise CrashGenError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT)
)
......@@ -115,7 +117,7 @@ class DbConn:
try:
self.execute(sql)
return True # ignore num of results, return success
except taos.error.ProgrammingError as err:
except taos.error.Error as err:
return False # failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown
......@@ -126,7 +128,7 @@ class DbConn:
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self):
def getQueryResult(self) -> QueryResult :
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
......@@ -221,7 +223,7 @@ class DbConnRest(DbConn):
class MyTDSql:
# Class variables
_clsLock = threading.Lock() # class wide locking
longestQuery = None # type: str
longestQuery = '' # type: str
longestQueryTime = 0.0 # seconds
lqStartTime = 0.0
# lqEndTime = 0.0 # Not needed, as we have the two above already
......@@ -249,7 +251,13 @@ class MyTDSql:
def _execInternal(self, sql):
startTime = time.time()
# Logging.debug("Executing SQL: " + sql)
# ret = None # TODO: use strong type here
# try: # Let's not capture the error, and let taos.error.ProgrammingError pass through
ret = self._cursor.execute(sql)
# except taos.error.ProgrammingError as err:
# Logging.warning("Taos SQL execution error: {}, SQL: {}".format(err.msg, sql))
# raise CrashGenError(err.msg)
# print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime
# Record the query time
......@@ -261,7 +269,7 @@ class MyTDSql:
cls.lqStartTime = startTime
# Now write to the shadow database
if crash_gen.settings.gConfig.use_shadow_db:
if Config.isSet('use_shadow_db'):
if sql[:11] == "INSERT INTO":
if sql[:16] == "INSERT INTO db_0":
sql2 = "INSERT INTO db_s" + sql[16:]
......@@ -453,31 +461,11 @@ class DbManager():
''' Release the underlying DB connection upon deletion of DbManager '''
self.cleanUp()
def getDbConn(self):
def getDbConn(self) -> DbConn :
if self._dbConn is None:
raise CrashGenError("Unexpected empty DbConn")
return self._dbConn
# TODO: not used any more, to delete
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
# TODO: Not used any more, to delete
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
# Not used any more, to delete
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
# TODO: not used any more, delete
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
if (not tblNum): # maybe false
return False
return "table_{}".format(tblNum)
def cleanUp(self):
if self._dbConn:
self._dbConn.close()
......
......@@ -3,6 +3,7 @@ import random
import logging
import os
import sys
from typing import Optional
import taos
......@@ -39,14 +40,14 @@ class MyLoggingAdapter(logging.LoggerAdapter):
class Logging:
logger = None
logger = None # type: Optional[MyLoggingAdapter]
@classmethod
def getLogger(cls):
return logger
return cls.logger
@classmethod
def clsInit(cls, gConfig): # TODO: refactor away gConfig
def clsInit(cls, debugMode: bool):
if cls.logger:
return
......@@ -60,13 +61,9 @@ class Logging:
# Logging adapter, to be used as a logger
# print("setting logger variable")
# global logger
cls.logger = MyLoggingAdapter(_logger, [])
if (gConfig.debug):
cls.logger.setLevel(logging.DEBUG) # default seems to be INFO
else:
cls.logger.setLevel(logging.INFO)
cls.logger = MyLoggingAdapter(_logger, {})
cls.logger.setLevel(logging.DEBUG if debugMode else logging.INFO) # default seems to be INFO
@classmethod
def info(cls, msg):
cls.logger.info(msg)
......@@ -84,6 +81,7 @@ class Logging:
cls.logger.error(msg)
class Status:
STATUS_EMPTY = 99
STATUS_STARTING = 1
STATUS_RUNNING = 2
STATUS_STOPPING = 3
......@@ -95,12 +93,16 @@ class Status:
def __repr__(self):
return "[Status: v={}]".format(self._status)
def set(self, status):
def set(self, status: int):
self._status = status
def get(self):
return self._status
def isEmpty(self):
''' Empty/Undefined '''
return self._status == Status.STATUS_EMPTY
def isStarting(self):
return self._status == Status.STATUS_STARTING
......@@ -117,6 +119,9 @@ class Status:
def isStable(self):
return self.isRunning() or self.isStopped()
def isActive(self):
return self.isStarting() or self.isRunning() or self.isStopping()
# Deterministic random number generator
class Dice():
seeded = False # static, uninitialized
......
from typing import Any, List, Dict, NewType
from enum import Enum
DirPath = NewType('DirPath', str)
QueryResult = NewType('QueryResult', List[List[Any]])
class TdDataType(Enum):
'''
Use a Python Enum types of represent all the data types in TDengine.
Ref: https://www.taosdata.com/cn/documentation/taos-sql#data-type
'''
TIMESTAMP = 'TIMESTAMP'
INT = 'INT'
BIGINT = 'BIGINT'
FLOAT = 'FLOAT'
DOUBLE = 'DOUBLE'
BINARY = 'BINARY'
BINARY16 = 'BINARY(16)' # TODO: get rid of this hack
BINARY200 = 'BINARY(200)'
SMALLINT = 'SMALLINT'
TINYINT = 'TINYINT'
BOOL = 'BOOL'
NCHAR = 'NCHAR'
TdColumns = Dict[str, TdDataType]
TdTags = Dict[str, TdDataType]
#!/usr/bin/python3.8
from abc import abstractmethod
import time
from datetime import datetime
from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketsApi
from influxdb_client.client.write_api import SYNCHRONOUS
import argparse
import textwrap
import subprocess
import sys
import taos
from crash_gen.crash_gen_main import Database, TdSuperTable
from crash_gen.service_manager import TdeInstance
from crash_gen.shared.config import Config
from crash_gen.shared.db import DbConn
from crash_gen.shared.misc import Dice, Logging, Helper
from crash_gen.shared.types import TdDataType
# NUM_PROCESSES = 10
# NUM_REPS = 1000
tick = int(time.time() - 5000000.0) # for now we will create max 5M record
value = 101
DB_NAME = 'mydb'
TIME_SERIES_NAME = 'widget'
MAX_SHELF = 500 # shelf number runs up to this, non-inclusive
ITEMS_PER_SHELF = 5
BATCH_SIZE = 2000 # Number of data points per request
# None_RW:
# INFLUX_TOKEN='RRzVQZs8ERCpV9cS2RXqgtM_Y6FEZuJ7Tuk0aHtZItFTfcM9ajixtGDhW8HzqNIBmG3hmztw-P4sHOstfJvjFA=='
# DevOrg_RW:
# INFLUX_TOKEN='o1P8sEhBmXKhxBmNuiCyOUKv8d7qm5wUjMff9AbskBu2LcmNPQzU77NrAn5hDil8hZ0-y1AGWpzpL-4wqjFdkA=='
# DevOrg_All_Access
INFLUX_TOKEN='T2QTr4sloJhINH_oSrwSS-WIIZYjDfD123NK4ou3b7ajRs0c0IphCh3bNc0OsDZQRW1HyCby7opdEndVYFGTWQ=='
INFLUX_ORG="DevOrg"
INFLUX_BUCKET="Bucket01"
def writeTaosBatch(dbc, tblName):
# Database.setupLastTick()
global value, tick
data = []
for i in range(0, 100):
data.append("('{}', {})".format(Database.getNextTick(), value) )
value += 1
sql = "INSERT INTO {} VALUES {}".format(tblName, ''.join(data))
dbc.execute(sql)
class PerfGenError(taos.error.ProgrammingError):
pass
class Benchmark():
# @classmethod
# def create(cls, dbType):
# if dbType == 'taos':
# return TaosBenchmark()
# elif dbType == 'influx':
# return InfluxBenchmark()
# else:
# raise RuntimeError("Unknown DB type: {}".format(dbType))
def __init__(self, dbType, loopCount = 0):
self._dbType = dbType
self._setLoopCount(loopCount)
def _setLoopCount(self, loopCount):
cfgLoopCount = Config.getConfig().loop_count
if loopCount == 0: # use config
self._loopCount = cfgLoopCount
else:
if cfgLoopCount :
Logging.warning("Ignoring loop count for fixed-loop-count benchmarks: {}".format(cfgLoopCount))
self._loopCount = loopCount
@abstractmethod
def doIterate(self):
'''
Execute the benchmark directly, without invoking sub processes,
effectively using one execution thread.
'''
pass
@abstractmethod
def prepare(self):
'''
Preparation needed to run a certain benchmark
'''
pass
@abstractmethod
def execute(self):
'''
Actually execute the benchmark
'''
Logging.warning("Unexpected execution")
@property
def name(self):
return self.__class__.__name__
def run(self):
print("Running benchmark: {}, class={} ...".format(self.name, self.__class__))
startTime = time.time()
# Prepare to execute the benchmark
self.prepare()
# Actually execute the benchmark
self.execute()
# if Config.getConfig().iterate_directly: # execute directly
# Logging.debug("Iterating...")
# self.doIterate()
# else:
# Logging.debug("Executing via sub process...")
# startTime = time.time()
# self.prepare()
# self.spawnProcesses()
# self.waitForProcecess()
# duration = time.time() - startTime
# Logging.info("Benchmark execution completed in {:.3f} seconds".format(duration))
Logging.info("Benchmark {} finished in {:.3f} seconds".format(
self.name, time.time()-startTime))
def spawnProcesses(self):
self._subProcs = []
for j in range(0, Config.getConfig().subprocess_count):
ON_POSIX = 'posix' in sys.builtin_module_names
tblName = 'cars_reg_{}'.format(j)
cmdLineStr = './perf_gen.sh -t {} -i -n {} -l {}'.format(
self._dbType,
tblName,
Config.getConfig().loop_count
)
if Config.getConfig().debug:
cmdLineStr += ' -d'
subProc = subprocess.Popen(cmdLineStr,
shell = True,
close_fds = ON_POSIX)
self._subProcs.append(subProc)
def waitForProcecess(self):
for sp in self._subProcs:
sp.wait(300)
class TaosBenchmark(Benchmark):
def __init__(self, loopCount):
super().__init__('taos', loopCount)
# self._dbType = 'taos'
tInst = TdeInstance()
self._dbc = DbConn.createNative(tInst.getDbTarget())
self._dbc.open()
self._sTable = TdSuperTable(TIME_SERIES_NAME + '_s', DB_NAME)
def doIterate(self):
tblName = Config.getConfig().target_table_name
print("Benchmarking TAOS database (1 pass) for: {}".format(tblName))
self._dbc.execute("USE {}".format(DB_NAME))
self._sTable.ensureRegTable(None, self._dbc, tblName)
try:
lCount = Config.getConfig().loop_count
print("({})".format(lCount))
for i in range(0, lCount):
writeTaosBatch(self._dbc, tblName)
except taos.error.ProgrammingError as err:
Logging.error("Failed to write batch")
def prepare(self):
self._dbc.execute("CREATE DATABASE IF NOT EXISTS {}".format(DB_NAME))
self._dbc.execute("USE {}".format(DB_NAME))
# Create the super table
self._sTable.drop(self._dbc, True)
self._sTable.create(self._dbc,
{'ts': TdDataType.TIMESTAMP,
'temperature': TdDataType.INT,
'pressure': TdDataType.INT,
'notes': TdDataType.BINARY200
},
{'rack': TdDataType.INT,
'shelf': TdDataType.INT,
'barcode': TdDataType.BINARY16
})
def execSql(self, sql):
try:
self._dbc.execute(sql)
except taos.error.ProgrammingError as err:
Logging.warning("SQL Error: 0x{:X}, {}, SQL: {}".format(
Helper.convertErrno(err.errno), err.msg, sql))
raise
def executeWrite(self):
# Sample: INSERT INTO t1 USING st TAGS(1) VALUES(now, 1) t2 USING st TAGS(2) VALUES(now, 2)
sqlPrefix = "INSERT INTO "
dataTemplate = "{} USING {} TAGS({},{},'barcode_{}') VALUES('{}',{},{},'{}') "
stName = self._sTable.getName()
BATCH_SIZE = 2000 # number of items per request batch
ITEMS_PER_SHELF = 5
# rackSize = 10 # shelves per rack
# shelfSize = 100 # items per shelf
batchCount = self._loopCount // BATCH_SIZE
lastRack = 0
for i in range(batchCount):
sql = sqlPrefix
for j in range(BATCH_SIZE):
n = i*BATCH_SIZE + j # serial number
# values first
# rtName = 'rt_' + str(n) # table name contains serial number, has info
temperature = 20 + (n % 10)
pressure = 70 + (n % 10)
# tags
shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF # shelf number
rack = n // (ITEMS_PER_SHELF * MAX_SHELF) # rack number
barcode = rack + shelf
# table name
tableName = "reg_" + str(rack) + '_' + str(shelf)
# now the SQL
sql += dataTemplate.format(tableName, stName,# table name
rack, shelf, barcode, # tags
Database.getNextTick(), temperature, pressure, 'xxx') # values
lastRack = rack
self.execSql(sql)
Logging.info("Last Rack: {}".format(lastRack))
class TaosWriteBenchmark(TaosBenchmark):
def execute(self):
self.executeWrite()
class Taos100kWriteBenchmark(TaosWriteBenchmark):
def __init__(self):
super().__init__(100*1000)
class Taos10kWriteBenchmark(TaosWriteBenchmark):
def __init__(self):
super().__init__(10*1000)
class Taos1mWriteBenchmark(TaosWriteBenchmark):
def __init__(self):
super().__init__(1000*1000)
class Taos5mWriteBenchmark(TaosWriteBenchmark):
def __init__(self):
super().__init__(5*1000*1000)
class Taos1kQueryBenchmark(TaosBenchmark):
def __init__(self):
super().__init__(1000)
class Taos1MCreationBenchmark(TaosBenchmark):
def __init__(self):
super().__init__(1000000)
class InfluxBenchmark(Benchmark):
def __init__(self, loopCount):
super().__init__('influx', loopCount)
# self._dbType = 'influx'
# self._client = InfluxDBClient(host='localhost', port=8086)
# def _writeBatch(self, tblName):
# global value, tick
# data = []
# for i in range(0, 100):
# line = "{},device={} value={} {}".format(
# TIME_SERIES_NAME,
# tblName,
# value,
# tick*1000000000)
# # print(line)
# data.append(line)
# value += 1
# tick +=1
# self._client.write(data, {'db':DB_NAME}, protocol='line')
def executeWrite(self):
global tick # influx tick #TODO refactor
lineTemplate = TIME_SERIES_NAME + ",rack={},shelf={},barcode='barcode_{}' temperature={},pressure={} {}"
batchCount = self._loopCount // BATCH_SIZE
for i in range(batchCount):
lineBatch = []
for j in range(BATCH_SIZE):
n = i*BATCH_SIZE + j # serial number
# values first
# rtName = 'rt_' + str(n) # table name contains serial number, has info
temperature = 20 + (n % 10)
pressure = 70 + (n % 10)
# tags
shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF # shelf number
rack = n // (ITEMS_PER_SHELF * MAX_SHELF) # rack number
barcode = rack + shelf
# now the SQL
line = lineTemplate.format(
rack, shelf, barcode, # tags
temperature, pressure, # values
tick * 1000000000 )
tick += 1
lineBatch.append(line)
write_api = self._client.write_api(write_options=SYNCHRONOUS)
write_api.write(INFLUX_BUCKET, INFLUX_ORG, lineBatch)
# self._client.write(lineBatch, {'db':DB_NAME}, protocol='line')
# def doIterate(self):
# tblName = Config.getConfig().target_table_name
# print("Benchmarking INFLUX database (1 pass) for: {}".format(tblName))
# for i in range(0, Config.getConfig().loop_count):
# self._writeBatch(tblName)
def _getOrgIdByName(self, orgName):
"""Find org by name.
"""
orgApi = self._client.organizations_api()
orgs = orgApi.find_organizations()
for org in orgs:
if org.name == orgName:
return org.id
raise PerfGenError("Org not found with name: {}".format(orgName))
def _fetchAuth(self):
authApi = self._client.authorizations_api()
auths = authApi.find_authorizations()
for auth in auths:
if auth.token == INFLUX_TOKEN :
return auth
raise PerfGenError("No proper auth found")
def _verifyPermissions(self, perms: list):
if list:
return #OK
raise PerfGenError("No permission found")
def prepare(self):
self._client = InfluxDBClient(
url="http://127.0.0.1:8086",
token=INFLUX_TOKEN,
org=INFLUX_ORG)
auth = self._fetchAuth()
self._verifyPermissions(auth.permissions)
bktApi = self._client.buckets_api()
# Delete
bkt = bktApi.find_bucket_by_name(INFLUX_BUCKET)
if bkt:
bktApi.delete_bucket(bkt)
# Recreate
orgId = self._getOrgIdByName(INFLUX_ORG)
bktApi.create_bucket(bucket=None, bucket_name=INFLUX_BUCKET, org_id=orgId)
# self._client.drop_database(DB_NAME)
# self._client.create_database(DB_NAME)
# self._client.switch_database(DB_NAME)
class InfluxWriteBenchmark(InfluxBenchmark):
def execute(self):
return self.executeWrite()
class Influx10kWriteBenchmark(InfluxWriteBenchmark):
def __init__(self):
super().__init__(10*1000)
class Influx100kWriteBenchmark(InfluxWriteBenchmark):
def __init__(self):
super().__init__(100*1000)
class Influx1mWriteBenchmark(InfluxWriteBenchmark):
def __init__(self):
super().__init__(1000*1000)
class Influx5mWriteBenchmark(InfluxWriteBenchmark):
def __init__(self):
super().__init__(5*1000*1000)
def _buildCmdLineParser():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent('''\
TDengine Performance Benchmarking Tool
---------------------------------------------------------------------
'''))
parser.add_argument(
'-b',
'--benchmark-name',
action='store',
default='Taos1kQuery',
type=str,
help='Benchmark to use (default: Taos1kQuery)')
parser.add_argument(
'-d',
'--debug',
action='store_true',
help='Turn on DEBUG mode for more logging (default: false)')
parser.add_argument(
'-i',
'--iterate-directly',
action='store_true',
help='Execution operations directly without sub-process (default: false)')
parser.add_argument(
'-l',
'--loop-count',
action='store',
default=1000,
type=int,
help='Number of loops to perform, 100 operations per loop. (default: 1000)')
parser.add_argument(
'-n',
'--target-table-name',
action='store',
default=None,
type=str,
help='Regular table name in target DB (default: None)')
parser.add_argument(
'-s',
'--subprocess-count',
action='store',
default=4,
type=int,
help='Number of sub processes to spawn. (default: 10)')
parser.add_argument(
'-t',
'--target-database',
action='store',
default='taos',
type=str,
help='Benchmark target: taos, influx (default: taos)')
return parser
def main():
parser = _buildCmdLineParser()
Config.init(parser)
Logging.clsInit(Config.getConfig().debug)
Dice.seed(0) # initial seeding of dice
bName = Config.getConfig().benchmark_name
bClassName = bName + 'Benchmark'
x = globals()
if bClassName in globals():
bClass = globals()[bClassName]
bm = bClass() # Benchmark object
bm.run()
else:
raise PerfGenError("No such benchmark: {}".format(bName))
# bm = Benchmark.create(Config.getConfig().target_database)
# bm.run()
if __name__ == "__main__":
main()
#!/bin/bash
# This is the script for us to try to cause the TDengine server or client to crash
#
# PREPARATION
#
# 1. Build an compile the TDengine source code that comes with this script, in the same directory tree
# 2. Please follow the direction in our README.md, and build TDengine in the build/ directory
# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg
# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg
# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above
# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils
#
# RUNNING THIS SCRIPT
#
# This script assumes the source code directory is intact, and that the binaries has been built in the
# build/ directory, as such, will will load the Python libraries in the directory tree, and also load
# the TDengine client shared library (so) file, in the build/directory, as evidenced in the env
# variables below.
#
# Running the script is simple, no parameter is needed (for now, but will change in the future).
#
# Happy Crashing...
# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory
EXEC_DIR=`dirname "$0"`
if [[ $EXEC_DIR != "." ]]
then
echo "ERROR: Please execute `basename "$0"` in its own directory (for now anyway, pardon the dust)"
exit -1
fi
CURR_DIR=`pwd`
IN_TDINTERNAL="community"
if [[ "$CURR_DIR" == *"$IN_TDINTERNAL"* ]]; then
TAOS_DIR=$CURR_DIR/../../..
TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6,7|rev`/lib
else
TAOS_DIR=$CURR_DIR/../..
TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6|rev`/lib
fi
# Now getting ready to execute Python
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
PYTHON_EXEC=python3.8
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)
# Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
# Now we are all let, and let's see if we can find a crash. Note we pass all params
PERF_GEN_EXEC=perf_gen.py
$PYTHON_EXEC $PERF_GEN_EXEC $@
......@@ -88,10 +88,9 @@ class TDTestCase:
# TSIM:
# TSIM: print =============== step4
tdLog.info('=============== step4')
# TSIM: sql create table $tb (ts timestamp,
# a0123456789012345678901234567890123456789 int)
# TSIM: sql create table $tb (ts timestamp, a0123456789012345678901234567890123456789 int)
getMaxColNum = "grep -w '#define TSDB_COL_NAME_LEN' ../../src/inc/taosdef.h|awk '{print $3}'"
boundary = int(subprocess.check_output(getMaxColNum, shell=True))
boundary = int(subprocess.check_output(getMaxColNum, shell=True)) - 1
tdLog.info("get max column name length is %d" % boundary)
chars = string.ascii_uppercase + string.ascii_lowercase
......
......@@ -22,7 +22,7 @@ sql_error alter table mt1 change tag a 1
sql_error create table mtx1 (ts timestamp, c1 int) tags (123 int)
sql create table mt2 (ts timestamp, c1 int) tags (abc012345678901234567890123456789012345678901234567890123456789def int)
sql_error create table mt2 (ts timestamp, c1 int) tags (abc012345678901234567890123456789012345678901234567890123456789def int)
sql create table mt3 (ts timestamp, c1 int) tags (abc012345678901234567890123456789012345678901234567890123456789 int)
sql_error alter table mt3 change tag abc012345678901234567890123456789012345678901234567890123456789 abcdefg012345678901234567890123456789012345678901234567890123456789
sql alter table mt3 change tag abc012345678901234567890123456789012345678901234567890123456789 abcdefg0123456789012345678901234567890123456789
......
......@@ -114,7 +114,11 @@ sql_error create table $tb (ts timestamp, $tag int)
sql_error create table $tb (ts timestamp, $tags int)
sql_error create table $tb (ts timestamp, $sint int)
sql_error create table $tb (ts timestamp, $tint int)
sql_error create table $tb (ts timestamp, $nchar int)
sql_error create table $tb (ts timestamp, $nchar int)
# too long column name
sql_error create table $tb (ts timestamp, abcde_123456789_123456789_123456789_123456789_123456789_123456789 int)
sql_error create table tx(ts timestamp, k int) tags(abcd5_123456789_123456789_123456789_123456789_123456789_123456789 int)
print illegal_column_names test passed
# case5: chinese_char_in_table_support
......
......@@ -119,4 +119,8 @@ if $rows != 4 then
return -1
endi
print ================>td-4147
sql_error create table tx(ts timestamp, a1234_0123456789_0123456789_0123456789_0123456789_0123456789_0123456789 int)
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册