Commit 4ec789fb authored by Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/TD-18437

@@ -81,7 +81,7 @@ ENDIF ()
IF (TD_WINDOWS)
    MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
-   SET(COMMON_FLAGS "/w /D_WIN32 /DWIN32 /Zi")
+   SET(COMMON_FLAGS "/w /D_WIN32 /DWIN32 /Zi /MTd")
    SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
    # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
    #   SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
@@ -92,6 +92,12 @@ IF (TD_WINDOWS)
    IF (CMAKE_DEPFILE_FLAGS_CXX)
        SET(CMAKE_DEPFILE_FLAGS_CXX "")
    ENDIF ()
    IF (CMAKE_C_FLAGS_DEBUG)
        SET(CMAKE_C_FLAGS_DEBUG "" CACHE STRING "" FORCE)
    ENDIF ()
    IF (CMAKE_CXX_FLAGS_DEBUG)
        SET(CMAKE_CXX_FLAGS_DEBUG "" CACHE STRING "" FORCE)
    ENDIF ()
    SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMMON_FLAGS}")
    SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}")
......
@@ -273,7 +273,7 @@ endif(${BUILD_WITH_NURAFT})
# pthread
if(${BUILD_PTHREAD})
-   set(CMAKE_BUILD_TYPE release)
+   set(CMAKE_BUILD_TYPE debug)
    add_definitions(-DPTW32_STATIC_LIB)
    add_subdirectory(pthread EXCLUDE_FROM_ALL)
    set_target_properties(libpthreadVC3 PROPERTIES OUTPUT_NAME pthread)
......
import taos
-from taos.tmq import *
-
-conn = taos.connect()
-
-# create database
-conn.execute("drop database if exists py_tmq")
-conn.execute("create database if not exists py_tmq vgroups 2")
-
-# create table and stables
-conn.select_db("py_tmq")
-conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
-conn.execute("create table if not exists tb1 using stb1 tags(1)")
-conn.execute("create table if not exists tb2 using stb1 tags(2)")
-conn.execute("create table if not exists tb3 using stb1 tags(3)")
-
-# create topic
-conn.execute("drop topic if exists topic_ctb_column")
-conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
-
-# set consumer configure options
-conf = TaosTmqConf()
-conf.set("group.id", "tg2")
-conf.set("td.connect.user", "root")
-conf.set("td.connect.pass", "taosdata")
-conf.set("enable.auto.commit", "true")
-conf.set("msg.with.table.name", "true")
-
-def tmq_commit_cb_print(tmq, resp, offset, param=None):
-    print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
-
-conf.set_auto_commit_cb(tmq_commit_cb_print, None)
-
-# build consumer
-tmq = conf.new_consumer()
-
-# build topic list
-topic_list = TaosTmqList()
-topic_list.append("topic_ctb_column")
-
-# subscribe consumer
-tmq.subscribe(topic_list)
-
-# check subscriptions
-sub_list = tmq.subscription()
-print("subscribed topics: ", sub_list)
-
-# start subscribe
-while 1:
-    res = tmq.poll(1000)
-    if res:
-        topic = res.get_topic_name()
-        vg = res.get_vgroup_id()
-        db = res.get_db_name()
-        print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
-        for row in res:
-            print(row)
-        tb = res.get_table_name()
-        print(f"from table: {tb}")
+from taos.tmq import TaosConsumer
+
+consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
+for msg in consumer:
+    for row in msg:
+        print(row)
@@ -5,44 +5,23 @@ title: Get Started with the Install Packages

import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PkgListV3 from "/components/PkgListV3";

The full TDengine package includes the server (taosd), taosAdapter for integrating with third-party systems and providing a RESTful interface, the application driver (taosc), the command-line program (CLI, taos), and several tools. At present the server taosd and taosAdapter can only be installed and run on Linux; Windows, macOS, and other platforms will be supported later. The application driver taosc and the TDengine CLI can be installed and run on Windows or Linux. Besides connectors for multiple languages, TDengine also provides a [RESTful interface](../../reference/rest-api/) through [taosAdapter](../../reference/taosadapter/).

On Linux, the TDengine open-source edition provides install packages in deb and rpm format; choose whichever suits your environment. The deb package supports Debian/Ubuntu and derivatives, and the rpm package supports CentOS/RHEL/SUSE and derivatives. A tar.gz package is also provided for enterprise users, and installation from the online repository via `apt-get` is supported as well. TDengine also provides an installer for Windows x64. You can also [try TDengine out in Docker right away](../../get-started/docker/). If you would like to contribute code to TDengine or are interested in its internals, please see the [TDengine GitHub repository](https://github.com/taosdata/TDengine) to build and install from source.

For convenience, the standard server package bundles taos, taosd, taosAdapter, taosdump, taosBenchmark, the TDinsight installation script, and sample code; if you only need the server program and C/C++ client support, you can download the lite package instead.

We provide tar.gz, rpm, and deb package formats; the tar.gz package is offered to enterprise customers for use on specific operating systems. Note that the rpm and deb packages do not include taosdump or the TDinsight installation script; these are obtained by installing the taosTools package.

## Installation

<Tabs>
-<TabItem value="apt-get" label="apt-get">
-
-TDengine can be installed from the official repository with apt-get.
-
-**Configure the package repository**
-
-```bash
-wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
-echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
-```
-
-To install a beta release, configure this repository instead:
-
-```bash
-wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
-echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
-```
-
-**Install with apt-get**
-
-```bash
-sudo apt-get update
-apt-cache policy tdengine
-sudo apt-get install tdengine
-```
-
-:::tip
-The apt-get method only works on Debian and Ubuntu systems.
-:::
-</TabItem>
<TabItem label="Deb Install" value="debinst">

-1. Download the deb package from the [release history page](../../releases), for example TDengine-server-3.0.0.0-Linux-x64.deb;
+1. Download the deb package from the list, for example TDengine-server-3.0.0.0-Linux-x64.deb;
+<PkgListV3 type={6}/>
2. Go to the directory containing TDengine-server-3.0.0.0-Linux-x64.deb and run the following install command:

```bash
@@ -53,7 +32,8 @@ sudo dpkg -i TDengine-server-3.0.0.0-Linux-x64.deb

<TabItem label="RPM Install" value="rpminst">

-1. Download the rpm package from the [release history page](../../releases), for example TDengine-server-3.0.0.0-Linux-x64.rpm;
+1. Download the rpm package from the list, for example TDengine-server-3.0.0.0-Linux-x64.rpm;
+<PkgListV3 type={5}/>
2. Go to the directory containing TDengine-server-3.0.0.0-Linux-x64.rpm and run the following install command:

```bash
@@ -64,7 +44,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm

<TabItem label="tar.gz Install" value="tarinst">

-1. Download the tar.gz package from the [release history page](../../releases), for example TDengine-server-3.0.0.0-Linux-x64.tar.gz;
+1. Download the tar.gz package from the list, for example TDengine-server-3.0.0.0-Linux-x64.tar.gz;
+<PkgListV3 type={0}/>
2. Go to the directory containing TDengine-server-3.0.0.0-Linux-x64.tar.gz, extract it, enter the subdirectory, and run the install.sh script inside:

```bash
@@ -79,19 +60,52 @@ sudo ./install.sh

:::info
While running, the install.sh script asks for some configuration through an interactive command-line interface. For a non-interactive installation, run install.sh with the -e no option. Run `./install.sh -h` for detailed descriptions of all options.
:::

</TabItem>

<TabItem label="Windows Install" value="windows">

-1. Download the exe installer from the [release history page](../../releases), for example TDengine-server-3.0.0.0-Windows-x64.exe;
+1. Download the exe installer from the list, for example TDengine-server-3.0.0.0-Windows-x64.exe;
+<PkgListV3 type={3}/>
2. Run TDengine-server-3.0.0.0-Windows-x64.exe to install TDengine.

</TabItem>
<TabItem value="apt-get" label="apt-get">

TDengine can be installed from the official repository with apt-get.

**Configure the package repository**

```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
```

To install a beta release, configure this repository instead:

```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
```

**Install with apt-get**

```bash
sudo apt-get update
apt-cache policy tdengine
sudo apt-get install tdengine
```

:::tip
The apt-get method only works on Debian and Ubuntu systems.
:::
</TabItem>
</Tabs>

:::info
To download other components, the latest beta, or earlier releases, visit the [release history page](../../releases).
:::

:::note
When installing the first node, no input is needed at the Enter FQDN: prompt. Only when installing the second or a later node do you need to enter the FQDN of any live node in the existing cluster, so that the new node can join it. You can also leave it empty and instead configure it in the new node's configuration file before the node starts.
......
@@ -88,6 +88,96 @@ void close() throws SQLException;
```
</TabItem>
<TabItem value="Python" label="Python">
```python
class TaosConsumer():
    def __init__(self, *topics, **configs)
    def __iter__(self)
    def __next__(self)
    def sync_next(self)
    def subscription(self)
    def unsubscribe(self)
    def close(self)
    def __del__(self)
```
</TabItem>
<TabItem label="Go" value="Go">
```go
func NewConsumer(conf *Config) (*Consumer, error)
func (c *Consumer) Close() error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Unsubscribe() error
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
virtual IConsumer Build()
Consumer(ConsumerBuilder builder)
void Subscribe(IEnumerable<string> topics)
void Subscribe(string topic)
ConsumeResult Consume(int millisecondsTimeout)
List<string> Subscription()
void Unsubscribe()
void Commit(ConsumeResult consumerResult)
void Close()
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```node
function TMQConsumer(config)
function subscribe(topic)
function consume(timeout)
function subscription()
function unsubscribe()
function commit(msg)
function close()
```
</TabItem>
</Tabs>
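To tie the Python interface above together, here is a minimal sketch (an editorial addition, not part of the original listing) of a full consumer lifecycle, assuming the `TaosConsumer` class shown above: construct it with topics and configs, inspect the subscription, iterate for messages, then shut down.

```python
from taos.tmq import TaosConsumer

# Sketch of a consumer lifecycle using only the methods listed above.
consumer = TaosConsumer('topic_ctb_column', group_id='tg2')
print("subscribed topics:", consumer.subscription())  # verify the active subscription
try:
    for msg in consumer:      # iterating the consumer polls messages
        for row in msg:
            print(row)
finally:
    consumer.unsubscribe()    # cancel the subscription
    consumer.close()          # release the connection
```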
## Writing Data
@@ -230,6 +320,128 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```
</TabItem>
<TabItem value="Python" label="Python">
Python uses the following configuration options to create a Consumer instance.

| Parameter | Type | Description | Remarks |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | Used to create the connection; same as `taos_connect` | |
| `td_connect_user` | string | Used to create the connection; same as `taos_connect` | |
| `td_connect_pass` | string | Used to create the connection; same as `taos_connect` | |
| `td_connect_port` | string | Used to create the connection; same as `taos_connect` | |
| `group_id` | string | Consumer group ID; consumers in the same group share consumption progress | **Required**. Maximum length: 192. |
| `client_id` | string | Client ID | Maximum length: 192. |
| `auto_offset_reset` | string | Initial position of the consumer group subscription | Options: `earliest`, `latest`, `none` (default) |
| `enable_auto_commit` | string | Enable automatic offset commits | Valid values: `true`, `false`. |
| `auto_commit_interval_ms` | string | Auto-commit interval in milliseconds | |
| `enable_heartbeat_background` | string | Enable background heartbeats; when enabled, the consumer stays online even if it does not poll for a long time | Valid values: `true`, `false` |
| `experimental_snapshot_enable` | string | Whether to consume from the WAL or from TSDB data | Valid values: `true`, `false` |
| `msg_with_table_name` | string | Whether table names may be parsed from the messages | Valid values: `true`, `false` |
| `timeout` | int | Poll timeout for the consumer | |
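As a minimal sketch (an editorial addition), the options in this table are passed to `TaosConsumer` as keyword arguments alongside one or more topic names, matching the `__init__(self, *topics, **configs)` signature listed earlier:

```python
from taos.tmq import TaosConsumer

# Minimal sketch; keyword names follow the table above.
consumer = TaosConsumer(
    'topic_ctb_column',            # one or more topics to subscribe to
    group_id='tg2',                # required; consumers in a group share progress
    td_connect_user='root',
    td_connect_pass='taosdata',
    enable_auto_commit='true',
    msg_with_table_name='true',
)
```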
</TabItem>
<TabItem label="Go" value="Go">
```go
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
	panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
	panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
	panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
	panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
	panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
	panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
	panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
	panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
	if result.ErrCode != 0 {
		errStr := wrapper.TMQErr2Str(result.ErrCode)
		err := errors.NewError(int(result.ErrCode), errStr)
		panic(err)
	}
})
if err != nil {
	panic(err)
}
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
using TDengineTMQ;
// Set the consumer group (GourpId), auto commit (EnableAutoCommit),
// the auto-commit interval (AutoCommitIntervalMs), user name (TDConnectUser), password (TDConnectPasswd), etc. as needed
var cfg = new ConsumerConfig
{
    EnableAutoCommit = "true",
    AutoCommitIntervalMs = "1000",
    GourpId = "TDengine-TMQ-C#",
    TDConnectUser = "root",
    TDConnectPasswd = "taosdata",
    AutoOffsetReset = "earliest",
    MsgWithTableName = "true",
    TDConnectIp = "127.0.0.1",
    TDConnectPort = "6030"
};
var consumer = new ConsumerBuilder(cfg).Build();
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```node
// Set the consumer group (group.id), auto commit (enable.auto.commit),
// the auto-commit interval (auto.commit.interval.ms), user name (td.connect.user), password (td.connect.pass), etc. as needed
let consumer = taos.consumer({
  'enable.auto.commit': 'true',
  'auto.commit.interval.ms': '1000',
  'group.id': 'tg2',
  'td.connect.user': 'root',
  'td.connect.pass': 'taosdata',
  'auto.offset.reset': 'earliest',
  'msg.with.table.name': 'true',
  'td.connect.ip': '127.0.0.1',
  'td.connect.port': '6030'
});
```
</TabItem>
</Tabs>
The configuration above includes the consumer group ID. If multiple consumers specify the same consumer group ID, they automatically form a consumer group and share consumption progress.
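For example (an illustrative Python sketch, an editorial addition), two consumers created with the same group ID belong to one consumer group and share consumption progress, while a consumer with a different group ID consumes independently:

```python
from taos.tmq import TaosConsumer

# c1 and c2 specify the same group ID, so they form one consumer group
# and share consumption progress on the topic.
c1 = TaosConsumer('topic_ctb_column', group_id='tg2')
c2 = TaosConsumer('topic_ctb_column', group_id='tg2')

# c3 uses its own group ID, so it consumes the topic independently.
c3 = TaosConsumer('topic_ctb_column', group_id='tg3')
```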
@@ -260,6 +472,50 @@ topics.add("tmq_topic");
consumer.subscribe(topics);
```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer, err := tmq.NewConsumer(config)
if err != nil {
	panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
if err != nil {
	panic(err)
}
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
// Create the list of topics to subscribe to
List<string> topics = new List<string>();
topics.Add("tmq_topic");
// Start the subscription
consumer.Subscribe(topics);
```
</TabItem>
<TabItem value="Python" label="Python">
```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```node
// Create the list of topics to subscribe to
let topics = ['topic_test']
// Start the subscription
consumer.subscribe(topics);
```
</TabItem>
</Tabs>
@@ -294,6 +550,60 @@ while(running){
```
</TabItem>
<TabItem value="Python" label="Python">
```python
for msg in consumer:
    for row in msg:
        print(row)
```
</TabItem>
<TabItem value="Go" label="Go">
```go
for {
	result, err := consumer.Poll(time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
	consumer.Commit(context.Background(), result.Message)
	consumer.FreeMessage(result.Message)
}
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
// Consume data
while (true)
{
    var consumerRes = consumer.Consume(100);
    // process ConsumeResult
    ProcessMsg(consumerRes);
    consumer.Commit(consumerRes);
}
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```node
while (true) {
  msg = consumer.consume(200);
  // process message(consumeResult)
  console.log(msg.topicPartition);
  console.log(msg.block);
  console.log(msg.fields)
}
```
</TabItem>
</Tabs>

## Ending Consumption
@@ -323,6 +633,45 @@ consumer.close();
```
</TabItem>
<TabItem value="Python" label="Python">
```python
# Unsubscribe
consumer.unsubscribe()
# Close the consumer
consumer.close()
```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer.Close()
```
</TabItem>
<TabItem value="C#" label="C#">
```csharp
// Unsubscribe
consumer.Unsubscribe();
// Close the consumer
consumer.Close();
```
</TabItem>
<TabItem label="Node.JS" value="Node.JS">
```node
consumer.unsubscribe();
consumer.close();
```
</TabItem>
</Tabs>

## Deleting a *topic*
@@ -671,64 +1020,14 @@ int main(int argc, char* argv[]) {
```python
import taos
-from taos.tmq import *
-
-conn = taos.connect()
-
-# create database
-conn.execute("drop database if exists py_tmq")
-conn.execute("create database if not exists py_tmq vgroups 2")
-
-# create table and stables
-conn.select_db("py_tmq")
-conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
-conn.execute("create table if not exists tb1 using stb1 tags(1)")
-conn.execute("create table if not exists tb2 using stb1 tags(2)")
-conn.execute("create table if not exists tb3 using stb1 tags(3)")
-
-# create topic
-conn.execute("drop topic if exists topic_ctb_column")
-conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
-
-# set consumer configure options
-conf = TaosTmqConf()
-conf.set("group.id", "tg2")
-conf.set("td.connect.user", "root")
-conf.set("td.connect.pass", "taosdata")
-conf.set("enable.auto.commit", "true")
-conf.set("msg.with.table.name", "true")
-
-def tmq_commit_cb_print(tmq, resp, offset, param=None):
-    print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
-
-conf.set_auto_commit_cb(tmq_commit_cb_print, None)
-
-# build consumer
-tmq = conf.new_consumer()
-
-# build topic list
-topic_list = TaosTmqList()
-topic_list.append("topic_ctb_column")
-
-# subscribe consumer
-tmq.subscribe(topic_list)
-
-# check subscriptions
-sub_list = tmq.subscription()
-print("subscribed topics: ", sub_list)
-
-# start subscribe
-while 1:
-    res = tmq.poll(1000)
-    if res:
-        topic = res.get_topic_name()
-        vg = res.get_vgroup_id()
-        db = res.get_db_name()
-        print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
-        for row in res:
-            print(row)
-        tb = res.get_table_name()
-        print(f"from table: {tb}")
+from taos.tmq import TaosConsumer
+
+consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
+for msg in consumer:
+    for row in msg:
+        print(row)
```
......
@@ -6,7 +6,7 @@ description: "Description of syntax changes in TDengine 3.0"

## Changes to Basic SQL Elements

-| # | **Element** | <div style={{width: 100}}>**Change**</div> | **Description** |
+| # | **Element** | **<div style={{width: 60}}>Change</div>** | **Description** |
| - | :------- | :-------- | :------- |
| 1 | VARCHAR | Added | Alias for the BINARY type. |
| 2 | TIMESTAMP literal | Added | Added support for the TIMESTAMP 'timestamp format' syntax. |
@@ -22,7 +22,7 @@ description: "Description of syntax changes in TDengine 3.0"

The following data types can be used in the data model of regular tables in TDengine.

-| # | **Statement** | **<div style={{width: 100}}>Change</div>** | **Description** |
+| # | **Statement** | **<div style={{width: 60}}>Change</div>** | **Description** |
| - | :------- | :-------- | :------- |
| 1 | ALTER ACCOUNT | Removed | An enterprise edition feature in 2.x that is no longer supported in 3.0. The syntax is temporarily retained; executing it reports a "This statement is no longer supported" error. |
| 2 | ALTER ALL DNODES | Added | Modifies the parameters of all dnodes. |
@@ -80,7 +80,7 @@ description: "Description of syntax changes in TDengine 3.0"

## Changes to SQL Functions

-| # | **Function** | ** <div style={{width: 100}}>Change</div> ** | **Description** |
+| # | **Function** | ** <div style={{width: 60}}>Change</div> ** | **Description** |
| - | :------- | :-------- | :------- |
| 1 | TWA | Enhanced | Can now be used directly on supertables. Without PARTITION BY, the supertable's data is merged into a single timeline. |
| 2 | IRATE | Enhanced | Can now be used directly on supertables. Without PARTITION BY, the supertable's data is merged into a single timeline. |
......
-import PkgList from "/components/PkgList";
+import PkgListV3 from "/components/PkgListV3";

1. Download the client package

-<PkgList type={1} sys="Linux" />
+<PkgListV3 type={1} sys="Linux" />

-[All downloads](https://www.taosdata.com/cn/all-downloads/)
+[All downloads](../../releases)

2. Extract the package
......
-import PkgList from "/components/PkgList";
+import PkgListV3 from "/components/PkgListV3";

1. Download the client package

-<PkgList type={1} sys="Windows" />
+<PkgListV3 type={4} sys="Windows" />

-[All downloads](https://www.taosdata.com/cn/all-downloads/)
+[All downloads](../../releases)

2. Run the installer and accept the defaults at each prompt to complete the installation
3. Install path
......
@@ -93,12 +93,12 @@ In a Maven project, add the following dependency to pom.xml:

You can also build the latest Java connector yourself by downloading the TDengine source code:

```shell
-git clone https://github.com/taosdata/taos-connector-jdbc.git --branch 2.0
+git clone https://github.com/taosdata/taos-connector-jdbc.git
cd taos-connector-jdbc
mvn clean install -Dmaven.test.skip=true
```

-After building, a jar named taos-jdbcdriver-2.0.XX-dist.jar is produced in the target directory, and the compiled jar is automatically installed into the local Maven repository.
+After building, a jar named taos-jdbcdriver-3.0.*-dist.jar is produced in the target directory, and the compiled jar is automatically installed into the local Maven repository.

</TabItem>
</Tabs>
@@ -198,7 +198,7 @@ The configuration parameters in the URL are:

- user: TDengine user name for logging in; default 'root'.
- password: the user's login password; default 'taosdata'.
-- batchfetch: true: pull the result set in batches when executing queries; false: pull the result set row by row. Default: false. Row-by-row fetching transfers data over HTTP. Starting with taos-jdbcdriver-2.0.38, the JDBC REST connection supports batch fetching; taos-jdbcdriver then transfers data to and from TDengine over a WebSocket connection. Compared with HTTP, WebSocket lets the JDBC REST connection handle large result sets and improves query performance.
+- batchfetch: true: pull the result set in batches when executing queries; false: pull the result set row by row. Default: false. Row-by-row fetching transfers data over HTTP. The JDBC REST connection supports batch fetching; taos-jdbcdriver then transfers data to and from TDengine over a WebSocket connection. Compared with HTTP, WebSocket lets the JDBC REST connection handle large result sets and improves query performance.
- charset: the character set used to parse string data when batch fetching is enabled.
- batchErrorIgnore: true: if one SQL statement fails during a Statement's executeBatch, the remaining statements are still executed; false: no statements after the failed one are executed. Default: false.
- httpConnectTimeout: connection timeout in ms; default 5000.
@@ -216,7 +216,7 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFra

INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
```

-- Starting with taos-jdbcdriver-2.0.36, if dbname is specified in the URL, the JDBC REST connection uses /rest/sql/dbname as the URL for restful requests by default, and dbname does not need to be specified in SQL. For example, if the url is jdbc:TAOS-RS://127.0.0.1:6041/test, this sql can be executed: insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
+- If dbname is specified in the URL, the JDBC REST connection uses /rest/sql/dbname as the URL for restful requests by default, and dbname does not need to be specified in SQL. For example, if the url is jdbc:TAOS-RS://127.0.0.1:6041/test, this sql can be executed: insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);

:::
@@ -230,7 +230,7 @@ INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFra

**Note**:

- Client parameters set in an application are process-wide: to update a client parameter, the application must be restarted, because client parameters are global and only the first setting in an application takes effect.
-- The following sample code is based on taos-jdbcdriver-2.0.36
+- The following sample code is based on taos-jdbcdriver-3.0.0
```java
public Connection getConn() throws Exception{
@@ -367,7 +367,7 @@ TDengine's JDBC native connection implementation greatly improves parameter-binding support for data

**Note**:

- The JDBC REST connection does not currently support parameter binding
-- The following sample code is based on taos-jdbcdriver-2.0.36
+- The following sample code is based on taos-jdbcdriver-3.0.0
- binary data must be set with the setString method, and nchar data with the setNString method
- Both setString and setNString require declaring, in the size parameter, the width of the corresponding column in the table definition
@@ -635,7 +635,7 @@ TDengine supports schemaless writing, which is compatible with the InfluxDB line

**Note**:

- The JDBC REST connection does not currently support schemaless writing
-- The following sample code is based on taos-jdbcdriver-2.0.36
+- The following sample code is based on taos-jdbcdriver-3.0.0
```java
public class SchemalessInsertTest {
@@ -666,7 +666,7 @@ public class SchemalessInsertTest {
}
```
-### Subscription
+### Data Subscription

The TDengine Java connector supports subscription, with the following API:
@@ -717,9 +717,14 @@ while(true) {

#### Closing the Subscription

```java
// Unsubscribe
consumer.unsubscribe();
// Close the consumer
consumer.close()
```

For details, see [Data Subscription](../../../develop/tmq).

### Usage example:
```java
@@ -734,7 +739,7 @@ public abstract class ConsumerLoop {
            config.setProperty("msg.with.table.name", "true");
            config.setProperty("enable.auto.commit", "true");
            config.setProperty("group.id", "group1");
-           config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
+           config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");

            this.consumer = new TaosConsumer<>(config);
            this.topics = Collections.singletonList("topic_speed");
@@ -754,6 +759,7 @@ public abstract class ConsumerLoop {
                    process(record);
                }
            }
            consumer.unsubscribe();
        } finally {
            consumer.close();
            shutdownLatch.countDown();
@@ -875,6 +881,7 @@ public static void main(String[] args) throws Exception {
| taos-jdbcdriver version | Major changes |
| :------------------: | :----------------------------: |
| 3.0.0 | Support for TDengine 3.0 |
| 2.0.39 - 2.0.40 | Added REST connection/request timeout settings |
| 2.0.38 | Added batch fetching to the JDBC REST connection |
| 2.0.37 | Added support for json tags |
......
@@ -30,6 +30,7 @@ extern bool gRaftDetailLog;

#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_MAX_READ_RANGE 10
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
@@ -210,9 +211,12 @@ void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid);
ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid);
bool syncIsReadyForRead(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid);
bool syncRestoreFinish(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
SyncIndex syncGetLastIndex(int64_t rid);
SyncIndex syncGetCommitIndex(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
@@ -5,137 -
# #
########################################################

-# first fully qualified domain name (FQDN) for TDengine system
+# The end point of the first dnode in the cluster to be connected to when `taosd` or `taos` is started
# firstEp hostname:6030

-# local fully qualified domain name (FQDN)
+# The end point of the second dnode to be connected to if the firstEp is not available when `taosd` or `taos` is started
+# secondEp

+# The FQDN of the host where `taosd` will be started. It can be IP address
# fqdn hostname

-# first port number for the connection (12 continuous UDP/TCP port number are used)
+# The port for external access after `taosd` is started
# serverPort 6030

+# The maximum number of connections a dnode can accept
+# maxShellConns 5000

-# log file's directory
+# The directory for writing log files
# logDir /var/log/taos

-# data file's directory
+# All data files are stored in this directory
# dataDir /var/lib/taos

# temporary file's directory
# tempDir /tmp/

-# the arbitrator's fully qualified domain name (FQDN) for TDengine system, for cluster only
-# arbitrator arbitrator_hostname:6042
-
-# number of threads per CPU core
-# numOfThreadsPerCore 1.0
-
-# number of threads to commit cache data
-# numOfCommitThreads 4
-
-# the proportion of total CPU cores available for query processing
-# 2.0: the query threads will be set to double of the CPU cores.
-# 1.0: all CPU cores are available for query processing [default].
-# 0.5: only half of the CPU cores are available for query.
-# 0.0: only one core available.
-# ratioOfQueryCores 1.0
-
-# the last_row/first/last aggregator will not change the original column name in the result fields
-keepColumnName 1
-
-# number of management nodes in the system
-# numOfMnodes 1
-
-# enable/disable backuping vnode directory when removing vnode
-# vnodeBak 1
-
-# enable/disable installation / usage report
+# Switch for allowing TDengine to collect and report service usage information
# telemetryReporting 1

-# enable/disable load balancing
-# balance 1
+# The maximum number of vnodes supported by dnode
+# supportVnodes 0

-# role for dnode. 0 - any, 1 - mnode, 2 - dnode
-# role 0
-
-# max timer control blocks
-# maxTmrCtrl 512
-
-# time interval of system monitor, seconds
-# monitorInterval 30
-
-# number of seconds allowed for a dnode to be offline, for cluster only
-# offlineThreshold 864000
-
-# RPC re-try timer, millisecond
-# rpcTimer 300
-
-# RPC maximum time for ack, seconds.
-# rpcMaxTime 600
-
-# time interval of dnode status reporting to mnode, seconds, for cluster only
+# The interval of dnode reporting status to mnode
# statusInterval 1

-# time interval of heart beat from shell to dnode, seconds
+# The interval for taos shell to send heartbeat to mnode
# shellActivityTimer 3

-# minimum sliding window time, milli-second
+# The minimum sliding window time, milli-second
# minSlidingTime 10

-# minimum time window, milli-second
+# The minimum time window, milli-second
# minIntervalTime 10

-# maximum delay before launching a stream computation, milli-second
-# maxStreamCompDelay 20000
-
-# maximum delay before launching a stream computation for the first time, milli-second
-# maxFirstStreamCompDelay 10000
-
-# retry delay when a stream computation fails, milli-second
-# retryStreamCompDelay 10
-
-# the delayed time for launching a stream computation, from 0.1(default, 10% of whole computing time window) to 0.9
-# streamCompDelayRatio 0.1
-
-# max number of vgroups per db, 0 means configured automatically
-# maxVgroupsPerDb 0
-
-# max number of tables per vnode
-# maxTablesPerVnode 1000000
-
-# cache block size (Mbyte)
-# cache 16
-
-# number of cache blocks per vnode
-# blocks 6
-
-# number of days per DB file
-# days 10
-
-# number of days to keep DB file
-# keep 3650
-
-# minimum rows of records in file block
-# minRows 100
-
-# maximum rows of records in file block
-# maxRows 4096
-
-# the number of acknowledgments required for successful data writing
-# quorum 1
-
-# enable/disable compression
-# comp 2
-
-# write ahead log (WAL) level, 0: no wal; 1: write wal, but no fysnc; 2: write wal, and call fsync
-# walLevel 1
-
-# if walLevel is set to 2, the cycle of fsync being executed, if set to 0, fsync is called right away
-# fsync 3000
-
-# number of replications, for cluster only
-# replica 1
+# The maximum allowed query buffer size in MB during query processing for each data node
+# -1 no limit (default)
+# 0 no query allowed, queries are disabled
+# queryBufferSize -1

-# the compressed rpc message, option:
+# The compressed rpc message, option:
#  -1 (no compression)
#  0 (all message compressed),
#  > 0 (rpc message body which larger than this value will be compressed)
@@ -147,15 +64,6 @@ keepColumnName 1
#  > 0 (any retrieved column size greater than this value all data will be compressed.)
# compressColData -1

-# max length of an SQL
-# maxSQLLength 65480
-
-# max length of WildCards
-# maxWildCardsLength 100
-
-# the maximum number of records allowed for super table time sorting
-# maxNumOfOrderedRes 100000
-
# system time zone
# timezone Asia/Shanghai (CST, +0800)

# system time zone (for windows 10)
@@ -167,12 +75,6 @@ keepColumnName 1
# default system charset
# charset UTF-8

-# max number of connections allowed in dnode
-# maxShellConns 5000
-
-# max number of connections allowed in client
-# maxConnections 5000
-
# stop writing logs when the disk size of the log folder is less than this value
# minimalLogDirGB 1.0
@@ -182,30 +84,9 @@ keepColumnName 1
# if disk free space is less than this value, taosd service exit directly within startup process
# minimalDataDirGB 2.0

-# One mnode is equal to the number of vnode consumed
-# mnodeEqualVnodeNum 4
-
-# enbale/disable http service
-# http 1
-
# enable/disable system monitor
# monitor 1

-# enable/disable recording the SQL statements via restful interface
-# httpEnableRecordSql 0
-
-# number of threads used to process http requests
-# httpMaxThreads 2
-
-# maximum number of rows returned by the restful interface
-# restfulRowLimit 10240
-
-# database name must be specified in restful interface if the following parameter is set, off by default
-# httpDbNameMandatory 1
-
-# http keep alive, default is 30 seconds
-# httpKeepAlive 30000
-
# The following parameter is used to limit the maximum number of lines in log files.
# max number of lines per log filters
# numOfLogLines 10000000
@@ -216,7 +97,6 @@ keepColumnName 1
# time of keeping log files, days
# logKeepDays 0

# The following parameters are used for debug purpose only.
# debugFlag 8 bits mask: FILE-SCREEN-UNUSED-HeartBeat-DUMP-TRACE_WARN-ERROR
# 131: output warning and error
@@ -228,85 +108,62 @@ keepColumnName 1
# debug flag for all log type, take effect when non-zero value
# debugFlag 0

-# debug flag for meta management messages
-# mDebugFlag 135
-
-# debug flag for dnode messages
-# dDebugFlag 135
-
-# debug flag for sync module
-# sDebugFlag 135
-
-# debug flag for WAL
-# wDebugFlag 135
-
-# debug flag for SDB
-# sdbDebugFlag 135
-
-# debug flag for RPC
-# rpcDebugFlag 131
-
-# debug flag for TAOS TIMER
+# debug flag for timer
# tmrDebugFlag 131

-# debug flag for TDengine client
-# cDebugFlag 131
-
-# debug flag for JNI
-# jniDebugFlag 131
-
-# debug flag for storage
+# debug flag for util
# uDebugFlag 131

-# debug flag for http server
-# httpDebugFlag 131
+# debug flag for rpc
+# rpcDebugFlag 131

-# debug flag for monitor
-# monDebugFlag 131
+# debug flag for jni
+# jniDebugFlag 131

# debug flag for query
# qDebugFlag 131

+# debug flag for taosc driver
+# cDebugFlag 131
+
+# debug flag for dnode messages
+# dDebugFlag 135
+
# debug flag for vnode
# vDebugFlag 131

-# debug flag for TSDB
-# tsdbDebugFlag 131
+# debug flag for meta management messages
+# mDebugFlag 135

-# debug flag for continue query
-# cqDebugFlag 131
-
-# enable/disable recording the SQL in taos client
-# enableRecordSql 0
+# debug flag for wal
+# wDebugFlag 135

-# generate core file when service crash
-# enableCoreFile 1
+# debug flag for sync module
+# sDebugFlag 135

-# maximum display width of binary and nchar fields in the shell. The parts exceeding this limit will be hidden
-# maxBinaryDisplayWidth 30
+# debug flag for tsdb
+# tsdbDebugFlag 131

-# enable/disable stream (continuous query)
-# stream 1
+# debug flag for tq
+# tqDebugFlag 131

-# in retrieve blocking model, only in 50% query threads will be used in query processing in dnode
-# retrieveBlockingModel 0
+# debug flag for fs
+# fsDebugFlag 131

-# the maximum allowed query buffer size in MB during query processing for each data node
-# -1 no limit (default)
-# 0 no query allowed, queries are disabled
-# queryBufferSize -1
+# debug flag for udf
+# udfDebugFlag 131

-# percent of redundant data in tsdb meta will compact meta data,0 means donot compact
-# tsdbMetaCompactRatio 0
+# debug flag for sma
+# smaDebugFlag 131

-# default string type used for storing JSON String, options can be binary/nchar, default is nchar
-# defaultJSONStrType nchar
+# debug flag for index
+# idxDebugFlag 131

-# force TCP transmission
-# rpcForceTcp 0
+# debug flag for tdb
+# tdbDebugFlag 131

-# unit MB. Flush vnode wal file if walSize > walFlushSize and walSize > cache*0.5*blocks
-# walFlushSize 1024
+# debug flag for meta
+# metaDebugFlag 131

-# unit Hour. Latency of data migration
-# keepTimeOffset 0
+# generate core file when service crash
+# enableCoreFile 1
@@ -29,6 +29,7 @@ else
  # Remove all links
  ${csudo}rm -f ${bin_link_dir}/taos || :
  ${csudo}rm -f ${bin_link_dir}/taosd || :
  ${csudo}rm -f ${bin_link_dir}/udfd || :
  ${csudo}rm -f ${bin_link_dir}/taosadapter || :
  ${csudo}rm -f ${bin_link_dir}/taosdemo || :
  ${csudo}rm -f ${cfg_link_dir}/* || :
......
@@ -60,6 +60,7 @@ cp ${compile_dir}/../packaging/tools/set_core.sh ${pkg_dir}${install_home_pat
cp ${compile_dir}/../packaging/tools/taosd-dump-cfg.gdb ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/udfd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosBenchmark ${pkg_dir}${install_home_path}/bin

if [ -f "${compile_dir}/build/bin/taosadapter" ]; then
......
@@ -69,6 +69,7 @@ cp %{_compiledir}/../packaging/tools/set_core.sh %{buildroot}%{homepath}/bin
cp %{_compiledir}/../packaging/tools/taosd-dump-cfg.gdb %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/udfd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosBenchmark %{buildroot}%{homepath}/bin

if [ -f %{_compiledir}/build/bin/taosadapter ]; then
@@ -204,6 +205,7 @@ if [ $1 -eq 0 ];then
  # Remove all links
  ${csudo}rm -f ${bin_link_dir}/taos || :
  ${csudo}rm -f ${bin_link_dir}/taosd || :
  ${csudo}rm -f ${bin_link_dir}/udfd || :
  ${csudo}rm -f ${bin_link_dir}/taosadapter || :
  ${csudo}rm -f ${cfg_link_dir}/* || :
  ${csudo}rm -f ${inc_link_dir}/taos.h || :
......
@@ -18,6 +18,7 @@ script_dir=$(dirname $(readlink -f "$0"))

clientName="taos"
serverName="taosd"
udfdName="udfd"
configFile="taos.cfg"
productName="TDengine"
emailName="taosdata.com"
@@ -192,6 +193,7 @@ function install_bin() {
  # Remove links
  ${csudo}rm -f ${bin_link_dir}/${clientName} || :
  ${csudo}rm -f ${bin_link_dir}/${serverName} || :
  ${csudo}rm -f ${bin_link_dir}/${udfdName} || :
  ${csudo}rm -f ${bin_link_dir}/${adapterName} || :
  ${csudo}rm -f ${bin_link_dir}/${uninstallScript} || :
  ${csudo}rm -f ${bin_link_dir}/${demoName} || :
@@ -205,6 +207,7 @@ function install_bin() {
  #Make link
  [ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
  [ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
  [ -x ${install_main_dir}/bin/${udfdName} ] && ${csudo}ln -s ${install_main_dir}/bin/${udfdName} ${bin_link_dir}/${udfdName} || :
  [ -x ${install_main_dir}/bin/${adapterName} ] && ${csudo}ln -s ${install_main_dir}/bin/${adapterName} ${bin_link_dir}/${adapterName} || :
  [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || :
  [ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
@@ -742,7 +745,7 @@ function is_version_compatible() {
  fi

  exist_version=$(${installDir}/bin/${serverName} -V | head -1 | cut -d ' ' -f 3)
- vercomp $exist_version "2.0.16.0"
+ vercomp $exist_version "3.0.0.0"
  case $? in
    2)
      prompt_force=1
......
@@ -85,6 +85,7 @@ else
    ${build_dir}/bin/${clientName} \
    ${taostools_bin_files} \
    ${build_dir}/bin/taosadapter \
    ${build_dir}/bin/udfd \
    ${script_dir}/remove.sh \
    ${script_dir}/set_core.sh \
    ${script_dir}/startPre.sh \
@@ -318,7 +319,7 @@ if [ "$verMode" == "cluster" ]; then
fi

# Copy release note
-cp ${script_dir}/release_note ${install_dir}
+# cp ${script_dir}/release_note ${install_dir}

# exit 1
......
@@ -118,6 +118,7 @@ function install_bin() {
  # Remove links
  ${csudo}rm -f ${bin_link_dir}/taos || :
  ${csudo}rm -f ${bin_link_dir}/taosd || :
  ${csudo}rm -f ${bin_link_dir}/udfd || :
  ${csudo}rm -f ${bin_link_dir}/taosadapter || :
  ${csudo}rm -f ${bin_link_dir}/taosBenchmark || :
  ${csudo}rm -f ${bin_link_dir}/taosdemo || :
@@ -130,6 +131,7 @@ function install_bin() {
  #Make link
  [ -x ${bin_dir}/taos ] && ${csudo}ln -s ${bin_dir}/taos ${bin_link_dir}/taos || :
  [ -x ${bin_dir}/taosd ] && ${csudo}ln -s ${bin_dir}/taosd ${bin_link_dir}/taosd || :
  [ -x ${bin_dir}/udfd ] && ${csudo}ln -s ${bin_dir}/udfd ${bin_link_dir}/udfd || :
  [ -x ${bin_dir}/taosadapter ] && ${csudo}ln -s ${bin_dir}/taosadapter ${bin_link_dir}/taosadapter || :
  [ -x ${bin_dir}/taosBenchmark ] && ${csudo}ln -sf ${bin_dir}/taosBenchmark ${bin_link_dir}/taosdemo || :
  [ -x ${bin_dir}/taosBenchmark ] && ${csudo}ln -sf ${bin_dir}/taosBenchmark ${bin_link_dir}/taosBenchmark || :
......
@@ -51,7 +51,7 @@ Source: taos.bat; DestDir: "{app}\include"; Flags: igNoreversion;
;Source: taosdemo.png; DestDir: "{app}\include"; Flags: igNoreversion;
;Source: taosShell.png; DestDir: "{app}\include"; Flags: igNoreversion;
Source: favicon.ico; DestDir: "{app}\include"; Flags: igNoreversion;
-Source: {#MyAppSourceDir}{#MyAppDLLName}; DestDir: "{win}\System32"; Flags: igNoreversion;
+Source: {#MyAppSourceDir}{#MyAppDLLName}; DestDir: "{win}\System32"; Flags: 64bit;Check:IsWin64;
Source: {#MyAppSourceDir}{#MyAppCfgName}; DestDir: "{app}\cfg"; Flags: igNoreversion recursesubdirs createallsubdirs onlyifdoesntexist uninsneveruninstall
Source: {#MyAppSourceDir}{#MyAppDriverName}; DestDir: "{app}\driver"; Flags: igNoreversion recursesubdirs createallsubdirs
;Source: {#MyAppSourceDir}{#MyAppConnectorName}; DestDir: "{app}\connector"; Flags: igNoreversion recursesubdirs createallsubdirs
......
@@ -9,6 +9,11 @@ IF (TD_GRANT)
    ADD_DEFINITIONS(-D_GRANT)
ENDIF ()

IF (TD_STORAGE)
    ADD_DEFINITIONS(-D_STORAGE)
    TARGET_LINK_LIBRARIES(common PRIVATE storage)
ENDIF ()

target_include_directories(
    common
    PUBLIC "${TD_SOURCE_DIR}/include/common"
......
@@ -165,58 +165,11 @@ int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60;

-void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
-  tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
-  tsDiskCfg[index].level = level;
-  tsDiskCfg[index].primary = primary;
-  uTrace("dataDir:%s, level:%d primary:%d is configured", v1, level, primary);
-}
-
-static int32_t taosSetTfsCfg(SConfig *pCfg) {
-  SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
-  memset(tsDataDir, 0, PATH_MAX);
-
-  int32_t size = taosArrayGetSize(pItem->array);
-  if (size <= 0) {
-    tsDiskCfgNum = 1;
-    taosAddDataDir(0, pItem->str, 0, 1);
-    tstrncpy(tsDataDir, pItem->str, PATH_MAX);
-    if (taosMulMkDir(tsDataDir) != 0) {
-      uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
-      return -1;
-    }
-  } else {
-    tsDiskCfgNum = size < TFS_MAX_DISKS ? size : TFS_MAX_DISKS;
-    for (int32_t index = 0; index < tsDiskCfgNum; ++index) {
-      SDiskCfg *pCfg = taosArrayGet(pItem->array, index);
-      memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
-      if (pCfg->level == 0 && pCfg->primary == 1) {
-        tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
-      }
-      if (taosMulMkDir(pCfg->dir) != 0) {
-        uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
-        return -1;
-      }
-    }
-  }
-
-  if (tsDataDir[0] == 0) {
-    if (pItem->str != NULL) {
-      taosAddDataDir(tsDiskCfgNum, pItem->str, 0, 1);
-      tstrncpy(tsDataDir, pItem->str, PATH_MAX);
-      if (taosMulMkDir(tsDataDir) != 0) {
-        uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
-        return -1;
-      }
-      tsDiskCfgNum++;
-    } else {
-      uError("datadir not set");
-      return -1;
-    }
-  }
-  return 0;
-}
+#ifndef _STORAGE
+int32_t taosSetTfsCfg(SConfig *pCfg) { return 0; }
+#else
+int32_t taosSetTfsCfg(SConfig *pCfg);
+#endif

struct SConfig *taosGetCfg() {
  return tsCfg;
......
@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsReadyForRead(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);

#ifdef __cplusplus
......
@@ -17,7 +17,7 @@
#include "tmsg.h"
#include "tq.h"

-int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
+int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
                         SBatchDeleteReq* deleteReq) {
  ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
  int32_t totRow = pDataBlock->info.rows;
@@ -68,9 +68,10 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      int32_t padding1 = 0;
-     void*   padding2 = taosMemoryMalloc(1);
+     void*   padding2 = NULL;
      taosArrayPush(schemaReqSz, &padding1);
      taosArrayPush(schemaReqs, &padding2);
      continue;
    }

    STagVal tagVal = {
@@ -138,8 +139,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
      continue;
    }

    int32_t rows = pDataBlock->info.rows;
-   // TODO min
-   int32_t rowSize = pDataBlock->info.rowSize;
+   /*int32_t rowSize = pDataBlock->info.rowSize;*/
    int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

    int32_t schemaLen = 0;
@@ -150,7 +150,6 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
  }

  // assign data
- // TODO
  ret = rpcMallocCont(cap);
  ret->header.vgId = pVnode->config.vgId;
  ret->length = sizeof(SSubmitReq);
@@ -161,13 +160,12 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      pDeleteReq->suid = suid;
-     tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
+     tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
      continue;
    }

    blkHead->numOfRows = htonl(pDataBlock->info.rows);
    blkHead->sversion = htonl(pTSchema->version);
-   // TODO
    blkHead->suid = htobe64(suid);
    // uid is assigned by vnode
    blkHead->uid = 0;
......
@@ -196,7 +196,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
    tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
  }

- tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
+ tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
           " since %s",
           TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
  return code;
......
@@ -283,7 +283,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");
- if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
+ if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }
@@ -307,7 +307,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
       pMsg->msgType == TDMT_VND_BATCH_META) &&
-     !vnodeIsLeader(pVnode)) {
+     !vnodeIsReadyForRead(pVnode)) {
    vnodeRedirectRpcMsg(pVnode, pMsg);
    return 0;
  }
......
@@ -781,3 +781,17 @@ bool vnodeIsLeader(SVnode *pVnode) {
   return true;
 }
+
+bool vnodeIsReadyForRead(SVnode *pVnode) {
+  if (syncIsReady(pVnode->sync)) {
+    return true;
+  }
+
+  if (syncIsReadyForRead(pVnode->sync)) {
+    return true;
+  }
+
+  vDebug("vgId:%d, vnode not ready for read, state:%s, last:%" PRId64 ", cmt:%" PRId64, pVnode->config.vgId,
+         syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
+  return false;
+}
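Review note: together with the two message-queue hunks above, reads are no longer gated on strict leadership. A vnode may also serve queries and fetches when syncIsReadyForRead approves, i.e. when it is leader and its commit index trails its last log index by at most SYNC_MAX_READ_RANGE. A standalone sketch of that lag check; the struct and the constant's value of 10 are assumptions for illustration, the real definitions live in the sync module:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define SYNC_MAX_READ_RANGE 10  // assumed value, for illustration only

    typedef struct {
      bool    isLeader;
      int64_t lastIndex;    // highest log index written locally
      int64_t commitIndex;  // highest log index known committed
    } SNodeView;

    // a node may serve reads while leader and not too far behind its own log
    static bool readyForRead(const SNodeView *n) {
      return n->isLeader && n->commitIndex >= n->lastIndex - SYNC_MAX_READ_RANGE;
    }

    int main(void) {
      SNodeView n = {.isLeader = true, .lastIndex = 100, .commitIndex = 95};
      printf("ready: %d\n", readyForRead(&n));  // 1: lag of 5 is within range
      n.commitIndex = 80;
      printf("ready: %d\n", readyForRead(&n));  // 0: lag of 20 exceeds range
      return 0;
    }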
@@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
   return numOfElems;
 }
+
+static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) {
+  SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
+  SSampleInfo*         pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+
+  pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
+  pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
+
+  return pInfo;
+}
+
 bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
   SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
   SValueNode*  pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
@@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
 int32_t sampleFunction(SqlFunctionCtx* pCtx) {
   SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
-  SSampleInfo*         pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+  SSampleInfo*         pInfo = getSampleOutputInfo(pCtx);
   SInputColumnInfoData* pInput = &pCtx->input;
@@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
 int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
   SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
-  SSampleInfo*         pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
+  SSampleInfo*         pInfo = getSampleOutputInfo(pCtx);
   pEntryInfo->complete = true;
   int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
......
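Review note: getSampleOutputInfo exists because SSampleInfo keeps absolute pointers (data, tuplePos) into its own trailing buffer. The result-row interbuf can be copied or reloaded (for example from a spilled buffer page), after which the stored pointers still reference the old location, so sampleFunction and sampleFinalize now recompute them from the struct's current address on every access. A self-contained sketch of that re-basing pattern; SInfo and rebase are stand-ins, not the real SSampleInfo API:

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    // a header directly followed by a variable-length payload in one allocation
    typedef struct {
      int32_t samples;
      int32_t colBytes;
      char   *data;  // must always point just past the header
    } SInfo;

    // recompute interior pointers from the block's own address, as
    // getSampleOutputInfo does; valid even after the block was memcpy'd
    static SInfo *rebase(SInfo *p) {
      p->data = (char *)p + sizeof(SInfo);
      return p;
    }

    int main(void) {
      size_t total = sizeof(SInfo) + 4 * 8;
      SInfo *a = calloc(1, total);
      a->samples = 4;
      a->colBytes = 8;
      rebase(a);
      strcpy(a->data, "payload");

      SInfo *b = malloc(total);  // simulate the buffer being relocated
      memcpy(b, a, total);       // b->data still points into a's storage
      rebase(b);                 // fix it up from b's own address
      printf("%s\n", b->data);   // safe: prints "payload"
      free(a);
      free(b);
      return 0;
    }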
@@ -1399,7 +1399,7 @@ static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFu
                                    "%s function must be used in select statements", pFunc->functionName);
   }
   SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
-  if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
+  if (NULL != pSelect->pFromTable && QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
       !isTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
     return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
                                    "%s function requires valid time series input", pFunc->functionName);
@@ -2037,16 +2037,13 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
     code = getDBVgInfoImpl(pCxt, pName, &vgroupList);
   }
 
-  if (TSDB_CODE_SUCCESS == code &&
-      0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
-      0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) &&
-      isSelectStmt(pCxt->pCurrStmt) &&
+  if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
+      0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TAGS) && isSelectStmt(pCxt->pCurrStmt) &&
       0 == taosArrayGetSize(vgroupList)) {
     ((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
   }
 
-  if (TSDB_CODE_SUCCESS == code &&
-      0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
+  if (TSDB_CODE_SUCCESS == code && 0 == strcmp(pRealTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB) &&
       0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES)) {
     code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &vgroupList);
   }
......
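Review note: the NULL test in translateTimelineFunc guards statements that have no FROM clause at all, where pSelect->pFromTable is NULL and the old nodeType() call dereferenced it. The two setVnodeSysTableVgroupList hunks only re-wrap the same conditions onto fewer lines; no logic changes there.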
@@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) {
   return b;
 }
+
+bool syncIsReadyForRead(int64_t rid) {
+  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
+  if (pSyncNode == NULL) {
+    return false;
+  }
+  ASSERT(rid == pSyncNode->rid);
+
+  // TODO: last not noop?
+  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
+  bool      b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
+
+  // if not ready, set the error code before the node reference is released
+  if (false == b) {
+    if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
+      terrno = TSDB_CODE_SYN_NOT_LEADER;
+    } else {
+      terrno = TSDB_CODE_APP_NOT_READY;
+    }
+  }
+  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
+  return b;
+}
+
 bool syncIsRestoreFinish(int64_t rid) {
   SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
   if (pSyncNode == NULL) {
@@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) {
   return term;
 }
+
+SyncIndex syncGetLastIndex(int64_t rid) {
+  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
+  if (pSyncNode == NULL) {
+    return SYNC_INDEX_INVALID;
+  }
+  ASSERT(rid == pSyncNode->rid);
+  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
+
+  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
+  return lastIndex;
+}
+
+SyncIndex syncGetCommitIndex(int64_t rid) {
+  SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
+  if (pSyncNode == NULL) {
+    return SYNC_INDEX_INVALID;
+  }
+  ASSERT(rid == pSyncNode->rid);
+  SyncIndex cmtIndex = pSyncNode->commitIndex;
+
+  taosReleaseRef(tsNodeRefId, pSyncNode->rid);
+  return cmtIndex;
+}
+
 SyncGroupId syncGetVgId(int64_t rid) {
   SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
   if (pSyncNode == NULL) {
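Review note: both getters follow the module's handle discipline: taosAcquireRef pins the node (and validates the rid) before any field is read, taosReleaseRef is the last touch, and the pointer must not be used afterwards. A toy version of the pattern; the refcount table is simulated and every name is illustrative:

    #include <stdint.h>
    #include <stdio.h>

    typedef struct {
      int64_t rid;
      int     refs;
      int64_t commitIndex;
    } SNode;

    // stand-in for taosAcquireRef: validate the handle and pin the object
    static SNode *acquire(SNode *n, int64_t rid) {
      if (n == NULL || n->rid != rid) return NULL;  // stale or wrong handle
      n->refs++;
      return n;
    }

    // stand-in for taosReleaseRef: after this, n must not be touched
    static void release(SNode *n) { n->refs--; }

    static int64_t getCommitIndex(SNode *table, int64_t rid) {
      SNode *n = acquire(table, rid);
      if (n == NULL) return -1;    // plays the role of SYNC_INDEX_INVALID
      int64_t v = n->commitIndex;  // read while pinned
      release(n);
      return v;
    }

    int main(void) {
      SNode node = {.rid = 7, .refs = 0, .commitIndex = 42};
      printf("%lld\n", (long long)getCommitIndex(&node, 7));  // 42
      printf("%lld\n", (long long)getCommitIndex(&node, 9));  // -1
      return 0;
    }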
@@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
       pSyncNode->changing = true;
     }
+
+    // not restored yet; reject the proposal (vgId 1, the mnode group, stays writable)
+    if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
+      ret = -1;
+      terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
+      sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
+             pSyncNode->vgId, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
+      goto _END;
+    }
+
     SRespStub stub;
     stub.createTime = taosGetTimestampMs();
     stub.rpcMsg = *pMsg;
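Review note: with this guard, a vnode that has not finished restoring its raft log refuses writes with TSDB_CODE_SYN_PROPOSE_NOT_READY instead of accepting them into an unrecovered state. vgId 1 is exempt; that is the mnode's sync group, which has to stay writable during cluster recovery.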
......
@@ -37,9 +37,13 @@ uint32_t taosRandR(uint32_t *pSeed) {
 uint32_t taosSafeRand(void) {
 #ifdef WINDOWS
-  uint32_t   seed;
+  uint32_t   seed = taosRand();
   HCRYPTPROV hCryptProv;
-  if (!CryptAcquireContext(&hCryptProv, NULL, NULL, PROV_RSA_FULL, 0)) return seed;
+  if (!CryptAcquireContext(&hCryptProv, NULL, NULL, PROV_RSA_FULL, 0)) {
+    if (!CryptAcquireContext(&hCryptProv, NULL, NULL, PROV_RSA_FULL, CRYPT_NEWKEYSET)) {
+      return seed;
+    }
+  }
   if (hCryptProv != NULL) {
     if (!CryptGenRandom(hCryptProv, 4, &seed)) return seed;
   }
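Review note: two fixes share this hunk. seed is now pre-filled from taosRand(), so the early returns no longer hand back an uninitialized value, and a failed CryptAcquireContext is retried with CRYPT_NEWKEYSET, which creates the key container missing on accounts that have never used the CryptoAPI. The overall fallback shape, portable and with the Windows call stubbed out (tryStrongSource is a placeholder, not a real API):

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <time.h>

    // placeholder for CryptGenRandom; report failure to exercise the fallback
    static int tryStrongSource(uint32_t *out) {
      (void)out;
      return 0;
    }

    static uint32_t safeRand(void) {
      uint32_t seed = (uint32_t)rand();  // defined fallback value up front
      if (tryStrongSource(&seed)) {
        return seed;  // strong bytes when the OS source works
      }
      return seed;  // otherwise the PRNG fallback, never garbage
    }

    int main(void) {
      srand((unsigned)time(NULL));
      printf("%u\n", (unsigned)safeRand());
      return 0;
    }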
......
@@ -237,8 +237,8 @@
 ./test.sh -f tsim/stream/distributeInterval0.sim
 ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
 ./test.sh -f tsim/stream/distributeSession0.sim
-#./test.sh -f tsim/stream/session0.sim
-#./test.sh -f tsim/stream/session1.sim
+./test.sh -f tsim/stream/session0.sim
+./test.sh -f tsim/stream/session1.sim
 ./test.sh -f tsim/stream/state0.sim
 ./test.sh -f tsim/stream/triggerInterval0.sim
 ./test.sh -f tsim/stream/triggerSession0.sim
......
@@ -83,22 +83,22 @@ if $data11 != 3 then
   goto loop0
 endi
 
-if $data12 != NULL then
+if $data12 != 10 then
   print ======data12=$data12
   goto loop0
 endi
 
-if $data13 != NULL then
+if $data13 != 10 then
   print ======data13=$data13
   goto loop0
 endi
 
-if $data14 != NULL then
+if $data14 != 1.100000000 then
   print ======data14=$data14
   return -1
 endi
 
-if $data15 != NULL then
+if $data15 != 0.000000000 then
   print ======data15=$data15
   return -1
 endi
@@ -141,38 +141,38 @@ if $data01 != 7 then
   goto loop1
 endi
 
-if $data02 != NULL then
+if $data02 != 18 then
   print =====data02=$data02
   goto loop1
 endi
 
-if $data03 != NULL then
+if $data03 != 4 then
   print =====data03=$data03
   goto loop1
 endi
 
-if $data04 != NULL then
-  print ======$data04
+if $data04 != 1.000000000 then
+  print ======data04=$data04
   return -1
 endi
 
-if $data05 != NULL then
-  print ======$data05
+if $data05 != 1.154700538 then
+  print ======data05=$data05
   return -1
 endi
 
 if $data06 != 4 then
-  print ======$data06
+  print ======data06=$data06
   return -1
 endi
 
 if $data07 != 1.000000000 then
-  print ======$data07
+  print ======data07=$data07
   return -1
 endi
 
 if $data08 != 13 then
-  print ======$data08
+  print ======data08=$data08
   return -1
 endi
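Review note: these expectation updates track the two session tests re-enabled above; presumably the session-window stream now fills the aggregate columns that used to come back NULL, so the script asserts concrete values instead, and the print lines are normalized to name the column they dump.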
......