Commit d60ec735 authored by: Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/stream_compression

......@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.0.1.3")
SET(TD_VER_NUMBER "3.0.1.4")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -109,7 +109,7 @@ TDengine's JDBC URL specification format is:
For establishing connections, native connections differ slightly from REST connections.
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
```java
......
......@@ -113,7 +113,7 @@ username:password@protocol(address)/dbname?param=value
```
### Connecting via connector
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
_taosSql_ implements Go's `database/sql/driver` interface via cgo. You can use the [`database/sql`](https://golang.org/pkg/database/sql/) interface by simply importing the driver.
......
......@@ -55,26 +55,28 @@ taos = "*"
</TabItem>
<TabItem value="native" label="native connection only">
<TabItem value="rest" label="Websocket only">
In `Cargo.toml`, add [taos][taos] and enable the `native` feature:
In `Cargo.toml`, add [taos][taos] and enable the `ws` feature:
```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
taos = { version = "*", default-features = false, features = ["ws"] }
```
</TabItem>
<TabItem value="rest" label="Websocket only">
In `Cargo.toml`, add [taos][taos] and enable the `ws` feature:
<TabItem value="native" label="native connection only">
In `Cargo.toml`, add [taos][taos] and enable the `native` feature:
```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
taos = { version = "*", default-features = false, features = ["native"] }
```
</TabItem>
</Tabs>
## Establishing a connection
......
......@@ -81,7 +81,7 @@ pip3 install git+https://github.com/taosdata/taos-connector-python.git
### Verify
<Tabs groupId="connect" default="native">
<TabItem value="native" label="native connection">
<TabItem value="rest" label="native connection">
For a native connection, you need to verify that both the client driver and the Python connector itself are installed correctly. If you can successfully import the `taos` module, the client driver and Python connector have been installed properly. In the Python interactive shell, you can type:
......
......@@ -85,7 +85,7 @@ If using ARM64 Node.js on Windows 10 ARM, you must add "Visual C++ compilers and
### Install via npm
<Tabs defaultValue="install_native">
<Tabs defaultValue="install_rest">
<TabItem value="install_native" label="Install native connector">
```bash
......@@ -124,7 +124,7 @@ node nodejsChecker.js host=localhost
Please choose one of the connectors.
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
Install and import the `@tdengine/client` package.
......
......@@ -15,6 +15,7 @@ To write Telegraf data to TDengine requires the following preparations.
- The TDengine cluster is deployed and functioning properly
- taosAdapter is installed and running properly. Please refer to the [taosAdapter manual](/reference/taosadapter) for details.
- Telegraf has been installed. Please refer to the [official documentation](https://docs.influxdata.com/telegraf/v1.22/install/) for Telegraf installation.
- By default, Telegraf collects the running status measurements of the current system. You can enable [input plugins](https://docs.influxdata.com/telegraf/v1.22/plugins/) to feed data in [other formats](https://docs.influxdata.com/telegraf/v1.24/data_formats/input/) into Telegraf and then forward it to TDengine.
## Configuration steps
<Telegraf />
......@@ -31,11 +32,12 @@ Use TDengine CLI to verify Telegraf correctly writing data to TDengine and read
```
taos> show databases;
name | created_time | ntables | vgroups | replica | quorum | days | keep | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | cachelast | precision | update | status |
====================================================================================================================================================================================================================================================================================
telegraf | 2022-04-20 08:47:53.488 | 22 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ns | 2 | ready |
log | 2022-04-20 07:19:50.260 | 9 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ms | 0 | ready |
Query OK, 2 row(s) in set (0.002401s)
name |
=================================
information_schema |
performance_schema |
telegraf |
Query OK, 3 rows in database (0.010568s)
taos> use telegraf;
Database changed.
......@@ -65,3 +67,11 @@ taos> select * from telegraf.system limit 10;
|
Query OK, 3 row(s) in set (0.013269s)
```
:::note
- For InfluxDB-format data, TDengine generates a unique, rule-based ID as the sub-table name.
If you need to specify the generated table names, configure the `smlChildTableName` parameter in taos.cfg and insert data in the corresponding format.
For example, add `smlChildTableName=tname` to the taos.cfg file, then insert the data `st,tname=cpu1,t1=4 c1=3 1626006833639000000` and the table will be named `cpu1`. If multiple lines share the same tname but different tag_sets, the tag_set of the first line is used when the table is auto-created and the tag_sets of the other lines are ignored. Please refer to [TDengine Schemaless](/reference/schemaless/#Schemaless-Line-Protocol).
:::
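As a rough illustration of the note above, here is a minimal C sketch (not part of this commit) using the client library's schemaless API. It assumes a local TDengine at the default port with default credentials, an existing database `test`, and `smlChildTableName=tname` already set in taos.cfg; error handling is reduced to the essentials.
```c
#include <stdio.h>
#include <taos.h>

int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", "test", 6030);
  if (conn == NULL) return 1;

  // With smlChildTableName=tname in taos.cfg, this row lands in a child
  // table named cpu1 rather than one with an auto-generated unique ID.
  char *lines[] = {"st,tname=cpu1,t1=4 c1=3 1626006833639000000"};
  TAOS_RES *res = taos_schemaless_insert(conn, lines, 1, TSDB_SML_LINE_PROTOCOL,
                                         TSDB_SML_TIMESTAMP_NANO_SECONDS);
  if (taos_errno(res) != 0) {
    fprintf(stderr, "insert failed: %s\n", taos_errstr(res));
  }
  taos_free_result(res);
  taos_close(conn);
  return 0;
}
```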
......@@ -109,7 +109,7 @@ TDengine's JDBC URL specification format is:
For establishing a connection, a native connection differs slightly from a REST connection.
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
```java
......
......@@ -114,7 +114,7 @@ username:password@protocol(address)/dbname?param=value
```
### Connecting via connector
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
_taosSql_ implements Go's `database/sql/driver` interface via cgo. You can use the [`database/sql`](https://golang.org/pkg/database/sql/) interface by simply importing the driver.
......
......@@ -55,23 +55,24 @@ taos = "*"
</TabItem>
<TabItem value="native" label="仅原生连接">
<TabItem value="rest" label="仅 Websocket">
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性:
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。
```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["native"] }
taos = { version = "*", default-features = false, features = ["ws"] }
```
</TabItem>
<TabItem value="rest" label="仅 Websocket">
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `ws` 特性。
<TabItem value="native" label="仅原生连接">
在 `Cargo.toml` 文件中添加 [taos][taos],并启用 `native` 特性:
```toml
[dependencies]
taos = { version = "*", default-features = false, features = ["ws"] }
taos = { version = "*", default-features = false, features = ["native"] }
```
</TabItem>
......
......@@ -80,7 +80,7 @@ pip3 install git+https://github.com/taosdata/taos-connector-python.git
### Verify installation
<Tabs groupId="connect" default="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
For a native connection, you need to verify that both the client driver and the Python connector itself are installed correctly. If you can successfully import the `taos` module, the client driver and Python connector have been installed properly. In the Python interactive shell, you can type:
......@@ -118,7 +118,7 @@ Requirement already satisfied: taospy in c:\users\username\appdata\local\program
Before establishing a connection with the connector, we recommend testing connectivity from the local TDengine CLI to the TDengine cluster.
<Tabs>
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
Make sure the TDengine cluster is up and that the FQDNs of the machines in the cluster (for a single-node deployment, the FQDN defaults to the hostname) are resolvable on the local machine. You can test this with the `ping` command:
......@@ -173,7 +173,7 @@ curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"
The following sample code assumes TDengine is installed locally and that both the FQDN and serverPort use the default configuration.
<Tabs>
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection" groupId="connect">
```python
......@@ -219,7 +219,7 @@ curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"
### Basic usage
<Tabs default="native" groupId="connect">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
##### Using the TaosConnection class
......@@ -289,7 +289,7 @@ The TaosCursor class uses the native connection for write and query operations. In a multi-threaded
### Using with pandas
<Tabs default="native" groupId="connect">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
```python
......
......@@ -85,7 +85,7 @@ The REST connector supports all platforms that can run Node.js.
### Install via npm
<Tabs defaultValue="install_native">
<Tabs defaultValue="install_rest">
<TabItem value="install_native" label="安装原生连接器">
```bash
......@@ -124,7 +124,7 @@ node nodejsChecker.js host=localhost
Please choose one of the connectors.
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
Install and import the `@tdengine/client` package.
......
......@@ -35,7 +35,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx"
## Supported features
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
......@@ -96,7 +96,7 @@ dotnet add exmaple.csproj reference src/TDengine.csproj
## Establishing a connection
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
......@@ -171,7 +171,7 @@ namespace TDengineExample
#### SQL writing
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
......@@ -203,7 +203,7 @@ namespace TDengineExample
#### Parameter binding
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
......@@ -227,7 +227,7 @@ namespace TDengineExample
#### Synchronous queries
<Tabs defaultValue="native">
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
......
......@@ -16,6 +16,7 @@ Telegraf is a very popular open-source metrics collection tool. In data collection and
- The TDengine cluster is deployed and running properly
- taosAdapter is installed and running properly. For details, see the [taosAdapter manual](/reference/taosadapter)
- Telegraf is installed. For Telegraf installation, see the [official documentation](https://docs.influxdata.com/telegraf/v1.22/install/)
- By default, Telegraf collects the running status of the current system. By enabling [input plugins](https://docs.influxdata.com/telegraf/v1.22/plugins/), data in [other formats](https://docs.influxdata.com/telegraf/v1.24/data_formats/input/) can be fed into Telegraf and then written to TDengine
## Configuration steps
<Telegraf />
......@@ -32,11 +33,12 @@ sudo systemctl restart telegraf
```
taos> show databases;
name | created_time | ntables | vgroups | replica | quorum | days | keep | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | cachelast | precision | update | status |
====================================================================================================================================================================================================================================================================================
telegraf | 2022-04-20 08:47:53.488 | 22 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ns | 2 | ready |
log | 2022-04-20 07:19:50.260 | 9 | 1 | 1 | 1 | 10 | 3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | 0 | ms | 0 | ready |
Query OK, 2 row(s) in set (0.002401s)
name |
=================================
information_schema |
performance_schema |
telegraf |
Query OK, 3 rows in database (0.010568s)
taos> use telegraf;
Database changed.
......@@ -66,3 +68,11 @@ taos> select * from telegraf.system limit 10;
|
Query OK, 3 row(s) in set (0.013269s)
```
:::note
- For InfluxDB-format data, TDengine by default generates a unique, rule-based ID as the sub-table name.
If you need to specify the generated table names, configure the `smlChildTableName` parameter in taos.cfg and control the format of the input data accordingly.
For example, with `smlChildTableName=tname` configured, inserting `st,tname=cpu1,t1=4 c1=3 1626006833639000000` creates a table named `cpu1`. If multiple lines share the same tname but different tag_sets, the tag_set of the first line is used when the table is auto-created and the tag_sets of the other lines are ignored. See the [TDengine schemaless writing reference](/reference/schemaless/#无模式写入行协议).
:::
......@@ -177,6 +177,7 @@ typedef struct SSDataBlock {
enum {
FETCH_TYPE__DATA = 1,
FETCH_TYPE__META,
FETCH_TYPE__SEP,
FETCH_TYPE__NONE,
};
......
......@@ -29,13 +29,13 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
typedef struct {
void *handle;
void* handle;
bool localExec;
localFetchFp fp;
SArray *explainRes;
SArray* explainRes;
} SLocalFetch;
typedef struct {
......@@ -51,9 +51,9 @@ typedef struct {
bool initTqReader;
int32_t numOfVgroups;
void* sContext; // SSnapContext*
void* sContext; // SSnapContext*
void* pStateBackend;
void* pStateBackend;
} SReadHandle;
// in queue mode, data streams are separated by msg
......@@ -136,6 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle
* @return
*/
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch *pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
......@@ -195,6 +196,8 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
......
......@@ -515,7 +515,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
topic = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId;
} else if(TD_RES_TMQ_METADATA(msg)) {
} else if (TD_RES_TMQ_METADATA(msg)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
topic = pRspObj->topic;
vgId = pRspObj->vgId;
......@@ -715,7 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
int32_t epoch = tmq->epoch;
SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
if (pReq == NULL) goto OVER;
pReq->consumerId = consumerId;
pReq->consumerId = htobe64(consumerId);
pReq->epoch = epoch;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
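The fix above converts `consumerId` to big-endian (network) byte order before the heartbeat request goes on the wire; without it, a little-endian host would send the bytes reversed. A self-contained sketch of the round trip, assuming glibc's `endian.h` for `htobe64`/`be64toh`:
```c
#include <endian.h>    // htobe64 / be64toh (glibc; other platforms differ)
#include <inttypes.h>
#include <stdio.h>

int main(void) {
  uint64_t consumerId = 0x1122334455667788ULL;
  uint64_t wire = htobe64(consumerId);            // big-endian for transmission
  printf("host 0x%" PRIx64 " -> wire 0x%" PRIx64 "\n", consumerId, wire);
  printf("round trip ok: %d\n", be64toh(wire) == consumerId);  // prints 1
  return 0;
}
```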
......@@ -1603,6 +1603,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer %ld actual process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
......@@ -1661,9 +1662,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
void* pRsp = NULL;
if(pollRspWrapper->taosxRsp.createTableNum == 0){
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
}else{
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
}
taosFreeQitem(pollRspWrapper);
......@@ -1718,7 +1719,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (1) {
tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) return NULL;
if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("return since poll err");
/*return NULL;*/
}
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
......@@ -1850,12 +1854,12 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
return NULL;
}
......
......@@ -272,6 +272,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer %ld not exist", consumerId);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
......
......@@ -379,6 +379,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
qDebugL("ast %s", topicObj.ast);
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
taosMemoryFree(topicObj.ast);
......
......@@ -217,7 +217,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver);
int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
......
......@@ -113,10 +113,20 @@ typedef struct {
} STqHandle;
typedef struct {
SMqDataRsp dataRsp;
SMqRspHead rspHead;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
SRpcHandleInfo pInfo;
} STqPushEntry;
struct STQ {
SVnode* pVnode;
char* path;
SHashObj* pPushMgr; // consumerId -> STqHandle*
SVnode* pVnode;
char* path;
SRWLatch pushLock;
SHashObj* pPushMgr; // consumerId -> STqPushEntry
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
......@@ -146,7 +156,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
// tqMeta
int32_t tqMetaOpen(STQ* pTq);
......
......@@ -99,7 +99,6 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsReadyForRead(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus
......
......@@ -65,6 +65,11 @@ static void destroySTqHandle(void* data) {
}
}
static void tqPushEntryFree(void* data) {
STqPushEntry* p = *(void**)data;
taosMemoryFree(p);
}
STQ* tqOpen(const char* path, SVnode* pVnode) {
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
if (pTq == NULL) {
......@@ -78,7 +83,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
taosInitRWLatch(&pTq->pushLock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
......@@ -153,6 +160,65 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
return 0;
}
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
ASSERT(!pRsp->withSchema);
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
}
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
if (code < 0) {
return -1;
}
int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
tEncodeSMqDataRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rsp = {
.info = pPushEntry->pInfo,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&rsp);
char buf1[80] = {0};
char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
return 0;
}
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
......@@ -354,6 +420,8 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return -1;
}
pRsp->withTbName = 0;
#if 0
pRsp->withTbName = pReq->withTbName;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
......@@ -362,6 +430,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
return -1;
}
}
#endif
if (subType == TOPIC_SUB_TYPE__COLUMN) {
pRsp->withSchema = false;
......@@ -477,11 +546,33 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->pushLock);
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
#if 1
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
if (pPushEntry != NULL) {
pPushEntry->pInfo = pMsg->info;
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
dataRsp.withTbName = 0;
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
pPushEntry->rspHead.consumerId = consumerId;
pPushEntry->rspHead.epoch = reqEpoch;
pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode));
// unlock
taosWUnLockLatch(&pTq->pushLock);
return 0;
}
}
taosWUnLockLatch(&pTq->pushLock);
#endif
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
......@@ -614,10 +705,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0);
taosWLockLatch(&pTq->pushLock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
}
taosWUnLockLatch(&pTq->pushLock);
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
}
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
if (code != 0) {
tqError("cannot process tq delete req %s, since no such offset", pReq->subKey);
}
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
ASSERT(0);
......@@ -756,7 +859,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
atomic_add_fetch_32(&pHandle->epoch, 1);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO
ASSERT(0);
}
// close handle
}
return 0;
......
......@@ -15,7 +15,7 @@
#include "tq.h"
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) {
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1;
......@@ -243,7 +243,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
}
if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock;
int32_t schemaLen = htonl(pBlk->schemaLen);
int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
......@@ -278,7 +278,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
}
if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock;
int32_t schemaLen = htonl(pBlk->schemaLen);
int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
......
......@@ -213,6 +213,97 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
tqDebug("vgId:%d tq push msg ver %ld, type: %s", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType));
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock);
tqDebug("vgId:%d push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
if (taosHashGetSize(pTq->pPushMgr) != 0) {
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
return -1;
}
memcpy(data, msg, msgLen);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
if (pIter == NULL) break;
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
if (pHandle == NULL) {
tqDebug("vgId:%d cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
continue;
}
if (pPushEntry->dataRsp.reqOffset.version > ver) {
tqDebug("vgId:%d push entry req version %ld, while push version %ld, skip", pTq->pVnode->config.vgId,
pPushEntry->dataRsp.reqOffset.version, ver);
continue;
}
STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
// prepare scan mem data
qStreamScanMemData(task, pReq);
// exec
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
}
tqDebug("vgId:%d tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey,
pRsp->blockNum);
if (pRsp->blockNum > 0) {
// set offset
tqOffsetResetToLog(&pRsp->rspOffset, ver);
// remove from hash
size_t kLen;
void* key = taosHashGetKey(pIter, &kLen);
void* keyCopy = taosMemoryMalloc(kLen);
memcpy(keyCopy, key, kLen);
taosArrayPush(cachedKeys, &keyCopy);
taosArrayPush(cachedKeyLens, &kLen);
tqPushDataRsp(pTq, pPushEntry);
}
}
// delete entry
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
void* key = taosArrayGetP(cachedKeys, i);
size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i);
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
ASSERT(0);
}
}
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens);
}
// unlock
taosWUnLockLatch(&pTq->pushLock);
}
if (vnodeIsRoleLeader(pTq->pVnode)) {
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
......
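Note how `tqPushMsg` never deletes from `pPushMgr` while `taosHashIterate` is walking it: matching keys are copied into `cachedKeys` first and removed in a second loop, because removing the current entry mid-iteration would invalidate the iterator. A self-contained sketch of the same collect-then-delete pattern over a toy map (plain arrays stand in for SHashObj/SArray):
```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define CAP 8
typedef struct { char key[16]; bool used; bool served; } Entry;

static void removeServed(Entry *map) {
  char cached[CAP][16];  // keys captured during iteration
  int  n = 0;
  for (int i = 0; i < CAP; i++) {  // pass 1: iterate, never mutate
    if (map[i].used && map[i].served) {
      strcpy(cached[n++], map[i].key);
    }
  }
  for (int i = 0; i < n; i++) {    // pass 2: delete by cached key
    for (int j = 0; j < CAP; j++) {
      if (map[j].used && strcmp(map[j].key, cached[i]) == 0) map[j].used = false;
    }
  }
}

int main(void) {
  Entry map[CAP] = {{"topicA", true, true}, {"topicB", true, false}};
  removeServed(map);
  printf("%d %d\n", map[0].used, map[1].used);  // prints 0 1
  return 0;
}
```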
......@@ -15,21 +15,20 @@
#include "tq.h"
bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
return true;
}
int16_t msgType = pHead->msgType;
char* body = pHead->body;
int32_t bodyLen = pHead->bodyLen;
int16_t msgType = pHead->msgType;
char* body = pHead->body;
int32_t bodyLen = pHead->bodyLen;
int64_t tbSuid = pHandle->execHandle.execTb.suid;
int64_t realTbSuid = 0;
SDecoder coder;
void* data = POINTER_SHIFT(body, sizeof(SMsgHead));
int32_t len = bodyLen - sizeof(SMsgHead);
int64_t tbSuid = pHandle->execHandle.execTb.suid;
int64_t realTbSuid = 0;
SDecoder coder;
void* data = POINTER_SHIFT(body, sizeof(SMsgHead));
int32_t len = bodyLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
......@@ -43,38 +42,38 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
goto end;
}
realTbSuid = req.suid;
realTbSuid = req.suid;
} else if (msgType == TDMT_VND_CREATE_TABLE) {
SVCreateTbBatchReq req = {0};
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
goto end;
}
int32_t needRebuild = 0;
int32_t needRebuild = 0;
SVCreateTbReq* pCreateReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
needRebuild++;
}
}
if(needRebuild == 0){
if (needRebuild == 0) {
// do nothing
}else if(needRebuild == req.nReqs){
} else if (needRebuild == req.nReqs) {
realTbSuid = tbSuid;
}else{
} else {
realTbSuid = tbSuid;
SVCreateTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pCreateReq);
}
}
int tlen;
int tlen;
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen);
......@@ -107,7 +106,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
}
}
} else if (msgType == TDMT_VND_ALTER_TABLE) {
SVAlterTbReq req = {0};
SVAlterTbReq req = {0};
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
goto end;
......@@ -129,32 +128,32 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
goto end;
}
int32_t needRebuild = 0;
int32_t needRebuild = 0;
SVDropTbReq* pDropReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){
if (pDropReq->suid == tbSuid) {
needRebuild++;
}
}
if(needRebuild == 0){
if (needRebuild == 0) {
// do nothing
}else if(needRebuild == req.nReqs){
} else if (needRebuild == req.nReqs) {
realTbSuid = tbSuid;
}else{
} else {
realTbSuid = tbSuid;
SVDropTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){
if (pDropReq->suid == tbSuid) {
reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pDropReq);
}
}
int tlen;
int tlen;
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen);
......@@ -177,11 +176,11 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
goto end;
}
realTbSuid = req.suid;
} else{
} else {
ASSERT(0);
}
end:
end:
tDecoderClear(&coder);
return tbSuid == realTbSuid;
}
......@@ -224,7 +223,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = -1;
goto END;
}
if(isValValidForTable(pHandle, pHead)){
if (isValValidForTable(pHandle, pHead)) {
*fetchOffset = offset;
code = 0;
goto END;
......@@ -241,7 +240,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
offset++;
}
}
END:
END:
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
return code;
}
......@@ -315,14 +314,18 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
return -1;
}
void* body = pReader->pWalReader->pHead->head.body;
#if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
// TODO do filter
ret->fetchType = FETCH_TYPE__META;
ret->meta = pReader->pWalReader->pHead->head.body;
return 0;
} else {
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
#endif
tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
#if 0
}
#endif
}
while (tqNextDataBlock(pReader)) {
......@@ -334,6 +337,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
continue;
}
ret->fetchType = FETCH_TYPE__DATA;
tqDebug("return data rows %d", ret->data.info.rows);
return 0;
}
......@@ -341,14 +345,14 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
ret->offset.type = TMQ_OFFSET__LOG;
ret->offset.version = pReader->ver;
ASSERT(pReader->ver >= 0);
ret->fetchType = FETCH_TYPE__NONE;
ret->fetchType = FETCH_TYPE__SEP;
tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version);
return 0;
}
}
}
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) {
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg;
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
......
......@@ -289,7 +289,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
......@@ -311,7 +311,12 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
!vnodeIsReadyForRead(pVnode)) {
!vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
if (pMsg->msgType == TDMT_VND_CONSUME && !pVnode->restored) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
......@@ -808,7 +813,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock;
SSubmitRsp rsp = {0};
SVCreateTbReq createTbReq = {0};
SDecoder decoder = {0};
int32_t nRows;
......@@ -921,7 +925,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
}
if (taosArrayGetSize(newTbUids) > 0) {
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids));
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
(int32_t)taosArrayGetSize(newTbUids));
}
tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
......
......@@ -240,7 +240,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
if (!pVnode->restored) {
vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg);
vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
terrno = TSDB_CODE_APP_NOT_READY;
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
rpcFreeCont(pMsg->pCont);
......@@ -796,16 +796,3 @@ bool vnodeIsLeader(SVnode *pVnode) {
return true;
}
bool vnodeIsReadyForRead(SVnode *pVnode) {
if (syncIsReady(pVnode->sync)) {
return true;
}
if (syncIsReadyForRead(pVnode->sync)) {
return true;
}
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
return false;
}
......@@ -146,6 +146,7 @@ typedef struct {
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned;
int64_t snapshotVer;
const SSubmitReq* pReq;
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
......@@ -192,6 +193,7 @@ enum {
OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
OP_EXEC_RECV = 0x11,
};
typedef struct SOperatorFpSet {
......
......@@ -486,7 +486,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if (pLocal) {
memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
}
taosArrayClearEx(pResList, freeBlock);
int64_t curOwner = 0;
......@@ -774,6 +774,14 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS;
}
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
ASSERT(pTaskInfo->streamInfo.pReq == NULL);
pTaskInfo->streamInfo.pReq = pReq;
return 0;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
......
......@@ -53,7 +53,7 @@ static void destroyIndefinitOperatorInfo(void* param) {
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -184,7 +184,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
//TODO: optimize it later when partition by + limit
// TODO: optimize it later when partition by + limit
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
doSetOperatorCompleted(pOperator);
......@@ -206,9 +206,16 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
blockDataCleanup(pFinalRes);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pTaskInfo->streamInfo.pReq) {
pOperator->status = OP_OPENED;
}
qDebug("enter project");
if (pOperator->status == OP_EXEC_DONE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pOperator->status = OP_OPENED;
qDebug("projection in queue model, set status open and return null");
return NULL;
}
......@@ -237,9 +244,23 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE && pFinalRes->info.rows == 0) {
pOperator->status = OP_OPENED;
if (pOperator->status == OP_EXEC_RECV) {
continue;
} else {
return NULL;
}
}
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
pFinalRes->info.rows);
doSetOperatorCompleted(pOperator);
break;
}
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
qDebug("set status recv");
pOperator->status = OP_EXEC_RECV;
}
// for stream interval
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
......@@ -298,6 +319,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status);
break;
}
} else {
......
......@@ -1435,6 +1435,43 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SStreamScanInfo* pInfo = pOperator->info;
qDebug("queue scan called");
if (pTaskInfo->streamInfo.pReq != NULL) {
if (pInfo->tqReader->pMsg == NULL) {
pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
qError("submit msg messed up when initing stream submit block %p", pSubmit);
pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL;
ASSERT(0);
}
}
blockDataCleanup(pInfo->pRes);
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, &block);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
}
}
pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL;
return NULL;
}
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
......@@ -1467,8 +1504,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0);
}
// TODO clean data block
if (pInfo->pRes->info.rows > 0) {
pOperator->status = OP_EXEC_RECV;
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
......@@ -1477,18 +1514,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
} else if (ret.fetchType == FETCH_TYPE__NONE ||
(ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("queue scan log return null, offset %s", formatBuf);
pOperator->status = OP_OPENED;
return NULL;
} else {
ASSERT(0);
}
}
#if 0
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
......@@ -1497,6 +1535,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
qDebug("stream scan tsdb return null");
return NULL;
#endif
} else {
ASSERT(0);
return NULL;
......
......@@ -90,12 +90,12 @@ static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) {
static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) {
int32_t tlvLen = sizeof(STlv) + len;
if (pEncoder->offset + tlvLen > pEncoder->allocSize) {
void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2);
pEncoder->allocSize = TMAX(pEncoder->allocSize * 2, pEncoder->allocSize + pEncoder->offset + tlvLen);
void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize);
if (NULL == pNewBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pEncoder->pBuf = pNewBuf;
pEncoder->allocSize = pEncoder->allocSize * 2;
}
STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset);
pTlv->type = htons(type);
......
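The hunk above fixes a growth bug in the TLV encoder: the old code always doubled the buffer, which can still be too small when a single record is larger than the current capacity. The new rule grows to at least what the pending write needs, taking the larger of doubling and the required size. A self-contained sketch of an equivalent ensure-capacity helper:
```c
#include <stdlib.h>

// Grow *buf so that a further `need` bytes fit after `used` bytes: double the
// capacity, but never grow less than the pending write requires.
static int ensureCapacity(char **buf, size_t *cap, size_t used, size_t need) {
  if (used + need <= *cap) return 0;                // enough room already
  size_t newCap = *cap * 2;
  if (newCap < used + need) newCap = used + need;   // max(2*cap, used+need)
  char *p = realloc(*buf, newCap);
  if (p == NULL) return -1;                         // OOM; old buffer still valid
  *buf = p;
  *cap = newCap;
  return 0;
}
```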
......@@ -396,29 +396,6 @@ bool syncIsReady(int64_t rid) {
return b;
}
bool syncIsReadyForRead(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
ASSERT(rid == pSyncNode->rid);
// TODO: last not noop?
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
// if false, set error code
if (false == b) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
}
return b;
}
bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......
......@@ -23,7 +23,7 @@ node query_example.js
node async_query_example.js
node subscribe_demo.js
# node subscribe_demo.js
taos -s "drop topic if exists topic_name_example"
taos -s "drop database if exists power"
......@@ -39,4 +39,4 @@ taos -s "drop database if exists test"
node opentsdb_telnet_example.js
taos -s "drop database if exists test"
node opentsdb_json_example.js
\ No newline at end of file
node opentsdb_json_example.js
......@@ -303,7 +303,7 @@
./test.sh -f tsim/insert/backquote.sim -m
# unsupport ./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
./test.sh -f tsim/query/interval-offset.sim -m
./test.sh -f tsim/tmq/basic3.sim -m
# unsupport ./test.sh -f tsim/tmq/basic3.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m
# unsupport ./test.sh -f tsim/mnode/basic1.sim -m
......