diff --git a/cmake/cmake.define b/cmake/cmake.define
index 1500858d9f541a061803c7bc76c514927813d5a2..f55a9bdabc79e31f129b2184144c9472572d5454 100644
--- a/cmake/cmake.define
+++ b/cmake/cmake.define
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.0)
-set(CMAKE_VERBOSE_MAKEFILE ON)
+set(CMAKE_VERBOSE_MAKEFILE OFF)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in
index 7d9f49d3fa20874b19c6f4e4e318c0a9da26d4e9..ba4a404af690b5762985532e801592a679799716 100644
--- a/cmake/rocksdb_CMakeLists.txt.in
+++ b/cmake/rocksdb_CMakeLists.txt.in
@@ -1,8 +1,8 @@
# rocksdb
ExternalProject_Add(rocksdb
- GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git
- GIT_TAG v6.23.3
+ GIT_REPOSITORY https://github.com/facebook/rocksdb.git
+ GIT_TAG v8.1.1
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in
index 4a8f4864b301101fd04214674069e8c3196ba49d..c67918351de7b25d18403314886bd5227f8fa6c6 100644
--- a/cmake/taosadapter_CMakeLists.txt.in
+++ b/cmake/taosadapter_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
- GIT_TAG ae8d51c
+ GIT_TAG 565ca21
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/docs/en/12-taos-sql/01-data-type.md b/docs/en/12-taos-sql/01-data-type.md
index 204713f97135ee0c1b6219a53adb55ac879b56fa..cca256139d409ac009bb188ccb377bffc0dd42ef 100644
--- a/docs/en/12-taos-sql/01-data-type.md
+++ b/docs/en/12-taos-sql/01-data-type.md
@@ -24,24 +24,24 @@ CREATE DATABASE db_name PRECISION 'ns';
In TDengine, the data types below can be used when specifying a column or tag.
-| # | **type** | **Bytes** | **Description** |
-| --- | :--------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. |
-| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. |
-| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. |
-| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. |
-| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. |
-| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. |
-| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. |
-| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. |
-| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. |
-| 10 | INT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. |
-| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. |
-| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. |
-| 13 | BOOL | 1 | Bool, the value range is {true, false}. |
-| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
-| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
-| 16 | VARCHAR | User-defined | Alias of BINARY |
+| # | **type** | **Bytes** | **Description** |
+| --- | :---------------: | ------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| 1 | TIMESTAMP | 8 | Default precision is millisecond, microsecond and nanosecond are also supported. |
+| 2 | INT | 4 | Integer, the value range is [-2^31, 2^31-1]. |
+| 3 | INT UNSIGNED | 4 | Unsigned integer, the value range is [0, 2^32-1]. |
+| 4 | BIGINT | 8 | Long integer, the value range is [-2^63, 2^63-1]. |
+| 5 | BIGINT UNSIGNED | 8 | unsigned long integer, the value range is [0, 2^64-1]. |
+| 6 | FLOAT | 4 | Floating point number, the effective number of digits is 6-7, the value range is [-3.4E38, 3.4E38]. |
+| 7 | DOUBLE | 8 | Double precision floating point number, the effective number of digits is 15-16, the value range is [-1.7E308, 1.7E308]. |
+| 8 | BINARY | User Defined | Single-byte string for ASCII visible characters. Length must be specified when defining a column or tag of binary type. |
+| 9 | SMALLINT | 2 | Short integer, the value range is [-32768, 32767]. |
+| 10 | SMALLINT UNSIGNED | 2 | unsigned integer, the value range is [0, 65535]. |
+| 11 | TINYINT | 1 | Single-byte integer, the value range is [-128, 127]. |
+| 12 | TINYINT UNSIGNED | 1 | unsigned single-byte integer, the value range is [0, 255]. |
+| 13 | BOOL | 1 | Bool, the value range is {true, false}. |
+| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
+| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
+| 16 | VARCHAR | User-defined | Alias of BINARY |
:::note
diff --git a/include/client/taos.h b/include/client/taos.h
index d9fd1ca1b81e72b4d08150422a49c7e3051f7f89..8811c4ab64e3ae65085c26ccd791705a98541423 100644
--- a/include/client/taos.h
+++ b/include/client/taos.h
@@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
+DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index 63e9e3799a4f418b792ec4a61182b12bce30b21c..5b125b42d4bc67ed6625d38f9baf4fb356ad041e 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -20,13 +20,13 @@
#include "tsimplehash.h"
#include "tstreamFileState.h"
+#ifndef _STREAM_STATE_H_
+#define _STREAM_STATE_H_
+
#ifdef __cplusplus
extern "C" {
#endif
-#ifndef _STREAM_STATE_H_
-#define _STREAM_STATE_H_
-
// void* streamBackendInit(const char* path);
// void streamBackendCleanup(void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg);
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 865977d62b212d5e621ab3fce05ca7dbedd979d4..c7e55650cd7a422b2af9680a79660e64328deeb0 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,16 +13,13 @@
* along with this program. If not, see .
*/
-#include "executor.h"
#include "os.h"
-#include "query.h"
#include "streamState.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
-#include "trpc.h"
#ifdef __cplusplus
extern "C" {
@@ -340,7 +337,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
- SArray* pTaskList; // SArray
+ SArray* pTaskList; // SArray
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
@@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
+void streamMetaInit();
+void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index c8f3feb2d425858a76779cd2b94fcede72849e39..cae5c8715daafff8f58f82579af2577b9a6a323b 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) {
tscError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
+ pFile = NULL;
continue;
}
} else {
@@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) {
if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile);
+ pFile = NULL;
truncateFile = false;
}
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 10c42bb67d2961ac0530333fbb1e9fd4a2294062..7c05b2f50cd4b1483b1a5ee2a57cda0d8cb6a4ba 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -24,6 +24,8 @@ typedef struct {
struct {
int64_t clusterId;
int32_t passKeyCnt;
+ int32_t passVer;
+ int32_t reqCnt;
};
};
} SHbParam;
@@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS;
}
-static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
+static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR;
}
- int32_t code = 0;
+ int32_t code = 0;
+
+ if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
+ tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
+ goto _return;
+ }
+
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY;
@@ -570,6 +578,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
goto _return;
}
+ // assign the passVer
+ if (param) {
+ param->passVer = pTscObj->passInfo.ver;
+ }
+
_return:
releaseTscObj(connKey->tscRid);
if (code) {
@@ -714,13 +727,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
- SHbParam *hbParam = (SHbParam *)param;
- struct SCatalog *pCatalog = NULL;
-
- int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog);
- if (code != TSDB_CODE_SUCCESS) {
- tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
- return code;
+ int32_t code = 0;
+ SHbParam *hbParam = (SHbParam *)param;
+ SCatalog *pCatalog = NULL;
+
+ if (hbParam->reqCnt == 0) {
+ code = catalogGetHandle(hbParam->clusterId, &pCatalog);
+ if (code != TSDB_CODE_SUCCESS) {
+ tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
+ return code;
+ }
}
hbGetAppInfo(hbParam->clusterId, req);
@@ -728,24 +744,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req);
if (hbParam->passKeyCnt > 0) {
- hbGetUserBasicInfo(connKey, req);
+ hbGetUserBasicInfo(connKey, hbParam, req);
}
- code = hbGetExpiredUserInfo(connKey, pCatalog, req);
- if (TSDB_CODE_SUCCESS != code) {
- return code;
- }
+ if (hbParam->reqCnt == 0) {
+ code = hbGetExpiredUserInfo(connKey, pCatalog, req);
+ if (TSDB_CODE_SUCCESS != code) {
+ return code;
+ }
- code = hbGetExpiredDBInfo(connKey, pCatalog, req);
- if (TSDB_CODE_SUCCESS != code) {
- return code;
- }
+ code = hbGetExpiredDBInfo(connKey, pCatalog, req);
+ if (TSDB_CODE_SUCCESS != code) {
+ return code;
+ }
- code = hbGetExpiredStbInfo(connKey, pCatalog, req);
- if (TSDB_CODE_SUCCESS != code) {
- return code;
+ code = hbGetExpiredStbInfo(connKey, pCatalog, req);
+ if (TSDB_CODE_SUCCESS != code) {
+ return code;
+ }
}
+ ++hbParam->reqCnt; // success to get catalog info
+
return TSDB_CODE_SUCCESS;
}
@@ -766,55 +786,47 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
}
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
-
- int64_t rid = -1;
- int32_t code = 0;
-
- void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
-
- SClientHbReq *pOneReq = pIter;
- SClientHbKey *connKey = pOneReq ? &pOneReq->connKey : NULL;
- if (connKey != NULL) rid = connKey->tscRid;
-
- STscObj *pTscObj = (STscObj *)acquireTscObj(rid);
- if (pTscObj == NULL) {
+ if (!pBatchReq->reqs) {
tFreeClientHbBatchReq(pBatchReq);
return NULL;
}
- while (pIter != NULL) {
+ void *pIter = NULL;
+ SHbParam param = {0};
+ while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
+ SClientHbReq *pOneReq = pIter;
+ SClientHbKey *connKey = &pOneReq->connKey;
+ STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
+
+ if (!pTscObj) {
+ continue;
+ }
+
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
- SHbParam param;
- switch (pOneReq->connKey.connType) {
+
+ switch (connKey->connType) {
case CONN_TYPE__QUERY: {
- param.clusterId = pOneReq->clusterId;
+ if (param.clusterId == 0) {
+ // init
+ param.clusterId = pOneReq->clusterId;
+ param.passVer = INT32_MIN;
+ }
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
break;
}
default:
break;
}
- if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) {
- code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, ¶m, pOneReq);
+ if (clientHbMgr.reqHandle[connKey->connType]) {
+ int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, ¶m, pOneReq);
if (code) {
tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
- pOneReq->connKey.tscRid, pOneReq->connKey.connType);
+ connKey->tscRid, connKey->connType);
}
}
- break;
-#if 0
- if (code) {
- pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
- pOneReq = pIter;
- continue;
- }
-
- pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
- pOneReq = pIter;
-#endif
+ releaseTscObj(connKey->tscRid);
}
- releaseTscObj(rid);
return pBatchReq;
}
@@ -885,7 +897,6 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo();
}
- SArray *mgr = taosArrayInit(sz, sizeof(void *));
for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) {
@@ -894,7 +905,6 @@ static void *hbThreadFunc(void *param) {
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) {
- taosArrayPush(mgr, &pAppHbMgr);
continue;
}
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
@@ -908,7 +918,6 @@ static void *hbThreadFunc(void *param) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
- taosArrayPush(mgr, &pAppHbMgr);
break;
}
@@ -920,7 +929,6 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf);
- taosArrayPush(mgr, &pAppHbMgr);
break;
}
pInfo->fp = hbAsyncCallBack;
@@ -941,12 +949,8 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
- taosArrayPush(mgr, &pAppHbMgr);
}
- taosArrayDestroy(clientHbMgr.appHbMgrs);
- clientHbMgr.appHbMgrs = mgr;
-
taosThreadMutexUnlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL);
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index 87aee4a8a3f8b95afb62338dc2fcd4700212029c..63e8b3097c3cd7a8199e5001723515dafdb2bb24 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
}
}
+int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
+ if (TD_RES_TMQ(res)) {
+ SMqRspObj* pRspObj = (SMqRspObj*) res;
+ STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset;
+ if (pOffset->type == TMQ_OFFSET__LOG) {
+ return pRspObj->rsp.rspOffset.version;
+ }
+ } else if (TD_RES_TMQ_META(res)) {
+ SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
+ if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
+ return pRspObj->metaRsp.rspOffset.version;
+ }
+ } else if (TD_RES_TMQ_METADATA(res)) {
+ SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res;
+ if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
+ return pRspObj->rsp.rspOffset.version;
+ }
+ }
+
+ // data from tsdb, no valid offset info
+ return -1;
+}
+
const char* tmq_get_table_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
index 06b6221940a4f0991fe520a58e2918409139380b..89c394fdd0889a30b737b43f534aec96e3fb3afa 100644
--- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
+++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
@@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) {
dError("failed to send crash report");
if (pFile) {
taosReleaseCrashLogFile(pFile, false);
+ pFile = NULL;
continue;
}
} else {
@@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) {
if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile);
+ pFile = NULL;
truncateFile = false;
}
diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
index d8841201478b948f396e8d62a64c714290559c96..544512233e47e0b6a7f29dcbaf056f2e6ad48195 100644
--- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c
@@ -18,6 +18,7 @@
#include "dmNodes.h"
#include "index.h"
#include "qworker.h"
+#include "tstream.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
}
indexInit(tsNumOfCommitThreads);
+ streamMetaInit();
dmReportStartup("dnode-transport", "initialized");
dDebug("dnode is created, ptr:%p", pDnode);
@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer(pDnode);
dmClearVars(pDnode);
rpcCleanup();
+ streamMetaCleanup();
indexCleanup();
taosConvDestroy();
dDebug("dnode is closed, ptr:%p", pDnode);
diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c
index 002407ce8ab5b93206077113da7ee09452f2e196..d4cbcaaacda393b52df4d9fc27d10de55be7ae03 100644
--- a/source/dnode/mnode/impl/src/mndDnode.c
+++ b/source/dnode/mnode/impl/src/mndDnode.c
@@ -751,7 +751,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0};
- if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0) {
+ if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0 || (terrno = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
code = terrno;
goto _OVER;
}
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index df7955771d799b35030bc745df7f220e6920ed43..68b8dd72019df1ed623170249272e3cda3c5ad1a 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
- if (mndPauseStreamTask(pTrans, pTask) < 0) {
+ if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
}
@@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
- if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
+ if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
}
diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c
index d56595dae93c00e884d70c91bdcf9cff44aa53f6..c59669fc53c9006ae9c535fcb9cb671369adb61f 100644
--- a/source/libs/executor/src/timesliceoperator.c
+++ b/source/libs/executor/src/timesliceoperator.c
@@ -894,8 +894,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
}
taosArrayDestroy(pInfo->pLinearInfo);
- taosMemoryFree(pInfo->pPrevGroupKey->pData);
- taosMemoryFree(pInfo->pPrevGroupKey);
+ if (pInfo->pPrevGroupKey) {
+ taosMemoryFree(pInfo->pPrevGroupKey->pData);
+ taosMemoryFree(pInfo->pPrevGroupKey);
+ }
cleanupExprSupp(&pInfo->scalarSup);
diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h
index 5d2970a4b70cb2ec68cb6c069d393513757871d4..0f39cf817b1bbd191d9ab49d3456be9e1bfa1c66 100644
--- a/source/libs/stream/inc/streamBackendRocksdb.h
+++ b/source/libs/stream/inc/streamBackendRocksdb.h
@@ -16,8 +16,6 @@
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_
-#include "executor.h"
-
#include "rocksdb/c.h"
// #include "streamInc.h"
#include "streamState.h"
@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
-void* streamStateCreateBatch();
-int32_t streamStateGetBatchSize(void* pBatch);
-void streamStateClearBatch(void* pBatch);
-void streamStateDestroyBatch(void* pBatch);
-int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen);
-int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
-
// default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen);
+ void* val, int32_t vlen, int64_t ttl);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h
index 71fbe5e086c9e0ae49ab78ab57f618ed87fd0c62..c471bc2bd8f41723067de1d0ae98d504aa4a1aa9 100644
--- a/source/libs/stream/inc/streamInc.h
+++ b/source/libs/stream/inc/streamInc.h
@@ -16,9 +16,12 @@
#ifndef _STREAM_INC_H_
#define _STREAM_INC_H_
-//#include "executor.h"
+#include "executor.h"
+#include "query.h"
#include "tstream.h"
+#include "trpc.h"
+
#ifdef __cplusplus
extern "C" {
#endif
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 635024519e8cb665e08a05ddf665638033edda3a..1a24445af68be9942b9b20fa93d0a9cce7514062 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -308,6 +308,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
numOfBlocks, size);
streamDataSubmitDestroy(pSubmitBlock);
+ taosFreeQitem(pSubmitBlock);
return -1;
}
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index db4ec17b19294b2396f10d24cc10baa0e8df9950..16ba81c74a610c761d6ad697b14a2db2ff5cba24 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -13,8 +13,9 @@
* along with this program. If not, see .
*/
-// #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h"
+#include "executor.h"
+#include "query.h"
#include "tcommon.h"
typedef struct SCompactFilteFactory {
@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
taosMemoryFreeClear(err);
}
} else {
+ /*
+ list all cf and get prefix
+ */
int64_t streamId;
int32_t taskId, dummpy = 0;
SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
void destroyCompactFilte(void* arg) { (void)arg; }
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) {
- // int64_t unixTime = taosGetTimestampMs();
- if (streamStateValueIsStale((char*)val)) {
- return 1;
- }
- // SStreamValue value;
- // memset(&value, 0, sizeof(value));
- // streamValueDecode(&value, (char*)val);
- // taosMemoryFree(value.data);
- // if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
- // return 1;
- // }
- return 0;
+ return streamStateValueIsStale((char*)val) ? 1 : 0;
}
const char* compactFilteName(void* arg) { return "stream_filte"; }
@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
memcpy(cfNames[0], "default", strlen("default"));
continue;
}
- qError("cf name %s", idstr);
GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key);
if (i % cfLen == 0) {
@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
}
}
- for (int i = 0; i < nSize * cfLen + 1; i++) {
- qError("cf name %s", cfNames[i]);
- }
rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*));
RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*));
for (int i = 0; i < nSize * cfLen + 1; i++) {
@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
- // return -1;
}
}
pState->pTdbState->rocksdb = handle->db;
@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \
} while (0);
-#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
- do { \
- code = 0; \
- char buf[128] = {0}; \
- char* err = NULL; \
- int i = streamGetInit(funcname); \
- if (i < 0) { \
- qWarn("streamState failed to get cf name: %s", funcname); \
- code = -1; \
- break; \
- } \
- char toString[128] = {0}; \
- if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
- int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
- rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
- rocksdb_t* db = pState->pTdbState->rocksdb; \
- rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
- size_t len = 0; \
- char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
- if (val == NULL) { \
- qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
- if (err != NULL) taosMemoryFree(err); \
- code = -1; \
- } else { \
- char * p = NULL, *end = NULL; \
- int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
- if (len < 0) { \
- qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
- code = -1; \
- } else { \
- qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
- } \
- if (pVal != NULL) { \
- *pVal = p; \
- } else { \
- taosMemoryFree(p); \
- } \
- taosMemoryFree(val); \
- if (vLen != NULL) *vLen = len; \
- } \
- if (err != NULL) { \
- taosMemoryFree(err); \
- qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
- code = -1; \
- } else { \
- if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
- } \
+#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
+ do { \
+ code = 0; \
+ char buf[128] = {0}; \
+ char* err = NULL; \
+ int i = streamGetInit(funcname); \
+ if (i < 0) { \
+ qWarn("streamState failed to get cf name: %s", funcname); \
+ code = -1; \
+ break; \
+ } \
+ char toString[128] = {0}; \
+ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
+ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
+ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
+ rocksdb_t* db = pState->pTdbState->rocksdb; \
+ rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
+ size_t len = 0; \
+ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
+ if (val == NULL) { \
+ if (err == NULL) { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
+ funcname); \
+ } else { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
+ err); \
+ taosMemoryFreeClear(err); \
+ } \
+ code = -1; \
+ } else { \
+ char* p = NULL; \
+ int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
+ if (len < 0) { \
+ qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
+ funcname); \
+ code = -1; \
+ } else { \
+ qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
+ len); \
+ } \
+ taosMemoryFree(val); \
+ if (vLen != NULL) *vLen = len; \
+ } \
+ if (code == 0) \
+ qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if (err != NULL) {
- qWarn(
- "failed to delete range cf(state) err: %s, "
- "start: %s, end:%s",
- err, toStringStart, toStringEnd);
+ qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err);
}
@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1;
}
- size_t tlen;
- char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
+ size_t klen, vlen;
+ char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
winKeyDecode(&winKey, keyStr);
- size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
- char* dst = NULL;
- int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
+ // char* dst = NULL;
+ int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
if (len < 0) {
return -1;
}
-
- if (pVal != NULL) *pVal = (char*)dst;
- if (pVLen != NULL) *pVLen = vlen;
+ if (pVLen != NULL) *pVLen = len;
*pKey = winKey;
return 0;
@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
- void* val, int32_t vlen) {
+ void* val, int32_t vlen, int64_t ttl) {
int i = streamGetInit(cfName);
if (i < 0) {
@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL;
- int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV);
+ int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 0fb78fb589e60d8ebb1a562f589715351bb576cf..f4d8522f317ee4c0247edfcce134039d39674d8b 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -20,12 +20,12 @@
#define MIN_STREAM_EXEC_BATCH_NUM 16
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
- int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
+ int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
- int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
+ int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE);
}
@@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus);
- if (status != TASK_STATUS__NORMAL) {
+ if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2);
@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
- qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr,
- pSubmit->submit.msgLen, pSubmit->submit.ver);
+ qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
+ pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
- qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
+ qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->blocks = pRes;
code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
- taosFreeQitem(pRes);
+ taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
+ taosFreeQitem(qRes);
return code;
}
@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t ckId = 0;
int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
- if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
+ if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
- pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
+ pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock);
@@ -407,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr);
- if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) {
+ if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index de56cf24ca7ca343b1d6bcc3aaaa008914a9f0a0..682ce08c7fcdd1d25279a8e9e8503c6e43dc2991 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -19,6 +19,13 @@
#include "tref.h"
#include "ttimer.h"
+static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
+static int32_t streamBackendId = 0;
+static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
+
+void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
+void streamMetaCleanup() { taosCloseRef(streamBackendId); }
+
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
- taosMemoryFree(streamPath);
goto _err;
}
+ memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
- taosMemoryFree(streamPath);
goto _err;
}
- taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
+ pMeta->streamBackendId = streamBackendId;
- char* statePath = taosMemoryCalloc(1, len);
- sprintf(statePath, "%s/%s", pMeta->path, "state");
- code = taosMulModeMkDir(statePath, 0755);
+ memset(streamPath, 0, len);
+ sprintf(streamPath, "%s/%s", pMeta->path, "state");
+ code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
- taosMemoryFree(streamPath);
goto _err;
}
- pMeta->streamBackend = streamBackendInit(statePath);
- pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup);
- pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend);
+ pMeta->streamBackend = streamBackendInit(streamPath);
+ pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
- taosMemoryFree(statePath);
+ taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock);
return pMeta;
_err:
+ taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup(pMeta->pTasks);
- taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid);
- // streamBackendCleanup(pMeta->streamBackend);
- taosCloseRef(pMeta->streamBackendId);
+ taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta);
@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
-
- // taosWLockLatch(&pMeta->lock);
-
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
- //
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index 1cca4d55cf34339810cc63a48565b27d122e629d..373cb27941f56abf55f8bcfb0486a67225703d3b 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState->taskId = pTask->id.taskId;
pState->streamId = pTask->id.streamId;
#ifdef USE_ROCKSDB
- qWarn("open stream state1");
+ // qWarn("open stream state1");
taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
if (code == -1) {
@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy(pState, remove);
+ taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose(pState->pTdbState->pParTagDb);
tdbClose(pState->pTdbState->db);
#endif
- taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
}
int32_t streamStateBegin(SStreamState* pState) {
@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
int32_t code = 0;
void* batch = streamStateCreateBatch();
- code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen);
+ code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
if (code != 0) {
return code;
}
diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c
index b7401ec5d97a3f73733483828c6977ce4d636b62..67835e77b8847d45d1015923bdc6bc3b035fdf5d 100644
--- a/source/libs/stream/src/tstreamFileState.c
+++ b/source/libs/stream/src/tstreamFileState.c
@@ -15,6 +15,7 @@
#include "tstreamFileState.h"
+#include "query.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, 0, true);
}
-bool needClearDiskBuff(SStreamFileState* pFileState) {
- return pFileState->flushMark > 0;
-}
+bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
- clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
+ int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
+ : pFileState->maxTs - pFileState->deleteMark;
+ clearExpiredRowBuff(pFileState, mark, false);
return pFileState->usedBuffs;
}
@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
- code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize);
+ code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
}
if (streamStateGetBatchSize(batch) > 0) {
@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
- code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
+ code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf);
}
{
@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
- code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
+ code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
}
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
}
@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
- deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
+ int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
+ : pFileState->maxTs - pFileState->deleteMark;
+ deleteExpiredCheckPoint(pFileState, mark);
void* pStVal = NULL;
int32_t len = 0;
diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt
index 5a97ba45f684dc444f3e7b5cb0cfdeade8728fd1..a0c171769025f62b0a70a3677d9faeab2a4f6693 100644
--- a/source/libs/stream/test/CMakeLists.txt
+++ b/source/libs/stream/test/CMakeLists.txt
@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES(
streamUpdateTest
- PUBLIC os util common gtest stream
+ PUBLIC os util common gtest gtest_main stream
)
TARGET_INCLUDE_DIRECTORIES(
diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp
index c6981878744510b13a18c5795a3b9f05b968dc58..18c60aff284414e5ba5044d50000a9bd45718965 100644
--- a/source/libs/stream/test/tstreamUpdateTest.cpp
+++ b/source/libs/stream/test/tstreamUpdateTest.cpp
@@ -1,11 +1,28 @@
#include
+#include "streamBackendRocksdb.h"
+#include "tstream.h"
#include "tstreamUpdate.h"
#include "ttime.h"
using namespace std;
#define MAX_NUM_SCALABLE_BF 100000
+class StreamStateEnv : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ streamMetaInit();
+ backend = streamBackendInit(path);
+ }
+ virtual void TearDown() {
+ streamMetaCleanup();
+ // indexClose(index);
+ }
+
+ const char *path = TD_TMP_DIR_PATH "stream";
+ void *backend;
+};
+
bool equalSBF(SScalableBf *left, SScalableBf *right) {
if (left->growth != right->growth) return false;
if (left->numBits != right->numBits) return false;
@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
// updateInfoDestroy(pSU6);
// updateInfoDestroy(pSU7);
}
-
-int main(int argc, char *argv[]) {
- testing::InitGoogleTest(&argc, argv);
- return RUN_ALL_TESTS();
-}
\ No newline at end of file
+// TEST()
+TEST(StreamStateEnv, test1) {}
+// int main(int argc, char *argv[]) {
+// testing::InitGoogleTest(&argc, argv);
+// return RUN_ALL_TESTS();
+// }
\ No newline at end of file
diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim
index db2a22205f9002c715f516fe9cdc31ea8082d1ee..0cf291523a7b4b6ca2cbf321a797c65656dfd59f 100644
--- a/tests/script/tsim/alter/table.sim
+++ b/tests/script/tsim/alter/table.sim
@@ -657,36 +657,33 @@ if $data20 != null then
return -1
endi
-#print =============== error for normal table
-#sql create table tb2023(ts timestamp, f int);
-#sql_error alter table tb2023 add column v varchar(65535);
-#sql_error alter table tb2023 add column v varchar(65535);
-#sql_error alter table tb2023 add column v varchar(65530);
-#sql alter table tb2023 add column v varchar(16374);
-#sql_error alter table tb2023 modify column v varchar(65536);
-#sql desc tb2023
-#sql alter table tb2023 drop column v
-#sql_error alter table tb2023 add column v nchar(16384);
-#sql alter table tb2023 add column v nchar(4093);
-#sql_error alter table tb2023 modify column v nchar(16384);
-#sql_error alter table tb2023 add column v nchar(16384);
-#sql alter table tb2023 drop column v
-#sql alter table tb2023 add column v nchar(16374);
-#sql desc tb2023
-#
-#print =============== error for super table
-#sql create table stb2023(ts timestamp, f int) tags(t1 int);
-#sql_error alter table stb2023 add column v varchar(65535);
-#sql_error alter table stb2023 add column v varchar(65536);
-#sql_error alter table stb2023 add column v varchar(33100);
-#sql alter table stb2023 add column v varchar(16374);
-#sql_error alter table stb2023 modify column v varchar(16375);
-#sql desc stb2023
-#sql alter table stb2023 drop column v
-#sql_error alter table stb2023 add column v nchar(4094);
-#sql alter table stb2023 add column v nchar(4093);
-#sql_error alter table stb2023 modify column v nchar(4094);
-#sql desc stb2023
+print =============== error for normal table
+sql create table tb2023(ts timestamp, f int);
+sql_error alter table tb2023 add column v varchar(65518);
+sql_error alter table tb2023 add column v varchar(65531);
+sql_error alter table tb2023 add column v varchar(65535);
+sql alter table tb2023 add column v varchar(65517);
+sql_error alter table tb2023 modify column v varchar(65518);
+sql desc tb2023
+sql alter table tb2023 drop column v
+sql_error alter table tb2023 add column v nchar(16380);
+sql alter table tb2023 add column v nchar(16379);
+sql_error alter table tb2023 modify column v nchar(16380);
+sql desc tb2023
+
+print =============== error for super table
+sql create table stb2023(ts timestamp, f int) tags(t1 int);
+sql_error alter table stb2023 add column v varchar(65518);
+sql_error alter table stb2023 add column v varchar(65531);
+sql_error alter table stb2023 add column v varchar(65535);
+sql alter table stb2023 add column v varchar(65517);
+sql_error alter table stb2023 modify column v varchar(65518);
+sql desc stb2023
+sql alter table stb2023 drop column v
+sql_error alter table stb2023 add column v nchar(16380);
+sql alter table stb2023 add column v nchar(16379);
+sql_error alter table stb2023 modify column v nchar(16380);
+sql desc stb2023
print ======= over
sql drop database d1
diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim
index f89211573596ae02f0a484840cc7679fb1d58a34..2bf369b91054c3953406fc5116a88c94f508aafb 100644
--- a/tests/script/tsim/parser/alter_column.sim
+++ b/tests/script/tsim/parser/alter_column.sim
@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0);
-sql_error alter table tb modify column c2 binary(65600);
+sql_error alter table tb modify column c2 binary(65436);
sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10);