提交 3eaeb140 编写于 作者: D dapan1121

Merge branch 'hotfix/TD-3223' into feature/TD-2577

...@@ -4,7 +4,7 @@ PROJECT(TDengine) ...@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "2.0.16.0") SET(TD_VER_NUMBER "2.0.17.0")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
...@@ -166,7 +166,7 @@ TDengine 分布式架构的逻辑结构图如下: ...@@ -166,7 +166,7 @@ TDengine 分布式架构的逻辑结构图如下:
**虚拟节点(vnode)**: 为更好的支持数据分片、负载均衡,防止数据过热或倾斜,数据节点被虚拟化成多个虚拟节点(vnode,图中V2, V3, V4等)。每个 vnode 都是一个相对独立的工作单元,是时序数据存储的基本单元,具有独立的运行线程、内存空间与持久化存储的路径。一个 vnode 包含一定数量的表(数据采集点)。当创建一张新表时,系统会检查是否需要创建新的 vnode。一个数据节点上能创建的 vnode 的数量取决于该数据节点所在物理节点的硬件资源。一个 vnode 只属于一个DB,但一个DB可以有多个 vnode。一个 vnode 除存储的时序数据外,也保存有所包含的表的schema、标签值等。一个虚拟节点由所属的数据节点的EP,以及所属的VGroup ID在系统内唯一标识,由管理节点创建并管理。 **虚拟节点(vnode)**: 为更好的支持数据分片、负载均衡,防止数据过热或倾斜,数据节点被虚拟化成多个虚拟节点(vnode,图中V2, V3, V4等)。每个 vnode 都是一个相对独立的工作单元,是时序数据存储的基本单元,具有独立的运行线程、内存空间与持久化存储的路径。一个 vnode 包含一定数量的表(数据采集点)。当创建一张新表时,系统会检查是否需要创建新的 vnode。一个数据节点上能创建的 vnode 的数量取决于该数据节点所在物理节点的硬件资源。一个 vnode 只属于一个DB,但一个DB可以有多个 vnode。一个 vnode 除存储的时序数据外,也保存有所包含的表的schema、标签值等。一个虚拟节点由所属的数据节点的EP,以及所属的VGroup ID在系统内唯一标识,由管理节点创建并管理。
**管理节点(mnode):** 一个虚拟的逻辑单元,负责所有数据节点运行状态的监控和维护,以及节点之间的负载均衡(图中M)。同时,管理节点也负责元数据(包括用户、数据库、表、静态标签等)的存储和管理,因此也称为 Meta Node。TDengine 集群中可配置多个(最多不超过5个) mnode,它们自动构建成为一个虚拟管理节点组(图中M0, M1, M2)。mnode 间采用 master/slave 的机制进行管理,而且采取强一致方式进行数据同步, 任何数据更新操作只能在 Master 上进行。mnode 集群的创建由系统自动完成,无需人工干预。每个dnode上至多有一个mnode,由所属的数据节点的EP来唯一标识。每个dnode通过内部消息交互自动获取整个集群中所有 mnode 所在的 dnode 的EP。 **管理节点(mnode):** 一个虚拟的逻辑单元,负责所有数据节点运行状态的监控和维护,以及节点之间的负载均衡(图中M)。同时,管理节点也负责元数据(包括用户、数据库、表、静态标签等)的存储和管理,因此也称为 Meta Node。TDengine 集群中可配置多个(开源版最多不超过3个) mnode,它们自动构建成为一个虚拟管理节点组(图中M0, M1, M2)。mnode 间采用 master/slave 的机制进行管理,而且采取强一致方式进行数据同步, 任何数据更新操作只能在 Master 上进行。mnode 集群的创建由系统自动完成,无需人工干预。每个dnode上至多有一个mnode,由所属的数据节点的EP来唯一标识。每个dnode通过内部消息交互自动获取整个集群中所有 mnode 所在的 dnode 的EP。
**虚拟节点组(VGroup):** 不同数据节点上的 vnode 可以组成一个虚拟节点组(vnode group)来保证系统的高可靠。虚拟节点组内采取master/slave的方式进行管理。写操作只能在 master vnode 上进行,系统采用异步复制的方式将数据同步到 slave vnode,这样确保了一份数据在多个物理节点上有拷贝。一个 vgroup 里虚拟节点个数就是数据的副本数。如果一个DB的副本数为N,系统必须有至少N个数据节点。副本数在创建DB时通过参数 replica 可以指定,缺省为1。使用 TDengine 的多副本特性,可以不再需要昂贵的磁盘阵列等存储设备,就可以获得同样的数据高可靠性。虚拟节点组由管理节点创建、管理,并且由管理节点分配一个系统唯一的ID,VGroup ID。如果两个虚拟节点的vnode group ID相同,说明他们属于同一个组,数据互为备份。虚拟节点组里虚拟节点的个数是可以动态改变的,容许只有一个,也就是没有数据复制。VGroup ID是永远不变的,即使一个虚拟节点组被删除,它的ID也不会被收回重复利用。 **虚拟节点组(VGroup):** 不同数据节点上的 vnode 可以组成一个虚拟节点组(vnode group)来保证系统的高可靠。虚拟节点组内采取master/slave的方式进行管理。写操作只能在 master vnode 上进行,系统采用异步复制的方式将数据同步到 slave vnode,这样确保了一份数据在多个物理节点上有拷贝。一个 vgroup 里虚拟节点个数就是数据的副本数。如果一个DB的副本数为N,系统必须有至少N个数据节点。副本数在创建DB时通过参数 replica 可以指定,缺省为1。使用 TDengine 的多副本特性,可以不再需要昂贵的磁盘阵列等存储设备,就可以获得同样的数据高可靠性。虚拟节点组由管理节点创建、管理,并且由管理节点分配一个系统唯一的ID,VGroup ID。如果两个虚拟节点的vnode group ID相同,说明他们属于同一个组,数据互为备份。虚拟节点组里虚拟节点的个数是可以动态改变的,容许只有一个,也就是没有数据复制。VGroup ID是永远不变的,即使一个虚拟节点组被删除,它的ID也不会被收回重复利用。
......
...@@ -225,7 +225,13 @@ SHOW MNODES; ...@@ -225,7 +225,13 @@ SHOW MNODES;
## <a class="anchor" id="arbitrator"></a>Arbitrator的使用 ## <a class="anchor" id="arbitrator"></a>Arbitrator的使用
如果副本数为偶数,当一个vnode group里一半vnode不工作时,是无法从中选出master的。同理,一半mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了Arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含Arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到Arbitrator,那么节点B就能正常工作。 如果副本数为偶数,当一个 vnode group 里一半 vnode 不工作时,是无法从中选出 master 的。同理,一半 mnode 不工作时,是无法选出 mnode 的 master 的,因为存在“split brain”问题。为解决这个问题,TDengine 引入了 Arbitrator 的概念。Arbitrator 模拟一个 vnode 或 mnode 在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含 Arbitrator 在内,超过半数的 vnode 或 mnode 工作,那么该 vnode group 或 mnode 组就可以正常的提供数据插入或查询服务。比如对于副本数为 2 的情形,如果一个节点 A 离线,但另外一个节点 B 正常,而且能连接到 Arbitrator,那么节点 B 就能正常工作。
TDengine提供一个执行程序,名为 tarbitrator,找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为Arbitrator的End Point。如果该参数配置了,当副本数为偶数时,系统将自动连接配置的Arbitrator。如果副本数为奇数,即使配置了Arbitrator,系统也不会去建立连接。 总之,在目前版本下,TDengine 建议在双副本环境要配置 Arbitrator,以提升系统的可用性。
Arbitrator 的执行程序名为 tarbitrator。该程序对系统资源几乎没有要求,只需要保证有网络连接,找任何一台 Linux 服务器运行它即可。以下简要描述安装配置的步骤:
1. 请点击 [安装包下载](https://www.taosdata.com/cn/all-downloads/),在 TDengine Arbitrator Linux 一节中,选择合适的版本下载并安装。
2. 该应用的命令行参数 `-p` 可以指定其对外服务的端口号,缺省是 6042。
3. 修改每个 taosd 实例的配置文件,在 taos.cfg 里将参数 arbitrator 设置为 tarbitrator 程序所对应的 End Point。(如果该参数配置了,当副本数为偶数时,系统将自动连接配置的 Arbitrator。如果副本数为奇数,即使配置了 Arbitrator,系统也不会去建立连接。)
4. 在配置文件中配置了的 Arbitrator,会出现在 `SHOW DNODES;` 指令的返回结果中,对应的 role 列的值会是“arb”。
...@@ -12,9 +12,13 @@ RUN tar -zxf ${pkgFile} ...@@ -12,9 +12,13 @@ RUN tar -zxf ${pkgFile}
WORKDIR /root/${dirName}/ WORKDIR /root/${dirName}/
RUN /bin/bash install.sh -e no RUN /bin/bash install.sh -e no
RUN apt-get clean && apt-get update && apt-get install -y locales
RUN locale-gen en_US.UTF-8
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib"
ENV LANG=C.UTF-8 ENV LC_CTYPE=en_US.UTF-8
ENV LC_ALL=C.UTF-8 ENV LANG=en_US.UTF-8
ENV LC_ALL=en_US.UTF-8
EXPOSE 6030 6031 6032 6033 6034 6035 6036 6037 6038 6039 6040 6041 6042 EXPOSE 6030 6031 6032 6033 6034 6035 6036 6037 6038 6039 6040 6041 6042
CMD ["taosd"] CMD ["taosd"]
VOLUME [ "/var/lib/taos", "/var/log/taos","/etc/taos/" ] VOLUME [ "/var/lib/taos", "/var/log/taos","/etc/taos/" ]
...@@ -36,10 +36,10 @@ done ...@@ -36,10 +36,10 @@ done
echo "verNumber=${verNumber}" echo "verNumber=${verNumber}"
#docker manifest create -a tdengine/tdengine:${verNumber} tdengine/tdengine-amd64:${verNumber} tdengine/tdengine-aarch64:${verNumber} tdengine/tdengine-aarch32:${verNumber} #docker manifest create -a tdengine/tdengine:${verNumber} tdengine/tdengine-amd64:${verNumber} tdengine/tdengine-aarch64:${verNumber} tdengine/tdengine-aarch32:${verNumber}
docker manifest create -a tdengine/tdengine tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest
docker login -u tdengine -p ${passWord} #replace the docker registry username and password docker login -u tdengine -p ${passWord} #replace the docker registry username and password
docker manifest push tdengine/tdengine docker manifest push tdengine/tdengine:latest
# how set latest version ??? # how set latest version ???
name: tdengine name: tdengine
base: core18 base: core18
version: '2.0.16.0' version: '2.0.17.0'
icon: snap/gui/t-dengine.svg icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT. summary: an open-source big data platform designed and optimized for IoT.
description: | description: |
...@@ -72,7 +72,7 @@ parts: ...@@ -72,7 +72,7 @@ parts:
- usr/bin/taosd - usr/bin/taosd
- usr/bin/taos - usr/bin/taos
- usr/bin/taosdemo - usr/bin/taosdemo
- usr/lib/libtaos.so.2.0.16.0 - usr/lib/libtaos.so.2.0.17.0
- usr/lib/libtaos.so.1 - usr/lib/libtaos.so.1
- usr/lib/libtaos.so - usr/lib/libtaos.so
......
...@@ -1880,6 +1880,13 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S ...@@ -1880,6 +1880,13 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S
} }
} }
if (p && taosArrayGetSize(p) > 0) {
SResPair *l = taosArrayGetLast(p);
if (l->key == key && key == INT64_MIN) {
continue;
}
}
//append a new column //append a new column
if (p == NULL) { if (p == NULL) {
SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair)),}; SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair)),};
......
...@@ -57,10 +57,8 @@ public abstract class AbstractDriver implements Driver { ...@@ -57,10 +57,8 @@ public abstract class AbstractDriver implements Driver {
} }
protected void loadTaosConfig(Properties info) { protected void loadTaosConfig(Properties info) {
if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null || if ((info.getProperty(TSDBDriver.PROPERTY_KEY_HOST) == null || info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && (
info.getProperty(TSDBDriver.PROPERTY_KEY_HOST).isEmpty()) && ( info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null || info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT) == null ||
info.getProperty(TSDBDriver.PROPERTY_KEY_PORT).isEmpty())) {
File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR)); File cfgDir = loadConfigDir(info.getProperty(TSDBDriver.PROPERTY_KEY_CONFIG_DIR));
File cfgFile = cfgDir.listFiles((dir, name) -> TAOS_CFG_FILENAME.equalsIgnoreCase(name))[0]; File cfgFile = cfgDir.listFiles((dir, name) -> TAOS_CFG_FILENAME.equalsIgnoreCase(name))[0];
List<String> endpoints = loadConfigEndpoints(cfgFile); List<String> endpoints = loadConfigEndpoints(cfgFile);
......
package com.taosdata.jdbc.cases;
import com.taosdata.jdbc.utils.TimestampUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.*;
public class DatetimeBefore1970Test {
private static Connection conn;
@Test
public void test() {
try (Statement stmt = conn.createStatement()) {
stmt.executeUpdate("insert into weather(ts) values('1969-12-31 23:59:59.999')");
stmt.executeUpdate("insert into weather(ts) values('1970-01-01 00:00:00.000')");
stmt.executeUpdate("insert into weather(ts) values('1970-01-01 08:00:00.000')");
stmt.executeUpdate("insert into weather(ts) values('1970-01-01 07:59:59.999')");
ResultSet rs = stmt.executeQuery("select * from weather");
while (rs.next()) {
Timestamp ts = rs.getTimestamp("ts");
System.out.println("long: " + ts.getTime() + ", string: " + TimestampUtil.longToDatetime(ts.getTime()));
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.println("timestamp: " + Long.MAX_VALUE + ", string: " + TimestampUtil.longToDatetime(Long.MAX_VALUE));
System.out.println("timestamp: " + Long.MIN_VALUE + ", string: " + TimestampUtil.longToDatetime(Long.MIN_VALUE));
System.out.println("timestamp: " + 0 + ", string: " + TimestampUtil.longToDatetime(0));
System.out.println("timestamp: " + -1 + ", string: " + TimestampUtil.longToDatetime(-1));
String datetime = "1970-01-01 00:00:00.000";
System.out.println("timestamp: " + TimestampUtil.datetimeToLong(datetime) + ", string: " + datetime);
datetime = "1969-12-31 23:59:59.999";
System.out.println("timestamp: " + TimestampUtil.datetimeToLong(datetime) + ", string: " + datetime);
}
@BeforeClass
public static void beforeClass() {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
conn = DriverManager.getConnection("jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists test_timestamp");
stmt.execute("create database if not exists test_timestamp keep 36500");
stmt.execute("use test_timestamp");
stmt.execute("create table weather(ts timestamp,f1 float)");
stmt.close();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
}
}
@AfterClass
public static void afterClass() {
try {
if (conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
...@@ -4,7 +4,7 @@ import java.text.ParseException; ...@@ -4,7 +4,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
public class TimeStampUtil { public class TimestampUtil {
private static final String datetimeFormat = "yyyy-MM-dd HH:mm:ss.SSS"; private static final String datetimeFormat = "yyyy-MM-dd HH:mm:ss.SSS";
......
...@@ -198,6 +198,14 @@ void dnodeCleanupVnodes() { ...@@ -198,6 +198,14 @@ void dnodeCleanupVnodes() {
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code)); dError("status rsp is received, error:%s", tstrerror(pMsg->code));
if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST) {
char clusterId[TSDB_CLUSTER_ID_LEN];
dnodeGetClusterId(clusterId);
if (clusterId[0] != '\0') {
dError("exit zombie dropped dnode");
exit(EXIT_FAILURE);
}
}
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return; return;
} }
......
此差异已折叠。
...@@ -35,16 +35,19 @@ import os ...@@ -35,16 +35,19 @@ import os
import signal import signal
import traceback import traceback
import resource import resource
from guppy import hpy # from guppy import hpy
import gc import gc
from crash_gen.service_manager import ServiceManager, TdeInstance from crash_gen.service_manager import ServiceManager, TdeInstance
from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
import crash_gen.settings
import taos import taos
import requests import requests
crash_gen.settings.init()
# Require Python 3 # Require Python 3
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
raise Exception("Must be using Python 3") raise Exception("Must be using Python 3")
...@@ -259,6 +262,7 @@ class ThreadCoordinator: ...@@ -259,6 +262,7 @@ class ThreadCoordinator:
self._execStats = ExecutionStats() self._execStats = ExecutionStats()
self._runStatus = Status.STATUS_RUNNING self._runStatus = Status.STATUS_RUNNING
self._initDbs() self._initDbs()
self._stepStartTime = None # Track how long it takes to execute each step
def getTaskExecutor(self): def getTaskExecutor(self):
return self._te return self._te
...@@ -394,6 +398,10 @@ class ThreadCoordinator: ...@@ -394,6 +398,10 @@ class ThreadCoordinator:
try: try:
self._syncAtBarrier() # For now just cross the barrier self._syncAtBarrier() # For now just cross the barrier
Progress.emit(Progress.END_THREAD_STEP) Progress.emit(Progress.END_THREAD_STEP)
if self._stepStartTime :
stepExecTime = time.time() - self._stepStartTime
Progress.emitStr('{:.3f}s/{}'.format(stepExecTime, DbConnNative.totalRequests))
DbConnNative.resetTotalRequests() # reset to zero
except threading.BrokenBarrierError as err: except threading.BrokenBarrierError as err:
self._execStats.registerFailure("Aborted due to worker thread timeout") self._execStats.registerFailure("Aborted due to worker thread timeout")
Logging.error("\n") Logging.error("\n")
...@@ -433,6 +441,7 @@ class ThreadCoordinator: ...@@ -433,6 +441,7 @@ class ThreadCoordinator:
# Then we move on to the next step # Then we move on to the next step
Progress.emit(Progress.BEGIN_THREAD_STEP) Progress.emit(Progress.BEGIN_THREAD_STEP)
self._stepStartTime = time.time()
self._releaseAllWorkerThreads(transitionFailed) self._releaseAllWorkerThreads(transitionFailed)
if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate" if hasAbortedTask or transitionFailed : # abnormal ending, workers waiting at "gate"
...@@ -691,7 +700,7 @@ class AnyState: ...@@ -691,7 +700,7 @@ class AnyState:
def canDropDb(self): def canDropDb(self):
# If user requests to run up to a number of DBs, # If user requests to run up to a number of DBs,
# we'd then not do drop_db operations any more # we'd then not do drop_db operations any more
if gConfig.max_dbs > 0 : if gConfig.max_dbs > 0 or gConfig.use_shadow_db :
return False return False
return self._info[self.CAN_DROP_DB] return self._info[self.CAN_DROP_DB]
...@@ -699,6 +708,8 @@ class AnyState: ...@@ -699,6 +708,8 @@ class AnyState:
return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE] return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
def canDropFixedSuperTable(self): def canDropFixedSuperTable(self):
if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
return False
return self._info[self.CAN_DROP_FIXED_SUPER_TABLE] return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
def canAddData(self): def canAddData(self):
...@@ -1037,7 +1048,7 @@ class Database: ...@@ -1037,7 +1048,7 @@ class Database:
_clsLock = threading.Lock() # class wide lock _clsLock = threading.Lock() # class wide lock
_lastInt = 101 # next one is initial integer _lastInt = 101 # next one is initial integer
_lastTick = 0 _lastTick = 0
_lastLaggingTick = 0 # lagging tick, for unsequenced insersions _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions
def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
self._dbNum = dbNum # we assign a number to databases, for our testing purpose self._dbNum = dbNum # we assign a number to databases, for our testing purpose
...@@ -1093,21 +1104,24 @@ class Database: ...@@ -1093,21 +1104,24 @@ class Database:
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp( t4 = datetime.datetime.fromtimestamp(
t3.timestamp() + elSec2) # see explanation above t3.timestamp() + elSec2) # see explanation above
Logging.debug("Setting up TICKS to start from: {}".format(t4)) Logging.info("Setting up TICKS to start from: {}".format(t4))
return t4 return t4
@classmethod @classmethod
def getNextTick(cls): def getNextTick(cls):
'''
Fetch a timestamp tick, with some random factor, may not be unique.
'''
with cls._clsLock: # prevent duplicate tick with cls._clsLock: # prevent duplicate tick
if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized
# 10k at 1/20 chance, should be enough to avoid overlaps # 10k at 1/20 chance, should be enough to avoid overlaps
tick = cls.setupLastTick() tick = cls.setupLastTick()
cls._lastTick = tick cls._lastTick = tick
cls._lastLaggingTick = tick + datetime.timedelta(0, -10000) cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
# if : # should be quite a bit into the future # if : # should be quite a bit into the future
if Dice.throw(20) == 0: # 1 in 20 chance, return lagging tick if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
cls._lastLaggingTick += datetime.timedelta(0, 1) # Go back in time 100 seconds cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
return cls._lastLaggingTick return cls._lastLaggingTick
else: # regular else: # regular
# add one second to it # add one second to it
...@@ -1334,7 +1348,8 @@ class Task(): ...@@ -1334,7 +1348,8 @@ class Task():
elif self._isErrAcceptable(errno2, err.__str__()): elif self._isErrAcceptable(errno2, err.__str__()):
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format( self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
errno2, err, wt.getDbConn().getLastSql())) errno2, err, wt.getDbConn().getLastSql()))
print("_", end="", flush=True) # print("_", end="", flush=True)
Progress.emit(Progress.ACCEPTABLE_ERROR)
self._err = err self._err = err
else: # not an acceptable error else: # not an acceptable error
errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format( errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
...@@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask): ...@@ -1563,8 +1578,11 @@ class TaskCreateDb(StateTransitionTask):
# numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
numReplica = gConfig.max_replicas # fixed, always numReplica = gConfig.max_replicas # fixed, always
repStr = "replica {}".format(numReplica) repStr = "replica {}".format(numReplica)
self.execWtSql(wt, "create database {} {}" updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
.format(self._db.getName(), repStr) ) dbName = self._db.getName()
self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
if dbName == "db_0" and gConfig.use_shadow_db:
self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
class TaskDropDb(StateTransitionTask): class TaskDropDb(StateTransitionTask):
@classmethod @classmethod
...@@ -1774,13 +1792,13 @@ class TdSuperTable: ...@@ -1774,13 +1792,13 @@ class TdSuperTable:
]) # TODO: add more from 'top' ]) # TODO: add more from 'top'
if aggExpr not in ['stddev(speed)']: #TODO: STDDEV not valid for super tables?! # if aggExpr not in ['stddev(speed)']: # STDDEV not valid for super tables?! (Done in TD-1049)
sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName()) sql = "select {} from {}.{}".format(aggExpr, self._dbName, self.getName())
if Dice.throw(3) == 0: # 1 in X chance if Dice.throw(3) == 0: # 1 in X chance
sql = sql + ' GROUP BY color' sql = sql + ' GROUP BY color'
Progress.emit(Progress.QUERY_GROUP_BY) Progress.emit(Progress.QUERY_GROUP_BY)
# Logging.info("Executing GROUP-BY query: " + sql) # Logging.info("Executing GROUP-BY query: " + sql)
ret.append(SqlQuery(sql)) ret.append(SqlQuery(sql))
return ret return ret
...@@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask): ...@@ -1988,7 +2006,7 @@ class TaskAddData(StateTransitionTask):
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
fullTableName = db.getName() + '.' + regTableName fullTableName = db.getName() + '.' + regTableName
sql = "insert into {} values ".format(fullTableName) sql = "INSERT INTO {} VALUES ".format(fullTableName)
for j in range(numRecords): # number of records per table for j in range(numRecords): # number of records per table
nextInt = db.getNextInt() nextInt = db.getNextInt()
nextTick = db.getNextTick() nextTick = db.getNextTick()
...@@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask): ...@@ -2016,12 +2034,24 @@ class TaskAddData(StateTransitionTask):
# print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
try: try:
sql = "insert into {} values ('{}', {}, '{}');".format( # removed: tags ('{}', {}) sql = "INSERT INTO {} VALUES ('{}', {}, '{}');".format( # removed: tags ('{}', {})
fullTableName, fullTableName,
# ds.getFixedSuperTableName(), # ds.getFixedSuperTableName(),
# ds.getNextBinary(), ds.getNextFloat(), # ds.getNextBinary(), ds.getNextFloat(),
nextTick, nextInt, nextColor) nextTick, nextInt, nextColor)
dbc.execute(sql) dbc.execute(sql)
# Quick hack, attach an update statement here. TODO: create an "update" task
if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
nextInt = db.getNextInt()
nextColor = db.getNextColor()
sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
fullTableName,
nextTick, nextInt, nextColor)
# sql = "UPDATE {} set speed={}, color='{}' WHERE ts='{}'".format(
# fullTableName, db.getNextInt(), db.getNextColor(), nextTick)
dbc.execute(sql)
except: # Any exception at all except: # Any exception at all
if gConfig.verify_data: if gConfig.verify_data:
self.unlockTable(fullTableName) self.unlockTable(fullTableName)
...@@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask): ...@@ -2070,7 +2100,8 @@ class TaskAddData(StateTransitionTask):
random.shuffle(tblSeq) # now we have random sequence random.shuffle(tblSeq) # now we have random sequence
for i in tblSeq: for i in tblSeq:
if (i in self.activeTable): # wow already active if (i in self.activeTable): # wow already active
print("x", end="", flush=True) # concurrent insertion # print("x", end="", flush=True) # concurrent insertion
Progress.emit(Progress.CONCURRENT_INSERTION)
else: else:
self.activeTable.add(i) # marking it active self.activeTable.add(i) # marking it active
...@@ -2373,6 +2404,11 @@ class MainExec: ...@@ -2373,6 +2404,11 @@ class MainExec:
'--larger-data', '--larger-data',
action='store_true', action='store_true',
help='Write larger amount of data during write operations (default: false)') help='Write larger amount of data during write operations (default: false)')
parser.add_argument(
'-m',
'--mix-oos-data',
action='store_false',
help='Mix out-of-sequence data into the test data stream (default: true)')
parser.add_argument( parser.add_argument(
'-n', '-n',
'--dynamic-db-table-names', '--dynamic-db-table-names',
...@@ -2414,6 +2450,11 @@ class MainExec: ...@@ -2414,6 +2450,11 @@ class MainExec:
'--verify-data', '--verify-data',
action='store_true', action='store_true',
help='Verify data written in a number of places by reading back (default: false)') help='Verify data written in a number of places by reading back (default: false)')
parser.add_argument(
'-w',
'--use-shadow-db',
action='store_true',
help='Use a shaddow database to verify data integrity (default: false)')
parser.add_argument( parser.add_argument(
'-x', '-x',
'--continue-on-exception', '--continue-on-exception',
...@@ -2422,6 +2463,11 @@ class MainExec: ...@@ -2422,6 +2463,11 @@ class MainExec:
global gConfig global gConfig
gConfig = parser.parse_args() gConfig = parser.parse_args()
crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var
# Sanity check for arguments
if gConfig.use_shadow_db and gConfig.max_dbs>1 :
raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")
Logging.clsInit(gConfig) Logging.clsInit(gConfig)
......
...@@ -18,6 +18,8 @@ import datetime ...@@ -18,6 +18,8 @@ import datetime
import traceback import traceback
# from .service_manager import TdeInstance # from .service_manager import TdeInstance
import crash_gen.settings
class DbConn: class DbConn:
TYPE_NATIVE = "native-c" TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api" TYPE_REST = "rest-api"
...@@ -244,7 +246,7 @@ class MyTDSql: ...@@ -244,7 +246,7 @@ class MyTDSql:
self._conn.close() # TODO: very important, cursor close does NOT close DB connection! self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
self._cursor.close() self._cursor.close()
def _execInternal(self, sql): def _execInternal(self, sql):
startTime = time.time() startTime = time.time()
# Logging.debug("Executing SQL: " + sql) # Logging.debug("Executing SQL: " + sql)
ret = self._cursor.execute(sql) ret = self._cursor.execute(sql)
...@@ -257,6 +259,27 @@ class MyTDSql: ...@@ -257,6 +259,27 @@ class MyTDSql:
cls.longestQuery = sql cls.longestQuery = sql
cls.longestQueryTime = queryTime cls.longestQueryTime = queryTime
cls.lqStartTime = startTime cls.lqStartTime = startTime
# Now write to the shadow database
if crash_gen.settings.gConfig.use_shadow_db:
if sql[:11] == "INSERT INTO":
if sql[:16] == "INSERT INTO db_0":
sql2 = "INSERT INTO db_s" + sql[16:]
self._cursor.execute(sql2)
else:
raise CrashGenError("Did not find db_0 in INSERT statement: {}".format(sql))
else: # not an insert statement
pass
if sql[:12] == "CREATE TABLE":
if sql[:17] == "CREATE TABLE db_0":
sql2 = sql.replace('db_0', 'db_s')
self._cursor.execute(sql2)
else:
raise CrashGenError("Did not find db_0 in CREATE TABLE statement: {}".format(sql))
else: # not an insert statement
pass
return ret return ret
def query(self, sql): def query(self, sql):
...@@ -302,12 +325,18 @@ class DbConnNative(DbConn): ...@@ -302,12 +325,18 @@ class DbConnNative(DbConn):
_lock = threading.Lock() _lock = threading.Lock()
# _connInfoDisplayed = False # TODO: find another way to display this # _connInfoDisplayed = False # TODO: find another way to display this
totalConnections = 0 # Not private totalConnections = 0 # Not private
totalRequests = 0
def __init__(self, dbTarget): def __init__(self, dbTarget):
super().__init__(dbTarget) super().__init__(dbTarget)
self._type = self.TYPE_NATIVE self._type = self.TYPE_NATIVE
self._conn = None self._conn = None
# self._cursor = None # self._cursor = None
@classmethod
def resetTotalRequests(cls):
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
cls.totalRequests = 0
def openByType(self): # Open connection def openByType(self): # Open connection
# global gContainer # global gContainer
...@@ -356,6 +385,8 @@ class DbConnNative(DbConn): ...@@ -356,6 +385,8 @@ class DbConnNative(DbConn):
Logging.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.execute(sql) nRows = self._tdSql.execute(sql)
cls = self.__class__
cls.totalRequests += 1
Logging.debug( Logging.debug(
"[SQL] Execution Result, nRows = {}, SQL = {}".format( "[SQL] Execution Result, nRows = {}, SQL = {}".format(
nRows, sql)) nRows, sql))
...@@ -369,6 +400,8 @@ class DbConnNative(DbConn): ...@@ -369,6 +400,8 @@ class DbConnNative(DbConn):
Logging.debug("[SQL] Executing SQL: {}".format(sql)) Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql self._lastSql = sql
nRows = self._tdSql.query(sql) nRows = self._tdSql.query(sql)
cls = self.__class__
cls.totalRequests += 1
Logging.debug( Logging.debug(
"[SQL] Query Result, nRows = {}, SQL = {}".format( "[SQL] Query Result, nRows = {}, SQL = {}".format(
nRows, sql)) nRows, sql))
......
...@@ -176,11 +176,13 @@ class Progress: ...@@ -176,11 +176,13 @@ class Progress:
SERVICE_START_NAP = 7 SERVICE_START_NAP = 7
CREATE_TABLE_ATTEMPT = 8 CREATE_TABLE_ATTEMPT = 8
QUERY_GROUP_BY = 9 QUERY_GROUP_BY = 9
CONCURRENT_INSERTION = 10
ACCEPTABLE_ERROR = 11
tokens = { tokens = {
STEP_BOUNDARY: '.', STEP_BOUNDARY: '.',
BEGIN_THREAD_STEP: '[', BEGIN_THREAD_STEP: ' [',
END_THREAD_STEP: '] ', END_THREAD_STEP: ']',
SERVICE_HEART_BEAT: '.Y.', SERVICE_HEART_BEAT: '.Y.',
SERVICE_RECONNECT_START: '<r.', SERVICE_RECONNECT_START: '<r.',
SERVICE_RECONNECT_SUCCESS: '.r>', SERVICE_RECONNECT_SUCCESS: '.r>',
...@@ -188,8 +190,14 @@ class Progress: ...@@ -188,8 +190,14 @@ class Progress:
SERVICE_START_NAP: '_zz', SERVICE_START_NAP: '_zz',
CREATE_TABLE_ATTEMPT: 'c', CREATE_TABLE_ATTEMPT: 'c',
QUERY_GROUP_BY: 'g', QUERY_GROUP_BY: 'g',
CONCURRENT_INSERTION: 'x',
ACCEPTABLE_ERROR: '_',
} }
@classmethod @classmethod
def emit(cls, token): def emit(cls, token):
print(cls.tokens[token], end="", flush=True) print(cls.tokens[token], end="", flush=True)
@classmethod
def emitStr(cls, str):
print('({})'.format(str), end="", flush=True)
from __future__ import annotations
import argparse
gConfig: argparse.Namespace
def init():
global gConfig
gConfig = []
\ No newline at end of file
...@@ -610,6 +610,11 @@ print =================>TD-2665 ...@@ -610,6 +610,11 @@ print =================>TD-2665
sql_error create table txx as select avg(c) as t from st; sql_error create table txx as select avg(c) as t from st;
sql_error create table txx1 as select avg(c) as t from t1; sql_error create table txx1 as select avg(c) as t from t1;
sql select stddev(c),stddev(c) from st group by c;
if $rows != 4 then
return -1
endi
print =================>TD-2236 print =================>TD-2236
sql select first(ts),last(ts) from t1 group by c; sql select first(ts),last(ts) from t1 group by c;
if $rows != 4 then if $rows != 4 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册