diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md index 6faa7fed01cdbf03ebb3ee02c73c37d08d31137a..f179dea26ddbb8b778fae36dadd03feb13f99e7e 100644 --- a/docs/zh/05-get-started/index.md +++ b/docs/zh/05-get-started/index.md @@ -18,7 +18,18 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; ``` -### 加入 TDengine 官方社区 +## 学习 TDengine 知识地图 + +TDengine 知识地图中涵盖了 TDengine 的各种知识点,揭示了各概念实体之间的调用关系和数据流向。学习和了解 TDengine 知识地图有助于你快速掌握 TDengine 的知识体系。 + +
+
+ +
图 1. TDengine 知识地图
+
+
+ +## 加入 TDengine 官方社区 微信扫描以下二维码,学习了解 TDengine 的最新技术,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。 diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7234d306a443ca5ad9513851f1411929abb1e23c..59f030a60c651a118b4f956996eb8e60ce42cd87 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -27,8 +27,7 @@ typedef struct SStreamTask SStreamTask; typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); -// incremental state storage -typedef struct { +typedef struct STdbState { SStreamTask* pOwner; TDB* db; TTB* pStateDb; @@ -37,7 +36,12 @@ typedef struct { TTB* pSessionStateDb; TTB* pParNameDb; TXN txn; - int32_t number; +} STdbState; + +// incremental state storage +typedef struct { + STdbState* pTdbState; + int32_t number; } SStreamState; SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); @@ -45,6 +49,7 @@ void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); int32_t streamStateAbort(SStreamState* pState); +void streamStateDestroy(SStreamState* pState); typedef struct { TBC* pCur; diff --git a/packaging/docker/DockerfileCloud b/packaging/docker/DockerfileCloud index 2b060c1b913d1830bd62c966983c7bc7a2edb67f..21e387bab3f81053683b83208e6632cd76fea652 100644 --- a/packaging/docker/DockerfileCloud +++ b/packaging/docker/DockerfileCloud @@ -7,6 +7,9 @@ ARG dirName ARG cpuType RUN echo ${pkgFile} && echo ${dirName} +RUN apt update +RUN apt install -y curl + COPY ${pkgFile} /root/ ENV TINI_VERSION v0.19.0 ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini diff --git a/packaging/docker/run.sh b/packaging/docker/run.sh old mode 100644 new mode 100755 index 2700b0b0607bc942901497a51f2d5118fbe880ef..b5d6b011ea256147fa91f949b31f1a239990b78d --- a/packaging/docker/run.sh +++ b/packaging/docker/run.sh @@ -1,16 +1,160 @@ #!/bin/bash + TAOS_RUN_TAOSBENCHMARK_TEST_ONCE=0 +#ADMIN_URL=${ADMIN_URL:-http://172.26.10.84:10001} +TAOSD_STARTUP_TIMEOUT_SECOND=${TAOSD_STARTUP_TIMEOUT_SECOND:-160} +TAOS_TIMEOUT_SECOND=${TAOS_TIMEOUT_SECOND:-5} +BACKUP_CORE_FOLDER=/data/corefile +ALERT_URL=app/system/alert/add + +echo "ADMIN_URL: ${ADMIN_URL}" +echo "TAOS_TIMEOUT_SECOND: ${TAOS_TIMEOUT_SECOND}" + +function set_service_state() { + #echo "set service state: $1, $2" + service_state="$1" + service_msg="$2" +} +set_service_state "init" "ok" +app_name=`hostname |cut -d\- -f1` + +function check_taosd() { + timeout $TAOS_TIMEOUT_SECOND taos -s "show databases;" >/dev/null + local ret=$? + if [ $ret -ne 0 ]; then + echo "`date` check taosd error $ret" + if [ "x$1" != "xignore" ]; then + set_service_state "error" "taos check failed $ret" + fi + else + set_service_state "ready" "ok" + fi +} +function post_error_msg() { + if [ ! -z "${ADMIN_URL}" ]; then + taos_version=`taos --version` + echo "app_name: ${app_name}" + echo "service_state: ${service_state}" + echo "`date` service_msg: ${service_msg}" + echo "${taos_version}" + curl -X POST -H "Content-Type: application/json" \ + -d"{\"appName\":\"${app_name}\",\ + \"alertLevel\":\"${service_state}\",\ + \"taosVersion\":\"${taos_version}\",\ + \"alertMsg\":\"${service_msg}\"}" \ + ${ADMIN_URL}/${ALERT_URL} + fi +} +function check_taosd_exit_type() { + local core_pattern=`cat /proc/sys/kernel/core_pattern` + echo "$core_pattern" | grep -q "^/" + if [ $? -eq 0 ]; then + core_folder=`dirname $core_pattern` + core_prefix=`basename $core_pattern | sed "s/%.*//"` + else + core_folder=`pwd` + core_prefix="$core_pattern" + fi + local core_files=`ls $core_folder | grep "^${core_prefix}"` + if [ ! -z "$core_files" ]; then + # move core files to another folder + mkdir -p ${BACKUP_CORE_FOLDER} + mv ${core_folder}/${core_prefix}* ${BACKUP_CORE_FOLDER}/ + set_service_state "error" "taosd exit with core file" + else + set_service_state "error" "taosd exit without core file" + fi +} +disk_usage_level=(60 80 99) +current_disk_level=0 +disk_state="ok" +disk_msg="ok" +get_usage_ok="yes" +function post_disk_error_msg() { + if [ ! -z "${ADMIN_URL}" ]; then + taos_version=`taos --version` + echo "app_name: ${app_name}" + echo "disk_state: ${disk_state}" + echo "`date` disk_msg: ${disk_msg}" + echo "${taos_version}" + curl -X POST -H "Content-Type: application/json" \ + -d"{\"appName\":\"${app_name}\",\ + \"alertLevel\":\"${disk_state}\",\ + \"taosVersion\":\"${taos_version}\",\ + \"alertMsg\":\"${disk_msg}\"}" \ + ${ADMIN_URL}/${ALERT_URL} + fi +} +function check_disk() { + local folder=`cat /etc/taos/taos.cfg|grep -v "^#"|grep dataDir|awk '{print $NF}'` + if [ -z "$folder" ]; then + folder="/var/lib/taos" + fi + local mount_point="$folder" + local usage="" + while [ -z "$usage" ]; do + usage=`df -h|grep -w "${mount_point}"|awk '{print $5}'|grep -v Use|sed "s/%$//"` + if [ "x${mount_point}" = "x/" ]; then + break + fi + mount_point=`dirname ${mount_point}` + done + if [ -z "$usage" ]; then + disk_state="error" + disk_msg="cannot get disk usage" + if [ "$get_usage_ok" = "yes" ]; then + post_disk_error_msg + get_usage_ok="no" + fi + else + get_usage_ok="yes" + local current_level=0 + for level in ${disk_usage_level[*]}; do + if [ ${usage} -ge ${level} ]; then + disk_state="error" + disk_msg="disk usage over ${level}%" + current_level=${level} + fi + done + if [ ${current_level} -gt ${current_disk_level} ]; then + post_disk_error_msg + elif [ ${current_level} -lt ${current_disk_level} ]; then + echo "disk usage reduced from ${current_disk_level} to ${current_level}" + fi + current_disk_level=${current_level} + fi +} +function run_taosd() { + taosd + set_service_state "error" "taosd exit" + # post error msg + # check crash or OOM + check_taosd_exit_type + post_error_msg +} +function print_service_state_change() { + if [ "x$1" != "x${service_state}" ]; then + echo "`date` service state: ${service_state}, ${service_msg}" + fi +} +taosd_start_time=`date +%s` while ((1)) do + check_disk # echo "outer loop: $a" - sleep 10 - output=`taos -k` - status=${output:0:1} + output=`timeout $TAOS_TIMEOUT_SECOND taos -k` + if [ -z "${output}" ]; then + echo "`date` taos -k error" + status="" + else + status=${output:0:1} + fi # echo $output # echo $status if [ "$status"x = "0"x ] then - taosd & + # taosd_start_time=`date +%s` + run_taosd & fi # echo "$status"x "$TAOS_RUN_TAOSBENCHMARK_TEST"x "$TAOS_RUN_TAOSBENCHMARK_TEST_ONCE"x if [ "$status"x = "2"x ] && [ "$TAOS_RUN_TAOSBENCHMARK_TEST"x = "1"x ] && [ "$TAOS_RUN_TAOSBENCHMARK_TEST_ONCE"x = "0"x ] @@ -24,13 +168,37 @@ do taos -s "select stable_name from information_schema.ins_stables where db_name = 'test';"|grep -q -w meters if [ $? -ne 0 ]; then taosBenchmark -y -t 1000 -n 1000 -S 900000 - taos -s "create user admin_user pass 'NDS65R6t' sysinfo 0;" - taos -s "GRANT ALL on test.* to admin_user;" + taos -s "create user admin_user pass 'NDS65R6t' sysinfo 0;" + taos -s "GRANT ALL on test.* to admin_user;" + fi + fi + # check taosd status + if [ "$service_state" = "ready" ]; then + # check taosd status + check_taosd + print_service_state_change "ready" + if [ "$service_state" = "error" ]; then + post_error_msg + fi + elif [ "$service_state" = "init" ]; then + check_taosd "ignore" + # check timeout + current_time=`date +%s` + time_elapsed=$(( current_time - taosd_start_time )) + if [ ${time_elapsed} -gt ${TAOSD_STARTUP_TIMEOUT_SECOND} ]; then + set_service_state "error" "taosd startup timeout" + post_error_msg fi + print_service_state_change "init" + elif [ "$service_state" = "error" ]; then + # check taosd status + check_taosd + print_service_state_change "error" fi # check taosadapter nc -z localhost 6041 if [ $? -ne 0 ]; then - taosadapter & + taosadapter & fi + sleep 30 done diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index ae80af565bbbfecc40f08c673be3de6d0ab7c6c5..8a58d30ba4ccde9e3cd4affae50cea5b45503258 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1698,7 +1698,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol size += pBlockCol->szBitmap; // offset - if (IS_VAR_DATA_TYPE(pColData->type)) { + if (IS_VAR_DATA_TYPE(pColData->type) && pColData->flag != (HAS_NULL | HAS_NONE)) { code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, ppOut, nOut + size, &pBlockCol->szOffset, ppBuf); if (code) goto _exit; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 63ec0caa95dadd70b3bb1f6e2dc69a24025b5ae0..0374e22a4a69e07d75406ac4343aa0a8ee21ff51 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -114,6 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pState->pTdbState = taosMemoryCalloc(1, sizeof(STdbState)); + if (pState->pTdbState == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + streamStateDestroy(pState); + return NULL; + } char statePath[1024]; if (!specPath) { @@ -122,31 +128,34 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); } - if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) { + if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 0) < 0) { goto _err; } // open state storage backend - if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) { + if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->pTdbState->db, &pState->pTdbState->pStateDb, + 0) < 0) { goto _err; } // todo refactor - if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) { + if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->pTdbState->db, + &pState->pTdbState->pFillStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db, - &pState->pSessionStateDb, 0) < 0) { + if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->pTdbState->db, + &pState->pTdbState->pSessionStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) { + if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->pTdbState->db, + &pState->pTdbState->pFuncStateDb, 0) < 0) { goto _err; } - if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->db, &pState->pParNameDb, 0) < - 0) { + if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->pTdbState->db, + &pState->pTdbState->pParNameDb, 0) < 0) { goto _err; } @@ -154,117 +163,117 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int goto _err; } - pState->pOwner = pTask; + pState->pTdbState->pOwner = pTask; return pState; _err: - tdbTbClose(pState->pStateDb); - tdbTbClose(pState->pFuncStateDb); - tdbTbClose(pState->pFillStateDb); - tdbTbClose(pState->pSessionStateDb); - tdbTbClose(pState->pParNameDb); - tdbClose(pState->db); - taosMemoryFree(pState); + tdbTbClose(pState->pTdbState->pStateDb); + tdbTbClose(pState->pTdbState->pFuncStateDb); + tdbTbClose(pState->pTdbState->pFillStateDb); + tdbTbClose(pState->pTdbState->pSessionStateDb); + tdbTbClose(pState->pTdbState->pParNameDb); + tdbClose(pState->pTdbState->db); + streamStateDestroy(pState); return NULL; } void streamStateClose(SStreamState* pState) { - tdbCommit(pState->db, &pState->txn); - tdbPostCommit(pState->db, &pState->txn); - tdbTbClose(pState->pStateDb); - tdbTbClose(pState->pFuncStateDb); - tdbTbClose(pState->pFillStateDb); - tdbTbClose(pState->pSessionStateDb); - tdbTbClose(pState->pParNameDb); - tdbClose(pState->db); + tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn); + tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn); + tdbTbClose(pState->pTdbState->pStateDb); + tdbTbClose(pState->pTdbState->pFuncStateDb); + tdbTbClose(pState->pTdbState->pFillStateDb); + tdbTbClose(pState->pTdbState->pSessionStateDb); + tdbTbClose(pState->pTdbState->pParNameDb); + tdbClose(pState->pTdbState->db); - taosMemoryFree(pState); + streamStateDestroy(pState); } int32_t streamStateBegin(SStreamState* pState) { - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { + if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - if (tdbBegin(pState->db, &pState->txn) < 0) { - tdbTxnClose(&pState->txn); + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { + tdbTxnClose(&pState->pTdbState->txn); return -1; } return 0; } int32_t streamStateCommit(SStreamState* pState) { - if (tdbCommit(pState->db, &pState->txn) < 0) { + if (tdbCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } - if (tdbPostCommit(pState->db, &pState->txn) < 0) { + if (tdbPostCommit(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } - memset(&pState->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { + memset(&pState->pTdbState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - if (tdbBegin(pState->db, &pState->txn) < 0) { + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } return 0; } int32_t streamStateAbort(SStreamState* pState) { - if (tdbAbort(pState->db, &pState->txn) < 0) { + if (tdbAbort(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } - memset(&pState->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { + memset(&pState->pTdbState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->pTdbState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } - if (tdbBegin(pState->db, &pState->txn) < 0) { + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn) < 0) { return -1; } return 0; } int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->pTdbState->txn); } int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { - return tdbTbGet(pState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); + return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); } int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { - return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), &pState->pTdbState->txn); } // todo refactor int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->pTdbState->txn); } // todo refactor int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->pTdbState->txn); } // todo refactor int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbGet(pState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); + return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); } // todo refactor int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - return tdbTbGet(pState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); + return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); } // todo refactor int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), &pState->pTdbState->txn); } int32_t streamStateClear(SStreamState* pState) { @@ -288,7 +297,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number // todo refactor int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { - return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), &pState->pTdbState->txn); } int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { @@ -314,7 +323,7 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); + tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL); int32_t c = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; @@ -330,7 +339,7 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL); + tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL); int32_t c = 0; tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); @@ -422,7 +431,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -448,7 +457,7 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* if (!pCur) { return NULL; } - if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -473,7 +482,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* if (pCur == NULL) { return NULL; } - if (tdbTbcOpen(pState->pFillStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -520,7 +529,8 @@ void streamFreeVal(void* val) { tdbFree(val); } int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, + &pState->pTdbState->txn); } int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -543,7 +553,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn); + return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->pTdbState->txn); } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { @@ -552,7 +562,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -579,7 +589,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -607,7 +617,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; } @@ -674,7 +684,7 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* return -1; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return -1; } @@ -821,13 +831,19 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - tdbTbUpsert(pState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, &pState->txn); + tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, + &pState->pTdbState->txn); return 0; } int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { int32_t len; - return tdbTbGet(pState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); + return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); +} + +void streamStateDestroy(SStreamState* pState) { + taosMemoryFreeClear(pState->pTdbState); + taosMemoryFreeClear(pState); } #if 0 @@ -837,7 +853,7 @@ char* streamStateSessionDump(SStreamState* pState) { return NULL; } pCur->number = pState->number; - if (tdbTbcOpen(pState->pSessionStateDb, &pCur->pCur, NULL) < 0) { + if (tdbTbcOpen(pState->pTdbState->pSessionStateDb, &pCur->pCur, NULL) < 0) { streamStateFreeCur(pCur); return NULL; }