diff --git a/docs-cn/07-develop/01-connect/index.md b/docs-cn/07-develop/01-connect/index.md index 3a15d03f93cee7dd064f29b4911019cae3632b9a..b1857b973932b4f9cfd1564b709dd79f26701951 100644 --- a/docs-cn/07-develop/01-connect/index.md +++ b/docs-cn/07-develop/01-connect/index.md @@ -212,7 +212,7 @@ curl -L -o php-tdengine.tar.gz https://github.com/Yurunsoft/php-tdengine/archive && tar -xzf php-tdengine.tar.gz -C php-tdengine --strip-components=1 ``` -> 版本 `v1.0.0` 可替换为任意更新的版本,可在 Release 中查看最新版本。 +> 版本 `v1.0.2` 只是示例,可替换为任意更新的版本,可在 [TDengine PHP Connector 发布历史](https://github.com/Yurunsoft/php-tdengine/releases) 中查看可用版本。 **非 Swoole 环境:** diff --git a/docs-cn/14-reference/03-connector/php.mdx b/docs-cn/14-reference/03-connector/php.mdx new file mode 100644 index 0000000000000000000000000000000000000000..f150aed4c8a6ba855d5e830a2944a6d6f88ab0f5 --- /dev/null +++ b/docs-cn/14-reference/03-connector/php.mdx @@ -0,0 +1,150 @@ +--- +sidebar_position: 1 +sidebar_label: PHP +title: PHP Connector +--- + +`php-tdengine` 是由社区贡献的 PHP 连接器扩展,还特别支持了 Swoole 协程化。 + +PHP 连接器依赖 TDengine 客户端驱动。 + +项目地址: + +TDengine 服务端或客户端安装后,`taos.h` 位于: + +- Linux:`/usr/local/taos/include` +- Windows:`C:\TDengine\include` + +TDengine 客户端驱动的动态库位于: + +- Linux: `/usr/local/taos/driver/libtaos.so` +- Windows: `C:\TDengine\taos.dll` + +## 支持的平台 + +* Windows、Linux、MacOS + +* PHP >= 7.4 + +* TDengine >= 2.0 + +* Swoole >= 4.8 (可选) + +## 支持的版本 + +TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一对应的强对应关系,建议使用与 TDengine 服务端完全相同的客户端驱动。虽然低版本的客户端驱动在前三段版本号一致(即仅第四段版本号不同)的情况下也能够与高版本的服务端相兼容,但这并非推荐用法。强烈不建议使用高版本的客户端驱动访问低版本的服务端。 + +## 安装步骤 + +### 安装 TDengine 客户端驱动 + +TDengine 客户端驱动的安装请参考 [安装指南](/reference/connector#安装步骤) + +### 编译安装 php-tdengine + +**下载代码并解压:** + +```shell +curl -L -o php-tdengine.tar.gz https://github.com/Yurunsoft/php-tdengine/archive/refs/tags/v1.0.2.tar.gz \ +&& mkdir php-tdengine \ +&& tar -xzf php-tdengine.tar.gz -C php-tdengine --strip-components=1 +``` + +> 版本 `v1.0.2` 可替换为任意更新的版本,可在 [TDengine PHP Connector 发布历史](https://github.com/Yurunsoft/php-tdengine/releases)。 + +**非 Swoole 环境:** + +```shell +phpize && ./configure && make -j && make install +``` + +**手动指定 tdengine 目录:** + +```shell +phpize && ./configure --with-tdengine-dir=/usr/local/Cellar/tdengine/2.4.0.0 && make -j && make install +``` + +> `--with-tdengine-dir=` 后跟上 tdengine 目录。 +> 适用于默认找不到的情况,或者 MacOS 系统用户。 + +**Swoole 环境:** + +```shell +phpize && ./configure --enable-swoole && make -j && make install +``` + +**启用扩展:** + +方法一:在 `php.ini` 中加入 `extension=tdengine` + +方法二:运行带参数 `php -dextension=tdengine test.php` + +## 示例程序 + +本节展示了使用客户端驱动访问 TDengine 集群的常见访问方式的示例代码。 + +> 所有错误都会抛出异常: `TDengine\Exception\TDengineException` + +### 建立连接 + +
+建立连接 + +```c +{{#include docs-examples/php/connect.php}} +``` + +
+ +### 插入数据 + +
+插入数据 + +```c +{{#include docs-examples/php/insert.php}} +``` + +
+ +### 同步查询 + +
+同步查询 + +```c +{{#include docs-examples/php/query.php}} +``` + +
+ +### 参数绑定 + +
+参数绑定 + +```c +{{#include docs-examples/php/insert_stmt.php}} +``` + +
+ +## 常量 + +| 常量 | 说明 | +| ------------ | ------------ +| `TDengine\TSDB_DATA_TYPE_NULL` | null | +| `TDengine\TSDB_DATA_TYPE_BOOL` | bool | +| `TDengine\TSDB_DATA_TYPE_TINYINT` | tinyint | +| `TDengine\TSDB_DATA_TYPE_SMALLINT` | smallint | +| `TDengine\TSDB_DATA_TYPE_INT` | int | +| `TDengine\TSDB_DATA_TYPE_BIGINT` | bigint | +| `TDengine\TSDB_DATA_TYPE_FLOAT` | float | +| `TDengine\TSDB_DATA_TYPE_DOUBLE` | double | +| `TDengine\TSDB_DATA_TYPE_BINARY` | binary | +| `TDengine\TSDB_DATA_TYPE_TIMESTAMP` | timestamp | +| `TDengine\TSDB_DATA_TYPE_NCHAR` | nchar | +| `TDengine\TSDB_DATA_TYPE_UTINYINT` | utinyint | +| `TDengine\TSDB_DATA_TYPE_USMALLINT` | usmallint | +| `TDengine\TSDB_DATA_TYPE_UINT` | uint | +| `TDengine\TSDB_DATA_TYPE_UBIGINT` | ubigint | diff --git a/docs-en/07-develop/01-connect/index.md b/docs-en/07-develop/01-connect/index.md index b9217b828d0d08c4ff1eacd27406d4e3bfba8eac..720f8e2384c565d5494ce7d84d531188dae96fe0 100644 --- a/docs-en/07-develop/01-connect/index.md +++ b/docs-en/07-develop/01-connect/index.md @@ -19,7 +19,7 @@ import InstallOnLinux from "../../14-reference/03-connector/\_windows_install.md import VerifyLinux from "../../14-reference/03-connector/\_verify_linux.mdx"; import VerifyWindows from "../../14-reference/03-connector/\_verify_windows.mdx"; -Any application programs running on any kind of platform can access TDengine through the REST API provided by TDengine. For details, please refer to [REST API](/reference/rest-api/). Additionally, application programs can use the connectors of multiple programming languages including C/C++, Java, Python, Go, Node.js, C#, and Rust to access TDengine. This chapter describes how to establish a connection to TDengine and briefly introduces how to install and use connectors. For details about the connectors, please refer to [Connectors](/reference/connector/) +Any application programs running on any kind of platform can access TDengine through the REST API provided by TDengine. For details, please refer to [REST API](/reference/rest-api/). Additionally, application programs can use the connectors of multiple programming languages including C/C++, Java, Python, Go, Node.js, C#, Rust to access TDengine. This chapter describes how to establish a connection to TDengine and briefly introduces how to install and use connectors. TDengine community also provides connectors in LUA and PHP languages. For details about the connectors, please refer to [Connectors](/reference/connector/). ## Establish Connection @@ -200,6 +200,46 @@ install.packages("RJDBC") If the client driver (taosc) is already installed, then the C connector is already available.
+ + + +**Download Source Code Package and Unzip:** + +```shell +curl -L -o php-tdengine.tar.gz https://github.com/Yurunsoft/php-tdengine/archive/refs/tags/v1.0.2.tar.gz \ +&& mkdir php-tdengine \ +&& tar -xzf php-tdengine.tar.gz -C php-tdengine --strip-components=1 +``` + +> Version number `v1.0.2` is only for example, it can be replaced to any newer version, please check available version from [TDengine PHP Connector Releases](https://github.com/Yurunsoft/php-tdengine/releases). + +**Non-Swoole Environment:** + +```shell +phpize && ./configure && make -j && make install +``` + +**Specify TDengine Location:** + +```shell +phpize && ./configure --with-tdengine-dir=/usr/local/Cellar/tdengine/2.4.0.0 && make -j && make install +``` + +> `--with-tdengine-dir=` is followed by the TDengine installation location. +> This way is useful in case TDengine location can't be found automatically or macOS. + +**Swoole Environment:** + +```shell +phpize && ./configure --enable-swoole && make -j && make install +``` + +**Enable The Extension:** + +Option One: Add `extension=tdengine` in `php.ini` + +Option Two: Specify the extension on CLI `php -d extension=tdengine test.php` + diff --git a/docs-en/14-reference/03-connector/cpp.mdx b/docs-en/14-reference/03-connector/cpp.mdx index d549413012d1f17edf4711ae51a56ba5696fcbe3..e0cdf2bf2ce7eed06cacaf71a5b9baf56a3aee2b 100644 --- a/docs-en/14-reference/03-connector/cpp.mdx +++ b/docs-en/14-reference/03-connector/cpp.mdx @@ -26,7 +26,7 @@ Please refer to [list of supported platforms](/reference/connector#supported-pla ## Supported versions -The version number of the TDengine client driver and the version number of the TDengine server should be the same. A lower version of the client driver is compatible with a higher version of the server, if the first three version numbers are the same (i.e., only the fourth version number is different). For e.g. if the client version is x.y.z.1 and the server version is x.y.z.2 the client and server are compatible. But in general we do not recommend using a lower client version with a newer server version. It is also strongly discouraged to use a higher version of the client driver to access a lower version of the TDengine server. +The version number of the TDengine client driver and the version number of the TDengine server should be same. A lower version of the client driver is compatible with a higher version of the server, if the first three version numbers are the same (i.e., only the fourth version number is different). For e.g. if the client version is x.y.z.1 and the server version is x.y.z.2 the client and server are compatible. But in general we do not recommend using a lower client version with a newer server version. It is also strongly discouraged to use a higher version of the client driver to access a lower version of the TDengine server. ## Installation steps diff --git a/docs-en/14-reference/03-connector/php.mdx b/docs-en/14-reference/03-connector/php.mdx new file mode 100644 index 0000000000000000000000000000000000000000..839a5c8c3cd27f39b234b51aab4d41ad05e93fbc --- /dev/null +++ b/docs-en/14-reference/03-connector/php.mdx @@ -0,0 +1,150 @@ +--- +sidebar_position: 1 +sidebar_label: PHP +title: PHP Connector +--- + +`php-tdengine` is the TDengine PHP connector provided by TDengine community. In particular, it supports Swoole coroutine. + +PHP Connector relies on TDengine client driver. + +Project Repository: + +After TDengine client or server is installed, `taos.h` is located at: + +- Linux:`/usr/local/taos/include` +- Windows:`C:\TDengine\include` + +TDengine client driver is located at: + +- Linux: `/usr/local/taos/driver/libtaos.so` +- Windows: `C:\TDengine\taos.dll` + +## Supported Platforms + +- Windows、Linux、MacOS + +- PHP >= 7.4 + +- TDengine >= 2.0 + +- Swoole >= 4.8 (Optional) + +## Supported Versions + +Because the version of TDengine client driver is tightly associated with that of TDengine server, it's strongly suggested to use the client driver of same version as TDengine server, even though the client driver can work with TDengine server if the first 3 sections of the versions are same. + +## Installation + +### Install TDengine Client Driver + +Regarding how to install TDengine client driver please refer to [Install Client Driver](/reference/connector#installation-steps) + +### Install php-tdengine + +**Download Source Code Package and Unzip:** + +```shell +curl -L -o php-tdengine.tar.gz https://github.com/Yurunsoft/php-tdengine/archive/refs/tags/v1.0.2.tar.gz \ +&& mkdir php-tdengine \ +&& tar -xzf php-tdengine.tar.gz -C php-tdengine --strip-components=1 +``` + +> Version number `v1.0.2` is only for example, it can be replaced to any newer version, please find available versions in [TDengine PHP Connector Releases](https://github.com/Yurunsoft/php-tdengine/releases). + +**Non-Swoole Environment:** + +```shell +phpize && ./configure && make -j && make install +``` + +**Specify TDengine location:** + +```shell +phpize && ./configure --with-tdengine-dir=/usr/local/Cellar/tdengine/2.4.0.0 && make -j && make install +``` + +> `--with-tdengine-dir=` is followed by TDengine location. +> It's useful in case TDengine installatio location can't be found automatically or MacOS. + +**Swoole Environment:** + +```shell +phpize && ./configure --enable-swoole && make -j && make install +``` + +**Enable Extension:** + +Option One: Add `extension=tdengine` in `php.ini`. + +Option Two: Use CLI `php -dextension=tdengine test.php`. + +## Sample Programs + +In this section a few sample programs which use TDengine PHP connector to access TDengine cluster are demonstrated. + +> Any error would throw exception: `TDengine\Exception\TDengineException` + +### Establish Conection + +
+Establish Connection + +```c +{{#include docs-examples/php/connect.php}} +``` + +
+ +### Insert Data + +
+Insert Data + +```c +{{#include docs-examples/php/insert.php}} +``` + +
+ +### Synchronous Query + +
+Synchronous Query + +```c +{{#include docs-examples/php/query.php}} +``` + +
+ +### Parameter Binding + +
+Parameter Binding + +```c +{{#include docs-examples/php/insert_stmt.php}} +``` + +
+ +## Constants + +| Constant | Description | +| ----------------------------------- | ----------- | +| `TDengine\TSDB_DATA_TYPE_NULL` | null | +| `TDengine\TSDB_DATA_TYPE_BOOL` | bool | +| `TDengine\TSDB_DATA_TYPE_TINYINT` | tinyint | +| `TDengine\TSDB_DATA_TYPE_SMALLINT` | smallint | +| `TDengine\TSDB_DATA_TYPE_INT` | int | +| `TDengine\TSDB_DATA_TYPE_BIGINT` | bigint | +| `TDengine\TSDB_DATA_TYPE_FLOAT` | float | +| `TDengine\TSDB_DATA_TYPE_DOUBLE` | double | +| `TDengine\TSDB_DATA_TYPE_BINARY` | binary | +| `TDengine\TSDB_DATA_TYPE_TIMESTAMP` | timestamp | +| `TDengine\TSDB_DATA_TYPE_NCHAR` | nchar | +| `TDengine\TSDB_DATA_TYPE_UTINYINT` | utinyint | +| `TDengine\TSDB_DATA_TYPE_USMALLINT` | usmallint | +| `TDengine\TSDB_DATA_TYPE_UINT` | uint | +| `TDengine\TSDB_DATA_TYPE_UBIGINT` | ubigint | diff --git a/docs-examples/php/insert_stmt.php b/docs-examples/php/insert_stmt.php index 99a9a6aef3f69a8880316355e17396e06ca985c9..c927a9b0ced46461daeda0f53b27e2f9d67d5860 100644 --- a/docs-examples/php/insert_stmt.php +++ b/docs-examples/php/insert_stmt.php @@ -22,7 +22,7 @@ try { // set table name and tags $stmt->setTableNameTags('d1001', [ - // 支持格式同参数绑定 + // same format as parameter binding [TDengine\TSDB_DATA_TYPE_BINARY, 'California.SanFrancisco'], [TDengine\TSDB_DATA_TYPE_INT, 2], ]); diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 97ff2886fcb95fcfaca19ba4baf70b64160855ca..943fcbdb5329c4c53e9f4c663497419091f8b4d2 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -25,7 +25,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; @@ -82,7 +82,7 @@ int32_t create_stream() { /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query( - pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)"); + pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 47c26ed4317c49a3436ef18d8750b1c87b139504..23945cd50075db667fad613774dcc716edaeab30 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -933,6 +933,7 @@ typedef struct { int64_t numOfProcessedFetch; int64_t numOfProcessedDrop; int64_t numOfProcessedHb; + int64_t numOfProcessedDelete; int64_t cacheDataSize; int64_t numOfQueryInQueue; int64_t numOfFetchInQueue; @@ -2689,20 +2690,20 @@ int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq); int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq); typedef struct { - int64_t delUid; - int64_t tbUid; // super/child/normal table - int8_t type; // table type - int16_t nWnds; - char* tbFullName; - char* subPlan; - STimeWindow wnds[]; + SMsgHead header; + uint64_t sId; + uint64_t queryId; + uint64_t taskId; + uint32_t sqlLen; + uint32_t phyLen; + char* sql; + char* msg; } SVDeleteReq; -int32_t tEncodeSVDeleteReq(SEncoder* pCoder, const SVDeleteReq* pReq); -int32_t tDecodeSVDeleteReq(SDecoder* pCoder, SVDeleteReq* pReq); +int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); +int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); typedef struct { - int32_t code; int64_t affectedRows; } SVDeleteRsp; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 73f8515f22df69b8a2a4bf2f2cfff2170331a903..8a811774b2d435732bf368b4e8b75a9d634f0aed 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -187,6 +187,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 2cc9caca6fa4d8e4dd4bd6a8d7b490e7baaf2c34..c23cf162aa6e1ca35e3750aa54e91659c45d6b08 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -32,6 +32,18 @@ extern "C" { struct SDataSink; struct SSDataBlock; +typedef struct SDeleterRes { + uint64_t uid; + SArray* uidList; + int64_t skey; + int64_t ekey; + int64_t affectedRows; +} SDeleterRes; + +typedef struct SDeleterParam { + SArray* pUidList; +} SDeleterParam; + typedef struct SDataSinkStat { uint64_t cachedSize; } SDataSinkStat; @@ -64,7 +76,7 @@ typedef struct SOutputData { * @param pHandle output * @return error code */ -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); +int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam); int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat); diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index aa20082fe02c75c74306adcbdbddc250384ac116..f3f147955a03cb99f1ea806441eddd63ba8f96fe 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -31,7 +31,12 @@ enum { NODE_TYPE_MNODE, }; - +typedef struct SDeleteRes { + uint64_t uid; + SArray* uidList; + int64_t skey; + int64_t ekey; +} SDeleteRes; typedef struct SQWorkerCfg { uint32_t maxSchedulerNum; @@ -47,6 +52,7 @@ typedef struct { uint64_t fetchProcessed; uint64_t dropProcessed; uint64_t hbProcessed; + uint64_t deleteProcessed; uint64_t numOfQueryInQueue; uint64_t numOfFetchInQueue; @@ -74,6 +80,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); +int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes); + void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bb0b6dc0a0c40c6eab5ac68535beb15667e4affc..db8d3ac033636adba4d4807897adea5b1e4bd1d5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -285,12 +285,6 @@ struct SStreamTask { int8_t inputStatus; int8_t outputStatus; -#if 0 - STaosQueue* inputQ; - STaosQall* inputQAll; - STaosQueue* outputQ; - STaosQall* outputQAll; -#endif SStreamQueue* inputQueue; SStreamQueue* outputQueue; @@ -371,13 +365,6 @@ typedef struct { int32_t taskId; } SStreamTaskRunReq; -typedef struct { - // SMsgHead head; - int64_t streamId; - int64_t version; - SArray* res; // SArray -} SStreamSinkReq; - typedef struct { int64_t streamId; int32_t taskId; @@ -411,11 +398,9 @@ typedef struct { int8_t inputStatus; } SStreamTaskRecoverRsp; -int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); -int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); -int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); -int32_t streamDequeueOutput(SStreamTask* pTask, void** output); +int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); int32_t streamTaskRun(SStreamTask* pTask); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c71c7d3487078f79ceee63ba9079670a8a7d4daa..62026dd577a608d6a5595cf508ba62de6b7aa177 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -457,7 +457,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList return pRequest->code; } - if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { + if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; if (pRequest->body.queryJob != 0) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 61638b553da629f61a6f28b7e9bec699ee539bb1..93c1b74ad2b9dec7e84ef574de8e9da99ffd6779 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1849,11 +1849,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo } ret->length = htonl(ret->length); - taosArrayDestroy(tagArray); + taosArrayDestroy(tagArray); return ret; } -void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { +void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, + int8_t needCompress) { int32_t* actualLen = (int32_t*)data; data += sizeof(int32_t); @@ -1947,4 +1948,4 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t ASSERT(pStart - pData == dataLen); return pStart; -} \ No newline at end of file +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7041c9478e27c3d8421040da225aafeeef177ba3..773e430b8365eb8456c8c90ddcfe4194c921922b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -932,6 +932,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1; + if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDelete) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1; @@ -1001,6 +1002,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDelete) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1; @@ -3823,46 +3825,70 @@ int32_t tDecodeSVGetTsmaExpWndsRsp(SDecoder *pCoder, SVGetTsmaExpWndsRsp *pReq) return 0; } -int32_t tEncodeSVDeleteReq(SEncoder *pCoder, const SVDeleteReq *pReq) { - if (tStartEncode(pCoder) < 0) return -1; +int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { + int32_t headLen = sizeof(SMsgHead); + if (buf != NULL) { + buf = (char *)buf + headLen; + bufLen -= headLen; + } - if (tEncodeI64(pCoder, pReq->delUid) < 0) return -1; - if (tEncodeI64(pCoder, pReq->tbUid) < 0) return -1; - if (tEncodeI8(pCoder, pReq->type) < 0) return -1; - if (tEncodeI16v(pCoder, pReq->nWnds) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->tbFullName) < 0) return -1; - if (tEncodeCStr(pCoder, pReq->subPlan) < 0) return -1; - for (int16_t i = 0; i < pReq->nWnds; ++i) { - if (tEncodeI64(pCoder, pReq->wnds[i].skey) < 0) return -1; - if (tEncodeI64(pCoder, pReq->wnds[i].ekey) < 0) return -1; + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeU64(&encoder, pReq->sId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; + if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; + if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; + if (tEncodeU32(&encoder, pReq->phyLen) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->msg) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + if (buf != NULL) { + SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); + pHead->vgId = htonl(pReq->header.vgId); + pHead->contLen = htonl(tlen + headLen); } - tEndEncode(pCoder); - return 0; + return tlen + headLen; } -int32_t tDecodeSVDeleteReq(SDecoder *pCoder, SVDeleteReq *pReq) { - if (tStartDecode(pCoder) < 0) return -1; +int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { + int32_t headLen = sizeof(SMsgHead); - if (tDecodeI64(pCoder, &pReq->delUid) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->tbUid) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->type) < 0) return -1; - if (tDecodeI16v(pCoder, &pReq->nWnds) < 0) return -1; - if (tDecodeCStr(pCoder, &pReq->tbFullName) < 0) return -1; - if (tDecodeCStr(pCoder, &pReq->subPlan) < 0) return -1; - for (int16_t i = 0; i < pReq->nWnds; ++i) { - if (tDecodeI64(pCoder, &pReq->wnds[i].skey) < 0) return -1; - if (tDecodeI64(pCoder, &pReq->wnds[i].ekey) < 0) return -1; - } + SMsgHead *pHead = buf; + pHead->vgId = pReq->header.vgId; + pHead->contLen = pReq->header.contLen; - tEndDecode(pCoder); + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->sId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; + if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; + if (tDecodeU32(&decoder, &pReq->phyLen) < 0) return -1; + pReq->sql = taosMemoryCalloc(1, pReq->sqlLen + 1); + if (NULL == pReq->sql) return -1; + pReq->msg = taosMemoryCalloc(1, pReq->phyLen + 1); + if (NULL == pReq->msg) return -1; + if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->msg) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); return 0; } int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) { if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI32(pCoder, pReq->code) < 0) return -1; if (tEncodeI64(pCoder, pReq->affectedRows) < 0) return -1; tEndEncode(pCoder); @@ -3872,7 +3898,6 @@ int32_t tEncodeSVDeleteRsp(SEncoder *pCoder, const SVDeleteRsp *pReq) { int32_t tDecodeSVDeleteRsp(SDecoder *pCoder, SVDeleteRsp *pReq) { if (tStartDecode(pCoder) < 0) return -1; - if (tDecodeI32(pCoder, &pReq->code) < 0) return -1; if (tDecodeI64(pCoder, &pReq->affectedRows) < 0) return -1; tEndDecode(pCoder); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 77109ab52f1f9e6067ff48ac6b70a40a221b37ab..ccc399605bd22132265b77a43dd44c4c5e506459 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -358,6 +358,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 45b88318c474dfd3f053bb8803e92be32defcecf..ebaf73a952283d0feb4f276b33912dd419becbf7 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -59,6 +59,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { pLoad->numOfProcessedFetch = stat.fetchProcessed; pLoad->numOfProcessedDrop = stat.dropProcessed; pLoad->numOfProcessedHb = stat.hbProcessed; + pLoad->numOfProcessedDelete = stat.deleteProcessed; return 0; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index c6a6dd81473d75a319fb19ebc6bfa906eebbf6b0..64c811ca184893704e19b67b3e915b01dbc25d54 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -61,14 +61,14 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const if (tTagToValArray((const STag *)data, &pTagVals) != 0) { return -1; } - char key[512] = {0}; SIndexMultiTerm *terms = indexMultiTermCreate(); int16_t nCols = taosArrayGetSize(pTagVals); for (int i = 0; i < nCols; i++) { STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i); char type = pTagVal->type; - sprintf(key, "%s_%s", tagName, pTagVal->pKey); + + char * key = pTagVal->pKey; int32_t nKey = strlen(key); SIndexTerm *term = NULL; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 38482ccad56d5e6c544d65b0373c001df95aefed..2b5e18c1dbe71d9fdd5288f3db1d745926a49115 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -216,8 +216,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } // TODO wrap in destroy func - taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); + taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree); if (rsp.withSchema) { taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); @@ -421,10 +421,20 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { - SStreamDispatchReq* pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamDispatchReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + tDecodeStreamDispatchReq(&decoder, &req); + int32_t taskId = req.taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4bcd5fc095a5e71f5e87e7ee374993f557bf3000..3eee6713d86c471f89db6a3d9d870faa5563741c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -24,6 +24,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp); int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -142,6 +143,9 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp case TDMT_VND_SUBMIT: if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err; break; + case TDMT_VND_DELETE: + if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err; + break; /* TQ */ case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), @@ -256,6 +260,22 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { } } +int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) { + vTrace("message in write queue is processing"); + char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SDeleteRes res = {0}; + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; + + switch (pMsg->msgType) { + case TDMT_VND_DELETE: + return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res); + default: + vError("unknown msg type:%d in write queue", pMsg->msgType); + return TSDB_CODE_VND_APP_ERROR; + } +} + // TODO: remove the function void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO @@ -873,4 +893,4 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void pRsp->contLen = 0; return 0; -} \ No newline at end of file +} diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 8f49440105c813b512835717e861d3da1b2065df..dead1aff7383a0f6da2b8d83d290bdd7a1be3a31 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -49,6 +49,7 @@ typedef struct SDataSinkHandle { } SDataSinkHandle; int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); +int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d2de831d70356b906e1f3ade5b8af30208824223..d61280150a2ff13513867b1eda4f97f5dfc302ee 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -870,6 +870,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); +int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t* resNum); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c new file mode 100644 index 0000000000000000000000000000000000000000..33b7811e6c0c26032d1bca8cc8643dce2b0ea984 --- /dev/null +++ b/source/libs/executor/src/dataDeleter.c @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "dataSinkInt.h" +#include "dataSinkMgt.h" +#include "executorimpl.h" +#include "planner.h" +#include "tcompression.h" +#include "tdatablock.h" +#include "tglobal.h" +#include "tqueue.h" + +extern SDataSinkStat gDataSinkStat; + +typedef struct SDataDeleterBuf { + int32_t useSize; + int32_t allocSize; + char* pData; +} SDataDeleterBuf; + +typedef struct SDataCacheEntry { + int32_t dataLen; + int32_t numOfRows; + int32_t numOfCols; + int8_t compressed; + char data[]; +} SDataCacheEntry; + +typedef struct SDataDeleterHandle { + SDataSinkHandle sink; + SDataSinkManager* pManager; + SDataBlockDescNode* pSchema; + SDataDeleterNode* pDeleter; + SDeleterParam* pParam; + STaosQueue* pDataBlocks; + SDataDeleterBuf nextOutput; + int32_t status; + bool queryEnd; + uint64_t useconds; + uint64_t cachedSize; + TdThreadMutex mutex; +} SDataDeleterHandle; + +static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { + if (tsCompressColData < 0 || 0 == pData->info.rows) { + return false; + } + + for (int32_t col = 0; col < numOfCols; ++col) { + SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); + int32_t colSize = pColRes->info.bytes * pData->info.rows; + if (NEEDTO_COMPRESS_QUERY(colSize)) { + return true; + } + } + + return false; +} + +static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) { + int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); + + SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; + pEntry->compressed = 0; + pEntry->numOfRows = pInput->pData->info.rows; + pEntry->numOfCols = pInput->pData->info.numOfCols; + pEntry->dataLen = sizeof(SDeleterRes); + + ASSERT(1 == pEntry->numOfRows); + ASSERT(1 == pEntry->numOfCols); + + pBuf->useSize = sizeof(SDataCacheEntry); + + SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); + + SDeleterRes* pRes = (SDeleterRes*)pEntry->data; + pRes->uid = pHandle->pDeleter->tableId; + pRes->uidList = pHandle->pParam->pUidList; + pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; + pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; + pRes->affectedRows = *(int64_t*)pColRes->pData; + + pBuf->useSize += pEntry->dataLen; + + atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); + atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); +} + +static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) { + uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery; + if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) { + qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, + taosQueueItemSize(pDeleter->pDataBlocks)); + return false; + } + + pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes); + + pBuf->pData = taosMemoryMalloc(pBuf->allocSize); + if (pBuf->pData == NULL) { + qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); + } + + return NULL != pBuf->pData; +} + +static int32_t updateStatus(SDataDeleterHandle* pDeleter) { + taosThreadMutexLock(&pDeleter->mutex); + int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks); + int32_t status = + (0 == blockNums ? DS_BUF_EMPTY + : (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); + pDeleter->status = status; + taosThreadMutexUnlock(&pDeleter->mutex); + return status; +} + +static int32_t getStatus(SDataDeleterHandle* pDeleter) { + taosThreadMutexLock(&pDeleter->mutex); + int32_t status = pDeleter->status; + taosThreadMutexUnlock(&pDeleter->mutex); + return status; +} + +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { + SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM); + if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + toDataCacheEntry(pDeleter, pInput, pBuf); + taosWriteQitem(pDeleter->pDataBlocks, pBuf); + *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false); + return TSDB_CODE_SUCCESS; +} + +static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { + SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + taosThreadMutexLock(&pDeleter->mutex); + pDeleter->queryEnd = true; + pDeleter->useconds = useconds; + taosThreadMutexUnlock(&pDeleter->mutex); +} + +static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { + SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + if (taosQueueEmpty(pDeleter->pDataBlocks)) { + *pQueryEnd = pDeleter->queryEnd; + *pLen = 0; + return; + } + + SDataDeleterBuf* pBuf = NULL; + taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); + memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); + taosFreeQitem(pBuf); + *pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen; + *pQueryEnd = pDeleter->queryEnd; + qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); +} + +static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { + SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + if (NULL == pDeleter->nextOutput.pData) { + assert(pDeleter->queryEnd); + pOutput->useconds = pDeleter->useconds; + pOutput->precision = pDeleter->pSchema->precision; + pOutput->bufStatus = DS_BUF_EMPTY; + pOutput->queryEnd = pDeleter->queryEnd; + return TSDB_CODE_SUCCESS; + } + SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData); + memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); + pOutput->numOfRows = pEntry->numOfRows; + pOutput->numOfCols = pEntry->numOfCols; + pOutput->compressed = pEntry->compressed; + + atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen); + atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); + + taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent + pOutput->bufStatus = updateStatus(pDeleter); + taosThreadMutexLock(&pDeleter->mutex); + pOutput->queryEnd = pDeleter->queryEnd; + pOutput->useconds = pDeleter->useconds; + pOutput->precision = pDeleter->pSchema->precision; + taosThreadMutexUnlock(&pDeleter->mutex); + + return TSDB_CODE_SUCCESS; +} + +static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { + SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); + taosMemoryFreeClear(pDeleter->nextOutput.pData); + while (!taosQueueEmpty(pDeleter->pDataBlocks)) { + SDataDeleterBuf* pBuf = NULL; + taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); + taosMemoryFreeClear(pBuf->pData); + taosFreeQitem(pBuf); + } + taosCloseQueue(pDeleter->pDataBlocks); + taosThreadMutexDestroy(&pDeleter->mutex); + return TSDB_CODE_SUCCESS; +} + +static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { + SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle; + + *size = atomic_load_64(&pDispatcher->cachedSize); + return TSDB_CODE_SUCCESS; +} + +int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) { + SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); + if (NULL == deleter) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink; + deleter->sink.fPut = putDataBlock; + deleter->sink.fEndPut = endPut; + deleter->sink.fGetLen = getDataLength; + deleter->sink.fGetData = getDataBlock; + deleter->sink.fDestroy = destroyDataSinker; + deleter->sink.fGetCacheSize = getCacheSize; + deleter->pManager = pManager; + deleter->pDeleter = pDeleterNode; + deleter->pSchema = pDataSink->pInputDataBlockDesc; + deleter->pParam = pParam; + deleter->status = DS_BUF_EMPTY; + deleter->queryEnd = false; + deleter->pDataBlocks = taosOpenQueue(); + taosThreadMutexInit(&deleter->mutex, NULL); + if (NULL == deleter->pDataBlocks) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + *pHandle = deleter; + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 080cf5c2ad44f31f11f0fce0e2350fe121c2c1fb..eaa366b7e75ea75d9cd32f1f99b71685feb6baa5 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -83,7 +83,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->numOfCols = pInput->pData->info.numOfCols; pEntry->dataLen = 0; - pBuf->useSize = sizeof(SRetrieveTableRsp); + pBuf->useSize = sizeof(SDataCacheEntry); blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed); pBuf->useSize += pEntry->dataLen; @@ -100,7 +100,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData); + pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData); pBuf->pData = taosMemoryMalloc(pBuf->allocSize); if (pBuf->pData == NULL) { @@ -211,7 +211,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { return TSDB_CODE_SUCCESS; } -int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { +static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; *size = atomic_load_64(&pDispatcher->cachedSize); diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 9016ca274a3567d8cbc45d522d5e1cb93b176e68..ffa9822e927fc10e42a43064e428ccbe45acd00f 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -34,9 +34,12 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat) { } -int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) { - if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == nodeType(pDataSink)) { - return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); +int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle, void* pParam) { + switch (nodeType(pDataSink)) { + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); + case QUERY_NODE_PHYSICAL_PLAN_DELETE: + return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam); } return TSDB_CODE_FAILED; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 7757825733153741d6e83404051578f7f4e2aef8..c014b2395306c06184ab43875d04e93813ab427e 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -45,8 +45,15 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, if (code != TSDB_CODE_SUCCESS) { goto _error; } + if (handle) { - code = dsCreateDataSinker(pSubplan->pDataSink, handle); + void* pSinkParam = NULL; + code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); } _error: diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0d3c14589467c1a8ed1ebbd378790bf0d9567cc8..b986983d0ceeb42184b043fd53134a13d7f70cab 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5111,6 +5111,37 @@ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length) { return TDB_CODE_SUCCESS; } +int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo) { + SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; + + switch (pNode->type) { + case QUERY_NODE_PHYSICAL_PLAN_DELETE: { + SDeleterParam *pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam)); + if (NULL == pDeleterParam) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList); + pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); + if (NULL == pDeleterParam->pUidList) { + taosMemoryFree(pDeleterParam); + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < tbNum; ++i) { + STableKeyInfo *pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i); + taosArrayPush(pDeleterParam->pUidList, &pTable->uid); + } + + *pParam = pDeleterParam; + break; + } + default: + break; + } + + return TSDB_CODE_SUCCESS; +} + + int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model) { uint64_t queryId = pPlan->id.queryId; diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 24a4e99970692b202ab36fd1d1a83a45a09bcaa4..e3f9cb89ecd6eeb524061a7735df6161b30e195b 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -35,15 +35,15 @@ extern "C" { #endif // clang-format off -#define indexFatal(...) do { if (idxDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0) -#define indexError(...) do { if (idxDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0) -#define indexWarn(...) do { if (idxDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0) -#define indexInfo(...) do { if (idxDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0) -#define indexDebug(...) do { if (idxDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0) -#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0) +#define indexFatal(...) do { if (idxDebugFlag & DEBUG_FATAL) { taosPrintLog("IDX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0) +#define indexError(...) do { if (idxDebugFlag & DEBUG_ERROR) { taosPrintLog("IDX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0) +#define indexWarn(...) do { if (idxDebugFlag & DEBUG_WARN) { taosPrintLog("IDX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0) +#define indexInfo(...) do { if (idxDebugFlag & DEBUG_INFO) { taosPrintLog("IDX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0) +#define indexDebug(...) do { if (idxDebugFlag & DEBUG_DEBUG) { taosPrintLog("IDX ", DEBUG_DEBUG, idxDebugFlag, __VA_ARGS__);} } while (0) +#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0) // clang-format on -typedef enum { LT, LE, GT, GE } RangeType; +typedef enum { LT, LE, GT, GE, CONTAINS } RangeType; typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef struct SIndexStat { diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 4e7be245ef7fb0a4c383a0abf0b242ebbb46522c..1d8aa995be7e818e684b2604a42702847df6283e 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -90,7 +90,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe break; } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - if (0 == strcmp(c->colVal, pCt->colVal)) { + if (0 == strcmp(c->colVal, pCt->colVal) && strlen(pCt->colVal) == strlen(c->colVal)) { if (c->operaType == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) // taosArrayPush(result, &c->uid); @@ -123,12 +123,11 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* if (cache == NULL) { return 0; } - - _cache_range_compare cmpFn = indexGetCompare(type); - MemTable* mem = cache; IndexCache* pCache = mem->pCache; + _cache_range_compare cmpFn = indexGetCompare(type); + CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); pCt->colVal = term->colVal; pCt->colType = term->colType; @@ -222,7 +221,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr return TSDB_CODE_SUCCESS; } static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { - return TSDB_CODE_SUCCESS; + return cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS); } static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return TSDB_CODE_SUCCESS; @@ -242,6 +241,9 @@ static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTR static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE); } +static int32_t cacheSearchContain_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { + return cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS); +} static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return TSDB_CODE_SUCCESS; } @@ -264,13 +266,20 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR int skip = 0; char* exBuf = NULL; - if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { + if (type == CONTAINS) { + SIndexTerm tm = {.suid = term->suid, + .operType = term->operType, + .colType = term->colType, + .colName = term->colVal, + .nColName = term->nColVal}; + exBuf = indexPackJsonDataPrefix(&tm, &skip); + pCt->colVal = exBuf; + } else { exBuf = indexPackJsonDataPrefix(term, &skip); pCt->colVal = exBuf; } char* key = indexCacheTermGet(pCt); - // SSkipListIterator* iter = tSkipListCreateIter(mem->mem); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); @@ -278,14 +287,19 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR break; } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - // printf("json val: %s\n", c->colVal); - if (0 != strncmp(c->colVal, pCt->colVal, skip)) { - break; + TExeCond cond = CONTINUE; + if (type == CONTAINS) { + if (0 == strncmp(c->colVal, pCt->colVal, skip)) { + cond = MATCH; + } + } else { + if (0 != strncmp(c->colVal, pCt->colVal, skip)) { + break; + } + char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1); + memcpy(p, c->colVal, strlen(c->colVal)); + TExeCond cond = cmpFn(p + skip, term->colVal, dType); } - char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1); - memcpy(p, c->colVal, strlen(c->colVal)); - - TExeCond cond = cmpFn(p + skip, term->colVal, dType); if (cond == MATCH) { if (c->operaType == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) @@ -299,7 +313,6 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR } else if (cond == BREAK) { break; } - taosMemoryFree(p); } taosMemoryFree(pCt); diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 74b1773e877058a26bfa84afdf87cb2f73fa6aee..d30147d70f03949fa04617d747687594d1edbb89 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -97,6 +97,11 @@ static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { __compar_fn_t func = indexGetCompar(type); return tCompare(func, QUERY_GREATER_EQUAL, a, b, type); } + +static TExeCond tCompareContains(void* a, void* b, int8_t type) { + __compar_fn_t func = indexGetCompar(type); + return tCompare(func, QUERY_TERM, a, b, type); +} TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY) { return tDoCompare(func, cmptype, a, b); @@ -185,12 +190,14 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { case QUERY_TERM: { if (ret == 0) return MATCH; } + default: + return BREAK; } return CONTINUE; } -static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, - tCompareGreaterThan, tCompareGreaterEqual}; +static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = { + tCompareLessThan, tCompareLessEqual, tCompareGreaterThan, tCompareGreaterEqual, tCompareContains}; _cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; } diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 9f4dddfd637e5919320228d36bb1b5a8cd6943e0..7c060fe10fdc7724ebf8d50c898a30e1d790e6b2 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -64,6 +64,8 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { *dst = QUERY_TERM; } else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) { *dst = QUERY_REGEX; + } else if (src == OP_TYPE_JSON_CONTAINS) { + *dst = QUERY_PREFIX; } else { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -171,10 +173,8 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) { param->colId = l->colId; param->colValType = l->node.resType.type; memcpy(param->dbName, l->dbName, sizeof(l->dbName)); -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wformat-overflow" - sprintf(param->colName, "%s_%s", l->colName, r->literal); -#pragma GCC diagnostic pop + memcpy(param->colName, r->literal, strlen(r->literal)); + // sprintf(param->colName, "%s_%s", l->colName, r->literal); param->colValType = r->typeData; return 0; // memcpy(param->colName, l->colName, sizeof(l->colName)); @@ -186,6 +186,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue)); param->colId = -1; param->colValType = (uint8_t)(vn->node.resType.type); + memcpy(param->colName, vn->literal, strlen(vn->literal)); break; } case QUERY_NODE_COLUMN: { @@ -237,7 +238,7 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx indexError("invalid operation node, left: %p, rigth: %p", node->pLeft, node->pRight); SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - if (node->opType == OP_TYPE_JSON_GET_VALUE || node->opType == OP_TYPE_JSON_CONTAINS) { + if (node->opType == OP_TYPE_JSON_GET_VALUE) { return code; } SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam)); @@ -420,8 +421,8 @@ static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output return sifDoIndex(left, right, id, output); } static int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) { - // return 0 - return 0; + int id = OP_TYPE_JSON_CONTAINS; + return sifDoIndex(left, right, id, output); } static int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) { // return 0 @@ -504,7 +505,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { output->status = SFLT_NOT_INDEX; return code; } - if (node->opType == OP_TYPE_JSON_GET_VALUE || node->opType == OP_TYPE_JSON_CONTAINS) { + if (node->opType == OP_TYPE_JSON_GET_VALUE) { return code; } SIFParam *params = NULL; @@ -618,11 +619,11 @@ EDealRes sifCalcWalker(SNode *node, void *context) { } if (QUERY_NODE_OPERATOR == nodeType(node)) { - indexInfo("node type for index filter, type: %d", nodeType(node)); + // indexInfo("node type for index filter, type: %d", nodeType(node)); return sifWalkOper(node, ctx); } - indexError("invalid node type for index filter calculating, type:%d", nodeType(node)); + // indexError("invalid node type for index filter calculating, type:%d", nodeType(node)); ctx->code = TSDB_CODE_QRY_INVALID_INPUT; return DEAL_RES_ERROR; } diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 53dd2923ac8c1f07b62098a3663c030016b46a72..34f685db3a9c046602780df03b27b233d199e230 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -425,8 +425,7 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return TSDB_CODE_SUCCESS; } static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { - // impl later - return TSDB_CODE_SUCCESS; + return tfSearchCompareFunc_JSON(reader, tem, tr, CONTAINS); } static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { // impl later @@ -457,7 +456,17 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt int ret = 0; int skip = 0; - char* p = indexPackJsonDataPrefix(tem, &skip); + char* p = NULL; + if (ctype == CONTAINS) { + SIndexTerm tm = {.suid = tem->suid, + .operType = tem->operType, + .colType = tem->colType, + .colName = tem->colVal, + .nColName = tem->nColVal}; + p = indexPackJsonDataPrefix(&tm, &skip); + } else { + p = indexPackJsonDataPrefix(tem, &skip); + } _cache_range_compare cmpFn = indexGetCompare(ctype); @@ -466,27 +475,25 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); - // FstSlice h = fstSliceCreate((uint8_t*)p, skip); - // fstStreamBuilderSetRange(sb, &h, ctype); - // fstSliceDestroy(&h); - StreamWithState* st = streamBuilderIntoStream(sb); StreamWithStateResult* rt = NULL; while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { FstSlice* s = &rt->data; - int32_t sz = 0; - char* ch = (char*)fstSliceData(s, &sz); - char* tmp = taosMemoryCalloc(1, sz + 1); - memcpy(tmp, ch, sz); - - if (0 != strncmp(tmp, p, skip)) { - swsResultDestroy(rt); - taosMemoryFree(tmp); - break; + int32_t sz = 0; + char* ch = (char*)fstSliceData(s, &sz); + TExeCond cond = CONTINUE; + if (ctype == CONTAINS) { + if (0 != strncmp(ch, p, skip)) { + cond = MATCH; + } + } else { + if (0 != strncmp(ch, p, skip)) { + swsResultDestroy(rt); + break; + } + cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType)); } - - TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType)); if (MATCH == cond) { tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); } else if (CONTINUE == cond) { @@ -494,7 +501,6 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt swsResultDestroy(rt); break; } - taosMemoryFree(tmp); swsResultDestroy(rt); } streamWithStateDestroy(st); diff --git a/source/libs/monitor/src/monMsg.c b/source/libs/monitor/src/monMsg.c index 944a7b54750c9e8850d0fe124f36561c54a6630e..a041b582a9eaafc0e8553262150c1a15e7edc469 100644 --- a/source/libs/monitor/src/monMsg.c +++ b/source/libs/monitor/src/monMsg.c @@ -569,6 +569,7 @@ int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { if (tEncodeI64(&encoder, pInfo->numOfProcessedFetch) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfProcessedDrop) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfProcessedHb) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->numOfProcessedDelete) < 0) return -1; if (tEncodeI64(&encoder, pInfo->cacheDataSize) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfQueryInQueue) < 0) return -1; if (tEncodeI64(&encoder, pInfo->numOfFetchInQueue) < 0) return -1; @@ -591,6 +592,7 @@ int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo) { if (tDecodeI64(&decoder, &pInfo->numOfProcessedFetch) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfProcessedDrop) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfProcessedHb) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->numOfProcessedDelete) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->cacheDataSize) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfQueryInQueue) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->numOfFetchInQueue) < 0) return -1; diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 0cc9bd712604efc84d09deb00c21acfc1b32529d..d13f17f8958cdc7c94f65ccbba3a7ba707c8749c 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -64,7 +64,7 @@ static SKeyword keywordTable[] = { {"CONSUMER", TK_CONSUMER}, {"COUNT", TK_COUNT}, {"CREATE", TK_CREATE}, - {"CONTAINS", TK_CONTAINS}, + {"CONTAINS", TK_CONTAINS}, {"DATABASE", TK_DATABASE}, {"DATABASES", TK_DATABASES}, {"DAYS", TK_DAYS}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1ba47d0d15d1d5d35829727df6d8e0fb6b230c01..d81c6760e8d0b351614e099d867568b2f448248f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5015,6 +5015,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { pQuery->haveResultSet = true; pQuery->msgType = TDMT_VND_QUERY; break; + case QUERY_NODE_DELETE_STMT: + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->msgType = TDMT_VND_DELETE; + break; case QUERY_NODE_VNODE_MODIF_STMT: pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 785f112003ef9684af9162d1a43bb12505f3b72d..6b3d82b64493c3221161a4fcb74849cf518e65f8 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1395,6 +1395,7 @@ static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode if (TSDB_CODE_SUCCESS == code) { code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink); } + pSubplan->msgType = TDMT_VND_DELETE; return code; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index f8d240c7b2d2162800cbc32ee7af2eeb62645d89..1921b1638875c218a417bb18b8da1dfb7ffd48a5 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -88,7 +88,7 @@ int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstream } int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { - if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) { + if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) { SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink; *pLen = insert->size; *pStr = insert->pData; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 082db6428f79575cf29a635a1965f214893d22a5..1d31c86308e749fab5271ccbdb81090c801c9277 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -160,6 +160,7 @@ typedef struct SQWMsgStat { uint64_t cancelProcessed; uint64_t dropProcessed; uint64_t hbProcessed; + uint64_t deleteProcessed; } SQWMsgStat; typedef struct SQWRTStat { @@ -357,6 +358,7 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); void qwClearExpiredSch(SArray* pExpiredSch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); +void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 5a8a45cd5150bc5e2aa009f2f3690e8f98a14c05..29861d87ac8957a8ad3e593e796b57a66b5b8eb6 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -30,6 +30,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); +int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 2f1ecc51723a4799071292f5d7eb8d3b3d2f65ab..848a0420cadc01df68ab051949d2fadc9e7d7990 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -300,13 +300,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = msg->sId; - msg->queryId = msg->queryId; - msg->taskId = msg->taskId; - msg->refId = msg->refId; - msg->phyLen = msg->phyLen; - msg->sqlLen = msg->sqlLen; - uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; @@ -523,3 +516,37 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ return TSDB_CODE_SUCCESS; } + + +int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRsp) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t code = 0; + SVDeleteReq req = {0}; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + + QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); + + tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); + + uint64_t sId = req.sId; + uint64_t qId = req.queryId; + uint64_t tId = req.taskId; + int64_t rId = 0; + + SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info}; + QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); + taosMemoryFreeClear(req.sql); + + QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp, pRes)); + + QW_SCH_TASK_DLOG("processDelete end, node:%p", node); + +_return: + + QW_RET(code); +} + + diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 8bfb80f0616e223f96eafcd34e8d89768ffb98ca..667008e68e70451f8a4394ab2a792d179f09497f 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -290,8 +290,11 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { QW_RET(code); } -void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { - tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); +void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + if (ctx->ctrlConnInfo.handle) { + tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); + } + ctx->ctrlConnInfo.handle = NULL; ctx->ctrlConnInfo.refId = -1; @@ -333,7 +336,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } - qwFreeTask(QW_FPARAMS(), &octx); + qwFreeTaskCtx(QW_FPARAMS(), &octx); QW_TASK_DLOG_E("task ctx dropped"); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 44a8fdf7f4e29c6ad8ee0e8dc5b26612bc1e665f..333884f883d8ee15e1432a0c2bee179d6c3c5f72 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -183,7 +183,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) return TSDB_CODE_SUCCESS; } -int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { +int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { int32_t len = 0; SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; @@ -242,6 +242,53 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void return TSDB_CODE_SUCCESS; } +int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SDeleteRes *pRes) { + int32_t len = 0; + SVDeleteRsp rsp = {0}; + bool queryEnd = false; + int32_t code = 0; + SOutputData output = {0}; + + dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); + + if (len <= 0 || len != sizeof(SDeleterRes)) { + QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + output.pData = taosMemoryCalloc(1, len); + if (NULL == output.pData) { + QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + code = dsGetDataBlock(ctx->sinkHandle, &output); + if (code) { + QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); + taosMemoryFree(output.pData); + QW_ERR_RET(code); + } + + SDeleterRes* pDelRes = (SDeleterRes*)output.pData; + + rsp.affectedRows = pDelRes->affectedRows; + pRes->uid = pDelRes->uid; + pRes->uidList = pDelRes->uidList; + pRes->skey = pDelRes->skey; + pRes->ekey = pDelRes->ekey; + + SEncoder coder = {0}; + tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, code); + void *msg = rpcMallocCont(len); + tEncoderInit(&coder, msg, len); + tEncodeSVDeleteRsp(&coder, &rsp); + tEncoderClear(&coder); + + *rspMsg = msg; + *dataLen = len; + + return TSDB_CODE_SUCCESS; +} + int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; @@ -547,7 +594,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; - QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { QW_TASK_DLOG("task not end and buf is %s, need to continue query", qwBufStatusStr(sOutput.bufStatus)); @@ -620,7 +667,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); SOutputData sOutput = {0}; - QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { ctx->dataConnInfo = qwMsg->connInfo; @@ -875,6 +922,47 @@ _return: qwRelease(refId); } +int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes) { + int32_t code = 0; + SSubplan *plan = NULL; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + SQWTaskCtx ctx = {0}; + + code = qStringToSubplan(qwMsg->msg, &plan); + if (TSDB_CODE_SUCCESS != code) { + code = TSDB_CODE_INVALID_MSG; + QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_JRET(code); + } + + ctx.plan = plan; + + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); + if (code) { + QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_JRET(code); + } + + if (NULL == sinkHandle || NULL == pTaskInfo) { + QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + ctx.taskHandle = pTaskInfo; + ctx.sinkHandle = sinkHandle; + + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL)); + + QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont, pRes)); + +_return: + + qwFreeTaskCtx(QW_FPARAMS(), &ctx); + + QW_RET(TSDB_CODE_SUCCESS); +} + int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { @@ -1007,6 +1095,7 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt pStat->fetchProcessed = QW_STAT_GET(mgmt->stat.msgStat.fetchProcessed); pStat->dropProcessed = QW_STAT_GET(mgmt->stat.msgStat.dropProcessed); pStat->hbProcessed = QW_STAT_GET(mgmt->stat.msgStat.hbProcessed); + pStat->deleteProcessed = QW_STAT_GET(mgmt->stat.msgStat.deleteProcessed); pStat->numOfQueryInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, QUERY_QUEUE); pStat->numOfFetchInQueue = handle->pMsgCb->qsizeFp(handle->pMsgCb->mgmt, mgmt->nodeId, FETCH_QUEUE); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 7a0272a4d0009148662e8401e3fa544f4b46b5ee..09f2efc7e2bbe51b2389823f86f0c7391236e396 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -1476,7 +1476,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_ SSchJob *pJob = NULL; SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); - qDebug("QID:0x%" PRIx64 " jobId:0x%"PRIx64 " started", pDag->queryId, pJob->refId); + qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pDag->queryId, pJob->refId); *job = pJob->refId; SCH_ERR_JRET(schLaunchJob(pJob)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index bf51d8d631bfff7501ba40fea396b570e14c5c90..e40db48401426d8bf9cef8f53c3e5ecb9bc4b0c0 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -62,10 +62,11 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy case TDMT_VND_DROP_TABLE_RSP: case TDMT_VND_ALTER_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: + case TDMT_VND_DELETE_RSP: break; default: SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus)); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + SCH_ERR_RET(TSDB_CODE_INVALID_MSG); } if (lastMsgType != reqMsgType) { @@ -227,6 +228,25 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } + case TDMT_VND_DELETE_RSP: { + SCH_ERR_JRET(rspCode); + + if (msg) { + SDecoder coder = {0}; + SVDeleteRsp rsp = {0}; + tDecoderInit(&coder, msg, msgSize); + tDecodeSVDeleteRsp(&coder, &rsp); + + atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows); + SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); + } + + taosMemoryFreeClear(msg); + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + + break; + } case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; @@ -411,6 +431,10 @@ int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } +int32_t schHandleDeleteCallback(void *param, const SDataBuf *pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_DELETE_RSP, code); +} + int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); } @@ -501,6 +525,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { case TDMT_VND_QUERY: *fp = schHandleQueryCallback; break; + case TDMT_VND_DELETE: + *fp = schHandleDeleteCallback; + break; case TDMT_VND_EXPLAIN: *fp = schHandleExplainCallback; break; @@ -982,6 +1009,26 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, break; } + case TDMT_VND_DELETE: { + SVDeleteReq req = {0}; + req.header.vgId = addr->nodeId; + req.sId = schMgmt.sId; + req.queryId = pJob->queryId; + req.taskId = pTask->taskId; + req.phyLen = pTask->msgLen; + req.sqlLen = strlen(pJob->sql); + req.sql = (char*)pJob->sql; + req.msg = pTask->msg; + msgSize = tSerializeSVDeleteReq(NULL, 0, &req); + msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + SCH_TASK_ELOG("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + tSerializeSVDeleteReq(msg, msgSize, &req); + break; + } case TDMT_VND_QUERY: { SCH_ERR_RET(schMakeQueryRpcCtx(pJob, pTask, &rpcCtx)); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 18398802dbd2ed0c251bd93a1fcafc56363d797e..38c03c74d9617f037591d52574a6b6bf04831d0c 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -257,6 +257,8 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { taosHashCleanup(pCtx->args); - (*pCtx->freeFunc)(pCtx->brokenVal.val); + if (pCtx->freeFunc) { + (*pCtx->freeFunc)(pCtx->brokenVal.val); + } } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 604539f16a8061ec20873b50d7cd14ab8a25b366..48c43b0775391cda99daff9ad0e7d582ddd51aee 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -25,6 +25,9 @@ extern "C" { int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); +int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data); + +int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); #ifdef __cplusplus } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d89f2ed57d5289997b67387ec7aafd761cb1fd8d..91ede155bb5157d7d60fc84adecc79bb16124e13 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -35,18 +35,19 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { return 0; } -#if 1 int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); int8_t status; // enqueue - if (pBlock != NULL) { - pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; - pBlock->sourceVg = pReq->sourceVg; - pBlock->blocks = pReq->data; + if (pData != NULL) { + pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK; + pData->sourceVg = pReq->sourceVg; + // decode + /*pData->blocks = pReq->data;*/ /*pBlock->sourceVer = pReq->sourceVer;*/ - if (streamTaskInput(pTask, (SStreamQueueItem*)pBlock) == 0) { + streamDispatchReqToData(pReq, pData); + if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; @@ -57,16 +58,17 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* } // rsp by input status - SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp)); + void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + ((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg); + SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); pCont->inputStatus = status; pCont->streamId = pReq->streamId; pCont->taskId = pReq->sourceTaskId; - pRsp->pCont = pCont; - pRsp->contLen = sizeof(SStreamDispatchRsp); + pRsp->pCont = buf; + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -#endif int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { // 1. handle input @@ -87,8 +89,12 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp } int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { + ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); + int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); + ASSERT(old == TASK_OUTPUT_STATUS__WAIT); if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { // TODO: init recover timer + return 0; } // continue dispatch streamSink1(pTask, pMsgCb); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 95c0290058743d1837b9ad2a433c328d97210e19..7139e77407ba59ec643108cb42a7a365f1880347 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -36,6 +36,29 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { } #endif +int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { + int32_t blockNum = pReq->blockNum; + SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock)); + if (pArray == NULL) { + return -1; + } + taosArraySetSize(pArray, blockNum); + + ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data)); + ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); + + for (int32_t i = 0; i < blockNum; i++) { + int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i); + SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i); + SSDataBlock* pDataBlock = taosArrayGet(pArray, i); + blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data); + // TODO: refactor + pDataBlock->info.childId = pReq->sourceChildId; + } + pData->blocks = pArray; + return 0; +} + SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); if (pDataSubmit == NULL) return NULL; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4b88cf503e19976168a689f2f921d5bb6ea33564..72df516e0d63222e6ebab82a086b9116e7c62432 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -43,6 +43,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) if (output == NULL) break; // TODO: do we need free memory? SSDataBlock* outputCopy = createOneDataBlock(output, true); + outputCopy->info.childId = pTask->childId; taosArrayPush(pRes, outputCopy); } return 0; diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 81f18fea8d49b8cb5b70fa75cebd751d1a5da878..e5f953b7cc4e2d2c99a124fc49ee06ff8463ceea 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tstream.h" +#include "streamInc.h" int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -32,7 +32,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeBinary(pEncoder, data, len) < 0) return -1; } tEndEncode(pEncoder); - return 0; + return pEncoder->pos; } int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { @@ -60,14 +60,169 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { return 0; } -int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { +static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + if (buf == NULL) return -1; + + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = 0; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(pBlock->info.rows); + pRetrieve->numOfCols = htonl(pBlock->info.numOfCols); + + int32_t actualLen = 0; + blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(pReq->dataLen, &actualLen); + taosArrayPush(pReq->data, &buf); + + return 0; +} + +int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { + void* buf = NULL; + int32_t code = -1; + int32_t blockNum = taosArrayGetSize(data->blocks); + ASSERT(blockNum != 0); + SStreamDispatchReq req = { .streamId = pTask->streamId, - .data = data, + .sourceTaskId = pTask->taskId, + .sourceVg = data->sourceVg, + .sourceChildId = pTask->childId, + .blockNum = blockNum, }; + + req.data = taosArrayInit(blockNum, sizeof(void*)); + req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); + if (req.data == NULL || req.dataLen == NULL) { + goto FAIL; + } + for (int32_t i = 0; i < blockNum; i++) { + SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i); + if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { + goto FAIL; + } + } + int32_t vgId = 0; + int32_t downstreamTaskId = 0; + // find ep + if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + vgId = pTask->fixedEpDispatcher.nodeId; + *ppEpSet = &pTask->fixedEpDispatcher.epSet; + downstreamTaskId = pTask->fixedEpDispatcher.taskId; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + // TODO get ctbName + char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0}; + SSDataBlock* pBlock = taosArrayGet(data->blocks, 0); + sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); + // get vg and ep + // TODO: get hash function by hashMethod + + // get groupId, compute hash value + uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + + // get node + // TODO: optimize search process + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + vgId = pVgInfo->vgId; + downstreamTaskId = pVgInfo->taskId; + *ppEpSet = &pVgInfo->epSet; + break; + } + } + ASSERT(vgId != 0); + } + + req.taskId = downstreamTaskId; + + // serialize + int32_t tlen; + tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code); + if (code < 0) goto FAIL; + code = -1; + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + goto FAIL; + } + + ((SMsgHead*)buf)->vgId = htonl(vgId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) { + goto FAIL; + } + tEncoderClear(&encoder); + + pMsg->contLen = tlen + sizeof(SMsgHead); + pMsg->pCont = buf; + pMsg->msgType = pTask->dispatchMsgType; + + code = 0; +FAIL: + if (code < 0 && buf) rpcFreeCont(buf); + if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree); + if (req.dataLen) taosArrayDestroy(req.dataLen); + return code; +} + +int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) { +#if 0 + int8_t old = + atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); + if (old != TASK_OUTPUT_STATUS__NORMAL) { + return 0; + } +#endif + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + SRpcMsg dispatchMsg = {0}; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) { + ASSERT(0); + return -1; + } + + int32_t qType; + if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { + qType = FETCH_QUEUE; + } else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) { + qType = WRITE_QUEUE; + } else { + ASSERT(0); + } + tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + + tmsgSendReq(pEpSet, &dispatchMsg); + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + + tmsgSendReq(pEpSet, &dispatchMsg); + } return 0; } +#if 0 static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { SStreamTaskExecReq req = { .streamId = pTask->streamId, @@ -148,3 +303,4 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb } return 0; } +#endif diff --git a/source/libs/stream/src/streamSink.c b/source/libs/stream/src/streamSink.c index 6acdd1064c7cfcc61684c04bddb0a16aa62a178b..35bebe0e63e15aabd0d74a8bacdc9a134037ce78 100644 --- a/source/libs/stream/src/streamSink.c +++ b/source/libs/stream/src/streamSink.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "executor.h" -#include "tstream.h" +#include "streamInc.h" int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { SStreamQueue* queue; @@ -23,12 +22,13 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { } else { queue = pTask->outputQueue; } + /*if (streamDequeueBegin(queue) == true) {*/ /*return -1;*/ /*}*/ - if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA || + pTask->dispatchType != TASK_DISPATCH__NONE) { while (1) { SStreamDataBlock* pBlock = streamQueueNextItem(queue); if (pBlock == NULL) break; @@ -36,17 +36,19 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { // local sink if (pTask->sinkType == TASK_SINK__TABLE) { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); } else if (pTask->sinkType == TASK_SINK__SMA) { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks); } - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - ASSERT(queue == pTask->outputQueue); - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - ASSERT(queue == pTask->outputQueue); - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + // TODO: sink and dispatch should be only one + if (pTask->dispatchType != TASK_DISPATCH__NONE) { ASSERT(queue == pTask->outputQueue); + ASSERT(pTask->sinkType == TASK_SINK__NONE); + + streamDispatch(pTask, pMsgCb, pBlock); } streamQueueProcessSuccess(queue); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0090701ba567378c4e2aab0869a757663b9d289e..e62a73b7fa2e4b8de3c89956ad3956231a6e92cb 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -338,8 +338,7 @@ void cliHandleResp(SCliConn* conn) { return; } - int ret = cliAppCb(conn, &transMsg, pMsg); - if (ret != 0) { + if (cliAppCb(conn, &transMsg, pMsg) != 0) { tTrace("try to send req to next node"); return; } @@ -403,15 +402,13 @@ void cliHandleExcept(SCliConn* pConn) { continue; } } - int ret = cliAppCb(pConn, &transMsg, pMsg); - if (ret != 0) { + if (cliAppCb(pConn, &transMsg, pMsg) != 0) { tTrace("try to send req to next node"); return; } destroyCmsg(pMsg); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); } while (!transQueueEmpty(&pConn->cliMsgs)); - transUnrefCliHandle(pConn); } @@ -976,7 +973,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { arg->param1 = pMsg; arg->param2 = pThrd; transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); - cliDestroyConn(pConn, true); + transUnrefCliHandle(pConn); return -1; } } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {