Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ffee5ebc
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ffee5ebc
编写于
5月 17, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/3_liaohj
上级
a4d9210b
0e215134
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
244 addition
and
241 deletion
+244
-241
cmake/cmake.define
cmake/cmake.define
+1
-1
cmake/rocksdb_CMakeLists.txt.in
cmake/rocksdb_CMakeLists.txt.in
+2
-2
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+3
-3
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+3
-4
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+2
-0
source/client/src/clientHb.c
source/client/src/clientHb.c
+64
-60
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
+2
-0
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+3
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+2
-2
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+4
-2
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+1
-11
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+4
-1
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+59
-79
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+11
-10
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+17
-18
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+3
-3
source/libs/stream/src/tstreamFileState.c
source/libs/stream/src/tstreamFileState.c
+11
-8
source/libs/stream/test/CMakeLists.txt
source/libs/stream/test/CMakeLists.txt
+1
-1
source/libs/stream/test/tstreamUpdateTest.cpp
source/libs/stream/test/tstreamUpdateTest.cpp
+23
-5
tests/script/tsim/alter/table.sim
tests/script/tsim/alter/table.sim
+27
-30
tests/script/tsim/parser/alter_column.sim
tests/script/tsim/parser/alter_column.sim
+1
-1
未找到文件。
cmake/cmake.define
浏览文件 @
ffee5ebc
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE O
N
)
set(CMAKE_VERBOSE_MAKEFILE O
FF
)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
...
...
cmake/rocksdb_CMakeLists.txt.in
浏览文件 @
ffee5ebc
# rocksdb
ExternalProject_Add(rocksdb
GIT_REPOSITORY https://github.com/
taosdata-contrib
/rocksdb.git
GIT_TAG v
6.23.3
GIT_REPOSITORY https://github.com/
facebook
/rocksdb.git
GIT_TAG v
8.1.1
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
...
...
include/libs/stream/streamState.h
浏览文件 @
ffee5ebc
...
...
@@ -20,13 +20,13 @@
#include "tsimplehash.h"
#include "tstreamFileState.h"
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
// void* streamBackendInit(const char* path);
// void streamBackendCleanup(void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg);
...
...
include/libs/stream/tstream.h
浏览文件 @
ffee5ebc
...
...
@@ -13,16 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "os.h"
#include "query.h"
#include "streamState.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "trpc.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -339,7 +336,7 @@ typedef struct SStreamMeta {
TTB
*
pTaskDb
;
TTB
*
pCheckpointDb
;
SHashObj
*
pTasks
;
SArray
*
pTaskList
;
// SArray<task_id*>
SArray
*
pTaskList
;
// SArray<task_id*>
void
*
ahandle
;
TXN
*
txn
;
FTaskExpand
*
expandFunc
;
...
...
@@ -567,6 +564,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t
streamProcessRecoverFinishReq
(
SStreamTask
*
pTask
,
int32_t
childId
);
void
streamMetaInit
();
void
streamMetaCleanup
();
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
);
void
streamMetaClose
(
SStreamMeta
*
streamMeta
);
...
...
source/client/src/clientEnv.c
浏览文件 @
ffee5ebc
...
...
@@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) {
tscError
(
"failed to send crash report"
);
if
(
pFile
)
{
taosReleaseCrashLogFile
(
pFile
,
false
);
pFile
=
NULL
;
continue
;
}
}
else
{
...
...
@@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) {
if
(
pFile
)
{
taosReleaseCrashLogFile
(
pFile
,
truncateFile
);
pFile
=
NULL
;
truncateFile
=
false
;
}
...
...
source/client/src/clientHb.c
浏览文件 @
ffee5ebc
...
...
@@ -24,6 +24,8 @@ typedef struct {
struct
{
int64_t
clusterId
;
int32_t
passKeyCnt
;
int32_t
passVer
;
int32_t
reqCnt
;
};
};
}
SHbParam
;
...
...
@@ -540,14 +542,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
hbGetUserBasicInfo
(
SClientHbKey
*
connKey
,
SClientHbReq
*
req
)
{
static
int32_t
hbGetUserBasicInfo
(
SClientHbKey
*
connKey
,
S
HbParam
*
param
,
S
ClientHbReq
*
req
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
acquireTscObj
(
connKey
->
tscRid
);
if
(
!
pTscObj
)
{
tscWarn
(
"tscObj rid %"
PRIx64
" not exist"
,
connKey
->
tscRid
);
return
TSDB_CODE_APP_ERROR
;
}
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
param
&&
(
param
->
passVer
!=
INT32_MIN
)
&&
(
param
->
passVer
<=
pTscObj
->
passInfo
.
ver
))
{
tscDebug
(
"hb got user basic info, no need since passVer %d <= %d"
,
param
->
passVer
,
pTscObj
->
passInfo
.
ver
);
goto
_return
;
}
SUserPassVersion
*
user
=
taosMemoryMalloc
(
sizeof
(
SUserPassVersion
));
if
(
!
user
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -574,6 +582,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
goto
_return
;
}
// assign the passVer
if
(
param
)
{
param
->
passVer
=
pTscObj
->
passInfo
.
ver
;
}
_return:
releaseTscObj
(
connKey
->
tscRid
);
if
(
code
)
{
...
...
@@ -719,13 +732,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
}
int32_t
hbQueryHbReqHandle
(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
)
{
SHbParam
*
hbParam
=
(
SHbParam
*
)
param
;
struct
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
hbParam
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
hbParam
->
clusterId
,
tstrerror
(
code
));
return
code
;
int32_t
code
=
0
;
SHbParam
*
hbParam
=
(
SHbParam
*
)
param
;
SCatalog
*
pCatalog
=
NULL
;
if
(
hbParam
->
reqCnt
==
0
)
{
code
=
catalogGetHandle
(
hbParam
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscWarn
(
"catalogGetHandle failed, clusterId:%"
PRIx64
", error:%s"
,
hbParam
->
clusterId
,
tstrerror
(
code
));
return
code
;
}
}
hbGetAppInfo
(
hbParam
->
clusterId
,
req
);
...
...
@@ -733,24 +749,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo
(
connKey
,
req
);
if
(
hbParam
->
passKeyCnt
>
0
)
{
hbGetUserBasicInfo
(
connKey
,
req
);
hbGetUserBasicInfo
(
connKey
,
hbParam
,
req
);
}
code
=
hbGetExpiredUserInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
if
(
hbParam
->
reqCnt
==
0
)
{
code
=
hbGetExpiredUserInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
code
=
hbGetExpiredDBInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
code
=
hbGetExpiredDBInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
code
=
hbGetExpiredStbInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
code
=
hbGetExpiredStbInfo
(
connKey
,
pCatalog
,
req
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
}
++
hbParam
->
reqCnt
;
// success to get catalog info
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -771,55 +791,47 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
}
int32_t
connKeyCnt
=
atomic_load_32
(
&
pAppHbMgr
->
connKeyCnt
);
pBatchReq
->
reqs
=
taosArrayInit
(
connKeyCnt
,
sizeof
(
SClientHbReq
));
int64_t
rid
=
-
1
;
int32_t
code
=
0
;
void
*
pIter
=
taosHashIterate
(
pAppHbMgr
->
activeInfo
,
NULL
);
SClientHbReq
*
pOneReq
=
pIter
;
SClientHbKey
*
connKey
=
pOneReq
?
&
pOneReq
->
connKey
:
NULL
;
if
(
connKey
!=
NULL
)
rid
=
connKey
->
tscRid
;
STscObj
*
pTscObj
=
(
STscObj
*
)
acquireTscObj
(
rid
);
if
(
pTscObj
==
NULL
)
{
if
(
!
pBatchReq
->
reqs
)
{
tFreeClientHbBatchReq
(
pBatchReq
);
return
NULL
;
}
while
(
pIter
!=
NULL
)
{
void
*
pIter
=
NULL
;
SHbParam
param
=
{
0
};
while
((
pIter
=
taosHashIterate
(
pAppHbMgr
->
activeInfo
,
pIter
)))
{
SClientHbReq
*
pOneReq
=
pIter
;
SClientHbKey
*
connKey
=
&
pOneReq
->
connKey
;
STscObj
*
pTscObj
=
(
STscObj
*
)
acquireTscObj
(
connKey
->
tscRid
);
if
(
!
pTscObj
)
{
continue
;
}
pOneReq
=
taosArrayPush
(
pBatchReq
->
reqs
,
pOneReq
);
SHbParam
param
;
switch
(
pOneReq
->
connKey
.
connType
)
{
switch
(
connKey
->
connType
)
{
case
CONN_TYPE__QUERY
:
{
param
.
clusterId
=
pOneReq
->
clusterId
;
if
(
param
.
clusterId
==
0
)
{
// init
param
.
clusterId
=
pOneReq
->
clusterId
;
param
.
passVer
=
INT32_MIN
;
}
param
.
passKeyCnt
=
atomic_load_32
(
&
pAppHbMgr
->
passKeyCnt
);
break
;
}
default:
break
;
}
if
(
clientHbMgr
.
reqHandle
[
pOneReq
->
connKey
.
connType
])
{
code
=
(
*
clientHbMgr
.
reqHandle
[
pOneReq
->
connKey
.
connType
])(
&
pOneReq
->
connKey
,
&
param
,
pOneReq
);
if
(
clientHbMgr
.
reqHandle
[
connKey
->
connType
])
{
int32_t
code
=
(
*
clientHbMgr
.
reqHandle
[
connKey
->
connType
])(
connKey
,
&
param
,
pOneReq
);
if
(
code
)
{
tscWarn
(
"hbGatherAllInfo failed since %s, tscRid:%"
PRIi64
", connType:%"
PRIi8
,
tstrerror
(
code
),
pOneReq
->
connKey
.
tscRid
,
pOneReq
->
connKey
.
connType
);
connKey
->
tscRid
,
connKey
->
connType
);
}
}
break
;
#if 0
if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter;
continue;
}
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter;
#endif
releaseTscObj
(
connKey
->
tscRid
);
}
releaseTscObj
(
rid
);
return
pBatchReq
;
}
...
...
@@ -890,7 +902,6 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo
();
}
SArray
*
mgr
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SAppHbMgr
*
pAppHbMgr
=
taosArrayGetP
(
clientHbMgr
.
appHbMgrs
,
i
);
if
(
pAppHbMgr
==
NULL
)
{
...
...
@@ -899,7 +910,6 @@ static void *hbThreadFunc(void *param) {
int32_t
connCnt
=
atomic_load_32
(
&
pAppHbMgr
->
connKeyCnt
);
if
(
connCnt
==
0
)
{
taosArrayPush
(
mgr
,
&
pAppHbMgr
);
continue
;
}
SClientHbBatchReq
*
pReq
=
hbGatherAllInfo
(
pAppHbMgr
);
...
...
@@ -913,7 +923,6 @@ static void *hbThreadFunc(void *param) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tFreeClientHbBatchReq
(
pReq
);
// hbClearReqInfo(pAppHbMgr);
taosArrayPush
(
mgr
,
&
pAppHbMgr
);
break
;
}
...
...
@@ -925,7 +934,6 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq
(
pReq
);
// hbClearReqInfo(pAppHbMgr);
taosMemoryFree
(
buf
);
taosArrayPush
(
mgr
,
&
pAppHbMgr
);
break
;
}
pInfo
->
fp
=
hbAsyncCallBack
;
...
...
@@ -946,12 +954,8 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32
(
&
pAppHbMgr
->
reportCnt
,
1
);
taosArrayPush
(
mgr
,
&
pAppHbMgr
);
}
taosArrayDestroy
(
clientHbMgr
.
appHbMgrs
);
clientHbMgr
.
appHbMgrs
=
mgr
;
taosThreadMutexUnlock
(
&
clientHbMgr
.
lock
);
taosMsleep
(
HEARTBEAT_INTERVAL
);
...
...
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
浏览文件 @
ffee5ebc
...
...
@@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) {
dError
(
"failed to send crash report"
);
if
(
pFile
)
{
taosReleaseCrashLogFile
(
pFile
,
false
);
pFile
=
NULL
;
continue
;
}
}
else
{
...
...
@@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) {
if
(
pFile
)
{
taosReleaseCrashLogFile
(
pFile
,
truncateFile
);
pFile
=
NULL
;
truncateFile
=
false
;
}
...
...
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
浏览文件 @
ffee5ebc
...
...
@@ -18,6 +18,7 @@
#include "dmNodes.h"
#include "index.h"
#include "qworker.h"
#include "tstream.h"
static
bool
dmRequireNode
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
)
{
SMgmtInputOpt
input
=
dmBuildMgmtInputOpt
(
pWrapper
);
...
...
@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
}
indexInit
(
tsNumOfCommitThreads
);
streamMetaInit
();
dmReportStartup
(
"dnode-transport"
,
"initialized"
);
dDebug
(
"dnode is created, ptr:%p"
,
pDnode
);
...
...
@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer
(
pDnode
);
dmClearVars
(
pDnode
);
rpcCleanup
();
streamMetaCleanup
();
indexCleanup
();
taosConvDestroy
();
dDebug
(
"dnode is closed, ptr:%p"
,
pDnode
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
ffee5ebc
...
...
@@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t
sz
=
taosArrayGetSize
(
pTasks
);
for
(
int32_t
j
=
0
;
j
<
sz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pTasks
,
j
);
if
(
mndPauseStreamTask
(
pTrans
,
pTask
)
<
0
)
{
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
&&
mndPauseStreamTask
(
pTrans
,
pTask
)
<
0
)
{
return
-
1
;
}
}
...
...
@@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t
sz
=
taosArrayGetSize
(
pTasks
);
for
(
int32_t
j
=
0
;
j
<
sz
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pTasks
,
j
);
if
(
mndResumeStreamTask
(
pTrans
,
pTask
,
igUntreated
)
<
0
)
{
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
&&
mndResumeStreamTask
(
pTrans
,
pTask
,
igUntreated
)
<
0
)
{
return
-
1
;
}
}
...
...
source/libs/executor/src/timesliceoperator.c
浏览文件 @
ffee5ebc
...
...
@@ -899,8 +899,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
}
taosArrayDestroy
(
pInfo
->
pLinearInfo
);
taosMemoryFree
(
pInfo
->
pPrevGroupKey
->
pData
);
taosMemoryFree
(
pInfo
->
pPrevGroupKey
);
if
(
pInfo
->
pPrevGroupKey
)
{
taosMemoryFree
(
pInfo
->
pPrevGroupKey
->
pData
);
taosMemoryFree
(
pInfo
->
pPrevGroupKey
);
}
cleanupExprSupp
(
&
pInfo
->
scalarSup
);
...
...
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
ffee5ebc
...
...
@@ -16,8 +16,6 @@
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_
#include "executor.h"
#include "rocksdb/c.h"
// #include "streamInc.h"
#include "streamState.h"
...
...
@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
,
bool
remove
);
void
*
streamStateCreateBatch
();
int32_t
streamStateGetBatchSize
(
void
*
pBatch
);
void
streamStateClearBatch
(
void
*
pBatch
);
void
streamStateDestroyBatch
(
void
*
pBatch
);
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
);
int32_t
streamStatePutBatch_rocksdb
(
SStreamState
*
pState
,
void
*
pBatch
);
// default cf
int32_t
streamDefaultPut_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
*
pVal
,
int32_t
pVLen
);
int32_t
streamDefaultGet_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
...
...
@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
void
streamStateClearBatch
(
void
*
pBatch
);
void
streamStateDestroyBatch
(
void
*
pBatch
);
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
);
void
*
val
,
int32_t
vlen
,
int64_t
ttl
);
int32_t
streamStatePutBatch_rocksdb
(
SStreamState
*
pState
,
void
*
pBatch
);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
\ No newline at end of file
source/libs/stream/inc/streamInc.h
浏览文件 @
ffee5ebc
...
...
@@ -16,9 +16,12 @@
#ifndef _STREAM_INC_H_
#define _STREAM_INC_H_
//#include "executor.h"
#include "executor.h"
#include "query.h"
#include "tstream.h"
#include "trpc.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
ffee5ebc
...
...
@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
#include "tcommon.h"
typedef
struct
SCompactFilteFactory
{
...
...
@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
taosMemoryFreeClear
(
err
);
}
}
else
{
/*
list all cf and get prefix
*/
int64_t
streamId
;
int32_t
taskId
,
dummpy
=
0
;
SHashObj
*
tbl
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
...
...
@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
void
destroyCompactFilte
(
void
*
arg
)
{
(
void
)
arg
;
}
unsigned
char
compactFilte
(
void
*
arg
,
int
level
,
const
char
*
key
,
size_t
klen
,
const
char
*
val
,
size_t
vlen
,
char
**
newval
,
size_t
*
newvlen
,
unsigned
char
*
value_changed
)
{
// int64_t unixTime = taosGetTimestampMs();
if
(
streamStateValueIsStale
((
char
*
)
val
))
{
return
1
;
}
// SStreamValue value;
// memset(&value, 0, sizeof(value));
// streamValueDecode(&value, (char*)val);
// taosMemoryFree(value.data);
// if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
// return 1;
// }
return
0
;
return
streamStateValueIsStale
((
char
*
)
val
)
?
1
:
0
;
}
const
char
*
compactFilteName
(
void
*
arg
)
{
return
"stream_filte"
;
}
...
...
@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
memcpy
(
cfNames
[
0
],
"default"
,
strlen
(
"default"
));
continue
;
}
qError
(
"cf name %s"
,
idstr
);
GEN_COLUMN_FAMILY_NAME
(
cfNames
[
i
],
idstr
,
ginitDict
[(
i
-
1
)
%
(
cfLen
)].
key
);
if
(
i
%
cfLen
==
0
)
{
...
...
@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
}
}
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
qError
(
"cf name %s"
,
cfNames
[
i
]);
}
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_options_t
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
RocksdbCfParam
*
));
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
...
...
@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
// return -1;
}
}
pState
->
pTdbState
->
rocksdb
=
handle
->
db
;
...
...
@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
if (err != NULL) taosMemoryFree(err); \
code = -1; \
} else { \
char * p = NULL, *end = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
if (len < 0) { \
qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
} \
if (pVal != NULL) { \
*pVal = p; \
} else { \
taosMemoryFree(p); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = len; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
if (err == NULL) { \
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
funcname); \
} else { \
qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
} else { \
char* p = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (len < 0) { \
qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
funcname); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
len); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = len; \
} \
if (code == 0) \
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
...
...
@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if
(
err
!=
NULL
)
{
qWarn
(
"failed to delete range cf(state) err: %s, "
"start: %s, end:%s"
,
err
,
toStringStart
,
toStringEnd
);
qWarn
(
"failed to delete range cf(state) start: %s, end:%s, reason:%s"
,
toStringStart
,
toStringEnd
,
err
);
taosMemoryFree
(
err
);
}
...
...
@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if
(
!
rocksdb_iter_valid
(
pCur
->
iter
)
||
iterValueIsStale
(
pCur
->
iter
))
{
return
-
1
;
}
size_t
t
len
;
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
t
len
);
size_t
klen
,
v
len
;
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
k
len
);
winKeyDecode
(
&
winKey
,
keyStr
);
size_t
vlen
=
0
;
const
char
*
valStr
=
rocksdb_iter_value
(
pCur
->
iter
,
&
vlen
);
char
*
dst
=
NULL
;
int32_t
len
=
decodeValueFunc
((
void
*
)
valStr
,
vlen
,
NULL
,
&
dst
);
//
char* dst = NULL;
int32_t
len
=
decodeValueFunc
((
void
*
)
valStr
,
vlen
,
NULL
,
(
char
**
)
pVal
);
if
(
len
<
0
)
{
return
-
1
;
}
if
(
pVal
!=
NULL
)
*
pVal
=
(
char
*
)
dst
;
if
(
pVLen
!=
NULL
)
*
pVLen
=
vlen
;
if
(
pVLen
!=
NULL
)
*
pVLen
=
len
;
*
pKey
=
winKey
;
return
0
;
...
...
@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
void
streamStateClearBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_clear
((
rocksdb_writebatch_t
*
)
pBatch
);
}
void
streamStateDestroyBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_destroy
((
rocksdb_writebatch_t
*
)
pBatch
);
}
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
)
{
void
*
val
,
int32_t
vlen
,
int64_t
ttl
)
{
int
i
=
streamGetInit
(
cfName
);
if
(
i
<
0
)
{
...
...
@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
int32_t
klen
=
ginitDict
[
i
].
enFunc
((
void
*
)
key
,
buf
);
char
*
ttlV
=
NULL
;
int32_t
ttlVLen
=
ginitDict
[
i
].
enValueFunc
(
val
,
vlen
,
0
,
&
ttlV
);
int32_t
ttlVLen
=
ginitDict
[
i
].
enValueFunc
(
val
,
vlen
,
ttl
,
&
ttlV
);
rocksdb_column_family_handle_t
*
pCf
=
pState
->
pTdbState
->
pHandle
[
ginitDict
[
i
].
idx
];
rocksdb_writebatch_put_cf
((
rocksdb_writebatch_t
*
)
pBatch
,
pCf
,
buf
,
(
size_t
)
klen
,
ttlV
,
(
size_t
)
ttlVLen
);
taosMemoryFree
(
ttlV
);
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
ffee5ebc
...
...
@@ -20,12 +20,12 @@
#define MIN_STREAM_EXEC_BATCH_NUM 16
bool
streamTaskShouldStop
(
const
SStreamStatus
*
pStatus
)
{
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
return
(
status
==
TASK_STATUS__STOP
)
||
(
status
==
TASK_STATUS__DROPPING
);
}
bool
streamTaskShouldPause
(
const
SStreamStatus
*
pStatus
)
{
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
return
(
status
==
TASK_STATUS__PAUSE
);
}
...
...
@@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
while
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
int8_t
status
=
atomic_load_8
(
&
pTask
->
status
.
taskStatus
);
if
(
status
!=
TASK_STATUS__NORMAL
)
{
if
(
status
!=
TASK_STATUS__NORMAL
&&
status
!=
TASK_STATUS__PAUSE
)
{
qError
(
"stream task wait for the end of fill history, s-task:%s, status:%d"
,
pTask
->
id
.
idStr
,
atomic_load_8
(
&
pTask
->
status
.
taskStatus
));
taosMsleep
(
2
);
...
...
@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
);
const
SStreamDataSubmit2
*
pSubmit
=
(
const
SStreamDataSubmit2
*
)
data
;
qSetMultiStreamInput
(
pExecutor
,
&
pSubmit
->
submit
,
1
,
STREAM_INPUT__DATA_SUBMIT
);
qDebug
(
"s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%"
PRId64
,
pTask
->
id
.
idStr
,
pSubmit
,
pSubmit
->
submit
.
msgStr
,
pSubmit
->
submit
.
msgLen
,
pSubmit
->
submit
.
ver
);
qDebug
(
"s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%"
PRId64
,
pTask
->
id
.
idStr
,
pSubmit
,
pSubmit
->
submit
.
msg
Str
,
pSubmit
->
submit
.
msg
Len
,
pSubmit
->
submit
.
ver
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
const
SStreamDataBlock
*
pBlock
=
(
const
SStreamDataBlock
*
)
data
;
SArray
*
pBlockList
=
pBlock
->
blocks
;
int32_t
numOfBlocks
=
taosArrayGetSize
(
pBlockList
);
qDebug
(
"s-task:%s set sdata blocks as input num:%d, ver:%"
PRId64
,
pTask
->
id
.
idStr
,
numOfBlocks
,
pBlock
->
sourceVer
);
qDebug
(
"s-task:%s set sdata blocks as input num:%d, ver:%"
PRId64
,
pTask
->
id
.
idStr
,
numOfBlocks
,
pBlock
->
sourceVer
);
qSetMultiStreamInput
(
pExecutor
,
pBlockList
->
pData
,
numOfBlocks
,
STREAM_INPUT__DATA_BLOCK
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
const
SStreamMergedSubmit2
*
pMerged
=
(
const
SStreamMergedSubmit2
*
)
data
;
...
...
@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes
->
blocks
=
pRes
;
code
=
streamTaskOutput
(
pTask
,
qRes
);
if
(
code
==
TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
)
{
taosFreeQitem
(
pRes
);
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
qRes
);
return
code
;
}
...
...
@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t
ckId
=
0
;
int64_t
dataVer
=
0
;
qGetCheckpointVersion
(
pTask
->
exec
.
pExecutor
,
&
dataVer
,
&
ckId
);
if
(
ckId
>
pTask
->
chkInfo
.
id
)
{
// save it since the checkpoint is updated
if
(
ckId
>
pTask
->
chkInfo
.
id
)
{
// save it since the checkpoint is updated
qDebug
(
"s-task:%s exec end, start to update check point, ver from %"
PRId64
" to %"
PRId64
", checkPoint id:%"
PRId64
" -> %"
PRId64
,
pTask
->
id
.
idStr
,
pTask
->
chkInfo
.
version
,
dataVer
,
pTask
->
chkInfo
.
id
,
ckId
);
pTask
->
chkInfo
=
(
SCheckpointInfo
)
{.
version
=
dataVer
,
.
id
=
ckId
,
.
currentVer
=
pTask
->
chkInfo
.
currentVer
};
pTask
->
chkInfo
=
(
SCheckpointInfo
){.
version
=
dataVer
,
.
id
=
ckId
,
.
currentVer
=
pTask
->
chkInfo
.
currentVer
};
taosWLockLatch
(
&
pTask
->
pMeta
->
lock
);
...
...
@@ -407,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8
(
&
pTask
->
status
.
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
);
qDebug
(
"s-task:%s exec completed"
,
pTask
->
id
.
idStr
);
if
(
!
taosQueueEmpty
(
pTask
->
inputQueue
->
queue
)
&&
(
!
streamTaskShouldStop
(
&
pTask
->
status
)))
{
if
(
!
taosQueueEmpty
(
pTask
->
inputQueue
->
queue
)
&&
(
!
streamTaskShouldStop
(
&
pTask
->
status
))
&&
(
!
streamTaskShouldPause
(
&
pTask
->
status
))
)
{
streamSchedExec
(
pTask
);
}
}
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
ffee5ebc
...
...
@@ -19,6 +19,13 @@
#include "tref.h"
#include "ttimer.h"
static
TdThreadOnce
streamMetaModuleInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
streamBackendId
=
0
;
static
void
streamMetaEnvInit
()
{
streamBackendId
=
taosOpenRef
(
20
,
streamBackendCleanup
);
}
void
streamMetaInit
()
{
taosThreadOnce
(
&
streamMetaModuleInit
,
streamMetaEnvInit
);
}
void
streamMetaCleanup
()
{
taosCloseRef
(
streamBackendId
);
}
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
)
{
int32_t
code
=
-
1
;
SStreamMeta
*
pMeta
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamMeta
));
...
...
@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf
(
streamPath
,
"%s/%s"
,
path
,
"stream"
);
pMeta
->
path
=
taosStrdup
(
streamPath
);
if
(
tdbOpen
(
pMeta
->
path
,
16
*
1024
,
1
,
&
pMeta
->
db
,
0
)
<
0
)
{
taosMemoryFree
(
streamPath
);
goto
_err
;
}
memset
(
streamPath
,
0
,
len
);
sprintf
(
streamPath
,
"%s/%s"
,
pMeta
->
path
,
"checkpoints"
);
code
=
taosMulModeMkDir
(
streamPath
,
0755
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
taosMemoryFree
(
streamPath
);
goto
_err
;
}
taosMemoryFree
(
streamPath
);
if
(
tdbTbOpen
(
"task.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pTaskDb
,
0
)
<
0
)
{
goto
_err
;
...
...
@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta
->
vgId
=
vgId
;
pMeta
->
ahandle
=
ahandle
;
pMeta
->
expandFunc
=
expandFunc
;
pMeta
->
streamBackendId
=
streamBackendId
;
char
*
statePath
=
taosMemoryCalloc
(
1
,
len
);
sprintf
(
st
ate
Path
,
"%s/%s"
,
pMeta
->
path
,
"state"
);
code
=
taosMulModeMkDir
(
st
ate
Path
,
0755
);
memset
(
streamPath
,
0
,
len
);
sprintf
(
st
ream
Path
,
"%s/%s"
,
pMeta
->
path
,
"state"
);
code
=
taosMulModeMkDir
(
st
ream
Path
,
0755
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
taosMemoryFree
(
streamPath
);
goto
_err
;
}
pMeta
->
streamBackend
=
streamBackendInit
(
statePath
);
pMeta
->
streamBackendId
=
taosOpenRef
(
20
,
streamBackendCleanup
);
pMeta
->
streamBackendRid
=
taosAddRef
(
pMeta
->
streamBackendId
,
pMeta
->
streamBackend
);
pMeta
->
streamBackend
=
streamBackendInit
(
streamPath
);
pMeta
->
streamBackendRid
=
taosAddRef
(
streamBackendId
,
pMeta
->
streamBackend
);
taosMemoryFree
(
st
ate
Path
);
taosMemoryFree
(
st
ream
Path
);
taosInitRWLatch
(
&
pMeta
->
lock
);
return
pMeta
;
_err:
taosMemoryFree
(
streamPath
);
taosMemoryFree
(
pMeta
->
path
);
if
(
pMeta
->
pTasks
)
taosHashCleanup
(
pMeta
->
pTasks
);
if
(
pMeta
->
pTaskList
)
taosArrayDestroy
(
pMeta
->
pTaskList
);
...
...
@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
taosHashCleanup
(
pMeta
->
pTasks
);
taosRemoveRef
(
pMeta
->
streamBackendId
,
pMeta
->
streamBackendRid
);
// streamBackendCleanup(pMeta->streamBackend);
taosCloseRef
(
pMeta
->
streamBackendId
);
taosRemoveRef
(
streamBackendId
,
pMeta
->
streamBackendRid
);
pMeta
->
pTaskList
=
taosArrayDestroy
(
pMeta
->
pTaskList
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
);
...
...
@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask
**
ppTask
=
(
SStreamTask
**
)
taosHashGet
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
if
(
ppTask
)
{
SStreamTask
*
pTask
=
*
ppTask
;
// taosWLockLatch(&pMeta->lock);
taosHashRemove
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
tdbTbDelete
(
pMeta
->
pTaskDb
,
&
taskId
,
sizeof
(
int32_t
),
pMeta
->
txn
);
//
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__DROPPING
);
int32_t
num
=
taosArrayGetSize
(
pMeta
->
pTaskList
);
...
...
source/libs/stream/src/streamState.c
浏览文件 @
ffee5ebc
...
...
@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState
->
taskId
=
pTask
->
id
.
taskId
;
pState
->
streamId
=
pTask
->
id
.
streamId
;
#ifdef USE_ROCKSDB
qWarn
(
"open stream state1"
);
//
qWarn("open stream state1");
taosAcquireRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
int
code
=
streamStateOpenBackend
(
pTask
->
pMeta
->
streamBackend
,
pState
);
if
(
code
==
-
1
)
{
...
...
@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
streamStateDestroy
(
pState
,
remove
);
taosReleaseRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
#else
tdbCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
tdbPostCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
...
...
@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
#endif
taosReleaseRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
}
int32_t
streamStateBegin
(
SStreamState
*
pState
)
{
...
...
@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
int32_t
code
=
0
;
void
*
batch
=
streamStateCreateBatch
();
code
=
streamStatePutBatch
(
pState
,
"default"
,
batch
,
pKey
,
pVal
,
vLen
);
code
=
streamStatePutBatch
(
pState
,
"default"
,
batch
,
pKey
,
pVal
,
vLen
,
0
);
if
(
code
!=
0
)
{
return
code
;
}
...
...
source/libs/stream/src/tstreamFileState.c
浏览文件 @
ffee5ebc
...
...
@@ -15,6 +15,7 @@
#include "tstreamFileState.h"
#include "query.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
...
...
@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff
(
pFileState
,
0
,
true
);
}
bool
needClearDiskBuff
(
SStreamFileState
*
pFileState
)
{
return
pFileState
->
flushMark
>
0
;
}
bool
needClearDiskBuff
(
SStreamFileState
*
pFileState
)
{
return
pFileState
->
flushMark
>
0
;
}
void
popUsedBuffs
(
SStreamFileState
*
pFileState
,
SStreamSnapshot
*
pFlushList
,
uint64_t
max
,
bool
used
)
{
uint64_t
i
=
0
;
...
...
@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
void
releaseRowBuffPos
(
SRowBuffPos
*
pBuff
)
{
pBuff
->
beUsed
=
false
;
}
SStreamSnapshot
*
getSnapshot
(
SStreamFileState
*
pFileState
)
{
clearExpiredRowBuff
(
pFileState
,
pFileState
->
maxTs
-
pFileState
->
deleteMark
,
false
);
int64_t
mark
=
(
INT64_MIN
+
pFileState
->
deleteMark
>=
pFileState
->
maxTs
)
?
INT64_MIN
:
pFileState
->
maxTs
-
pFileState
->
deleteMark
;
clearExpiredRowBuff
(
pFileState
,
mark
,
false
);
return
pFileState
->
usedBuffs
;
}
...
...
@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
SStateKey
sKey
=
{.
key
=
*
((
SWinKey
*
)
pPos
->
pKey
),
.
opNum
=
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
number
};
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"state"
,
batch
,
&
sKey
,
pPos
->
pRowBuff
,
pFileState
->
rowSize
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"state"
,
batch
,
&
sKey
,
pPos
->
pRowBuff
,
pFileState
->
rowSize
,
0
);
qDebug
(
"===stream===put %"
PRId64
" to disc, res %d"
,
sKey
.
key
.
ts
,
code
);
}
if
(
streamStateGetBatchSize
(
batch
)
>
0
)
{
...
...
@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t
len
=
0
;
sprintf
(
keyBuf
,
"%s:%"
PRId64
""
,
taskKey
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
streamFileStateEncode
(
&
pFileState
->
flushMark
,
&
valBuf
,
&
len
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
,
0
);
taosMemoryFree
(
valBuf
);
}
{
...
...
@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t
len
=
0
;
memcpy
(
keyBuf
,
taskKey
,
strlen
(
taskKey
));
len
=
sprintf
(
valBuf
,
"%"
PRId64
""
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
,
0
);
}
streamStatePutBatch_rocksdb
(
pFileState
->
pFileStore
,
batch
);
}
...
...
@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t
recoverSnapshot
(
SStreamFileState
*
pFileState
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
deleteExpiredCheckPoint
(
pFileState
,
pFileState
->
maxTs
-
pFileState
->
deleteMark
);
int64_t
mark
=
(
INT64_MIN
+
pFileState
->
deleteMark
>=
pFileState
->
maxTs
)
?
INT64_MIN
:
pFileState
->
maxTs
-
pFileState
->
deleteMark
;
deleteExpiredCheckPoint
(
pFileState
,
mark
);
void
*
pStVal
=
NULL
;
int32_t
len
=
0
;
...
...
source/libs/stream/test/CMakeLists.txt
浏览文件 @
ffee5ebc
...
...
@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES
(
streamUpdateTest
PUBLIC os util common gtest stream
PUBLIC os util common gtest
gtest_main
stream
)
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/libs/stream/test/tstreamUpdateTest.cpp
浏览文件 @
ffee5ebc
#include <gtest/gtest.h>
#include "streamBackendRocksdb.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "ttime.h"
using
namespace
std
;
#define MAX_NUM_SCALABLE_BF 100000
class
StreamStateEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
streamMetaInit
();
backend
=
streamBackendInit
(
path
);
}
virtual
void
TearDown
()
{
streamMetaCleanup
();
// indexClose(index);
}
const
char
*
path
=
TD_TMP_DIR_PATH
"stream"
;
void
*
backend
;
};
bool
equalSBF
(
SScalableBf
*
left
,
SScalableBf
*
right
)
{
if
(
left
->
growth
!=
right
->
growth
)
return
false
;
if
(
left
->
numBits
!=
right
->
numBits
)
return
false
;
...
...
@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
// updateInfoDestroy(pSU6);
// updateInfoDestroy(pSU7);
}
int
main
(
int
argc
,
char
*
argv
[])
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
\ No newline at end of file
// TEST()
TEST
(
StreamStateEnv
,
test1
)
{}
// int main(int argc, char *argv[]) {
// testing::InitGoogleTest(&argc, argv);
// return RUN_ALL_TESTS();
// }
\ No newline at end of file
tests/script/tsim/alter/table.sim
浏览文件 @
ffee5ebc
...
...
@@ -657,36 +657,33 @@ if $data20 != null then
return -1
endi
#print =============== error for normal table
#sql create table tb2023(ts timestamp, f int);
#sql_error alter table tb2023 add column v varchar(65535);
#sql_error alter table tb2023 add column v varchar(65535);
#sql_error alter table tb2023 add column v varchar(65530);
#sql alter table tb2023 add column v varchar(16374);
#sql_error alter table tb2023 modify column v varchar(65536);
#sql desc tb2023
#sql alter table tb2023 drop column v
#sql_error alter table tb2023 add column v nchar(16384);
#sql alter table tb2023 add column v nchar(4093);
#sql_error alter table tb2023 modify column v nchar(16384);
#sql_error alter table tb2023 add column v nchar(16384);
#sql alter table tb2023 drop column v
#sql alter table tb2023 add column v nchar(16374);
#sql desc tb2023
#
#print =============== error for super table
#sql create table stb2023(ts timestamp, f int) tags(t1 int);
#sql_error alter table stb2023 add column v varchar(65535);
#sql_error alter table stb2023 add column v varchar(65536);
#sql_error alter table stb2023 add column v varchar(33100);
#sql alter table stb2023 add column v varchar(16374);
#sql_error alter table stb2023 modify column v varchar(16375);
#sql desc stb2023
#sql alter table stb2023 drop column v
#sql_error alter table stb2023 add column v nchar(4094);
#sql alter table stb2023 add column v nchar(4093);
#sql_error alter table stb2023 modify column v nchar(4094);
#sql desc stb2023
print =============== error for normal table
sql create table tb2023(ts timestamp, f int);
sql_error alter table tb2023 add column v varchar(65518);
sql_error alter table tb2023 add column v varchar(65531);
sql_error alter table tb2023 add column v varchar(65535);
sql alter table tb2023 add column v varchar(65517);
sql_error alter table tb2023 modify column v varchar(65518);
sql desc tb2023
sql alter table tb2023 drop column v
sql_error alter table tb2023 add column v nchar(16380);
sql alter table tb2023 add column v nchar(16379);
sql_error alter table tb2023 modify column v nchar(16380);
sql desc tb2023
print =============== error for super table
sql create table stb2023(ts timestamp, f int) tags(t1 int);
sql_error alter table stb2023 add column v varchar(65518);
sql_error alter table stb2023 add column v varchar(65531);
sql_error alter table stb2023 add column v varchar(65535);
sql alter table stb2023 add column v varchar(65517);
sql_error alter table stb2023 modify column v varchar(65518);
sql desc stb2023
sql alter table stb2023 drop column v
sql_error alter table stb2023 add column v nchar(16380);
sql alter table stb2023 add column v nchar(16379);
sql_error alter table stb2023 modify column v nchar(16380);
sql desc stb2023
print ======= over
sql drop database d1
...
...
tests/script/tsim/parser/alter_column.sim
浏览文件 @
ffee5ebc
...
...
@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0);
sql_error alter table tb modify column c2 binary(65
600
);
sql_error alter table tb modify column c2 binary(65
436
);
sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录