Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
71331d79
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
71331d79
编写于
7月 07, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feat/tsdb_snapshot
上级
77f6c41d
f9c89fdd
变更
54
展开全部
隐藏空白更改
内联
并排
Showing
54 changed file
with
866 addition
and
571 deletion
+866
-571
cmake/cmake.options
cmake/cmake.options
+7
-0
examples/c/tmq.c
examples/c/tmq.c
+2
-2
include/common/tcommon.h
include/common/tcommon.h
+12
-0
include/common/tmsg.h
include/common/tmsg.h
+4
-4
include/libs/executor/executor.h
include/libs/executor/executor.h
+4
-4
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+11
-2
include/libs/wal/wal.h
include/libs/wal/wal.h
+27
-21
include/util/tutil.h
include/util/tutil.h
+1
-0
source/client/CMakeLists.txt
source/client/CMakeLists.txt
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+13
-14
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+13
-0
source/dnode/mgmt/test/sut/src/sut.cpp
source/dnode/mgmt/test/sut/src/sut.cpp
+3
-0
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-10
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+1
-0
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+18
-14
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+3
-1
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+4
-4
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+3
-1
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+9
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+12
-6
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+7
-7
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+55
-18
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+13
-6
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+122
-110
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+69
-34
source/libs/executor/test/CMakeLists.txt
source/libs/executor/test/CMakeLists.txt
+15
-13
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+4
-0
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+103
-101
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+2
-2
source/libs/planner/test/CMakeLists.txt
source/libs/planner/test/CMakeLists.txt
+28
-26
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+4
-2
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+0
-3
source/libs/sync/inc/syncRaftLog.h
source/libs/sync/inc/syncRaftLog.h
+2
-2
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+6
-6
source/libs/tfs/test/tfsTest.cpp
source/libs/tfs/test/tfsTest.cpp
+16
-0
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+1
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+148
-77
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+8
-1
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+7
-7
source/os/src/osTimezone.c
source/os/src/osTimezone.c
+37
-21
source/util/src/tutil.c
source/util/src/tutil.c
+14
-0
tests/script/tsim/stream/session0.sim
tests/script/tsim/stream/session0.sim
+11
-11
tests/script/tsim/valgrind/basic1.sim
tests/script/tsim/valgrind/basic1.sim
+9
-23
tests/script/tsim/valgrind/basic2.sim
tests/script/tsim/valgrind/basic2.sim
+7
-2
tests/script/tsim/valgrind/checkError1.sim
tests/script/tsim/valgrind/checkError1.sim
+5
-1
tests/script/tsim/valgrind/checkError2.sim
tests/script/tsim/valgrind/checkError2.sim
+13
-2
tests/system-test/7-tmq/stbTagFilter.py
tests/system-test/7-tmq/stbTagFilter.py
+2
-2
tests/system-test/failed.txt
tests/system-test/failed.txt
+1
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-1
tools/taosadapter
tools/taosadapter
+1
-1
未找到文件。
cmake/cmake.options
浏览文件 @
71331d79
...
...
@@ -51,6 +51,13 @@ IF(${TD_WINDOWS})
"If build unit tests using googletest"
ON
)
ELSEIF (TD_DARWIN_64)
add_definitions(-DCOMPILER_SUPPORTS_CXX13)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ELSE ()
include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-std=c++13" COMPILER_SUPPORTS_CXX13)
...
...
examples/c/tmq.c
浏览文件 @
71331d79
...
...
@@ -241,7 +241,7 @@ int32_t create_topic() {
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -302,7 +302,7 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"false"
);
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
...
...
include/common/tcommon.h
浏览文件 @
71331d79
...
...
@@ -32,6 +32,18 @@ enum {
TMQ_CONF__RESET_OFFSET__LATEST
=
-
1
,
};
// clang-format off
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on
enum
{
TMQ_MSG_TYPE__DUMMY
=
0
,
TMQ_MSG_TYPE__POLL_RSP
,
...
...
include/common/tmsg.h
浏览文件 @
71331d79
...
...
@@ -2826,8 +2826,8 @@ typedef struct {
static
FORCE_INLINE
int32_t
tEncodeSMqMetaRsp
(
void
**
buf
,
const
SMqMetaRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
//
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
//
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen
+=
taosEncodeFixedI16
(
buf
,
pRsp
->
resMsgType
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
metaRspLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
);
...
...
@@ -2835,8 +2835,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
}
static
FORCE_INLINE
void
*
tDecodeSMqMetaRsp
(
const
void
*
buf
,
SMqMetaRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
//
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
//
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf
=
taosDecodeFixedI16
(
buf
,
&
pRsp
->
resMsgType
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
metaRspLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
);
...
...
include/libs/executor/executor.h
浏览文件 @
71331d79
...
...
@@ -30,15 +30,15 @@ struct SRpcMsg;
struct
SSubplan
;
typedef
struct
SReadHandle
{
void
*
r
eader
;
void
*
streamR
eader
;
void
*
meta
;
void
*
config
;
void
*
vnode
;
void
*
mnd
;
SMsgCb
*
pMsgCb
;
// int8_t initTsdb
Reader;
bool
tq
Reader
;
bool
initMetaReader
;
bool
initTable
Reader
;
bool
initStream
Reader
;
}
SReadHandle
;
typedef
enum
{
...
...
include/libs/stream/tstream.h
浏览文件 @
71331d79
...
...
@@ -223,7 +223,7 @@ typedef struct {
SEpSet
epSet
;
}
SStreamChildEpInfo
;
struct
SStreamTask
{
typedef
struct
SStreamTask
{
int64_t
streamId
;
int32_t
taskId
;
int8_t
isDataScan
;
...
...
@@ -235,6 +235,11 @@ struct SStreamTask {
int8_t
taskStatus
;
int8_t
execStatus
;
// exec info
int64_t
enqueueVer
;
int64_t
processedVer
;
int64_t
checkpointVer
;
// node info
int32_t
selfChildId
;
int32_t
nodeId
;
...
...
@@ -277,7 +282,7 @@ struct SStreamTask {
// msg handle
SMsgCb
*
pMsgCb
;
};
}
SStreamTask
;
int32_t
tEncodeStreamEpInfo
(
SEncoder
*
pEncoder
,
const
SStreamChildEpInfo
*
pInfo
);
int32_t
tDecodeStreamEpInfo
(
SDecoder
*
pDecoder
,
SStreamChildEpInfo
*
pInfo
);
...
...
@@ -288,6 +293,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
static
FORCE_INLINE
int32_t
streamTaskInput
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
)
{
#if 0
while (1) {
int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
...
...
@@ -296,6 +302,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
ASSERT(0);
}
#endif
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit
*
pSubmitClone
=
streamSubmitRefClone
((
SStreamDataSubmit
*
)
pItem
);
...
...
@@ -316,8 +323,10 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_val_compare_exchange_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
,
TASK_TRIGGER_STATUS__ACTIVE
);
}
#if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return
0
;
}
...
...
include/libs/wal/wal.h
浏览文件 @
71331d79
...
...
@@ -88,7 +88,7 @@ typedef struct {
EWalType
level
;
// wal level
}
SWalCfg
;
typedef
struct
SWalVer
{
typedef
struct
{
int64_t
firstVer
;
int64_t
verInSnapshotting
;
int64_t
snapshotVer
;
...
...
@@ -149,17 +149,22 @@ typedef struct SWal {
SWalCkHead
writeHead
;
}
SWal
;
// WAL HANDLE
typedef
struct
SWalReadHandle
{
SWal
*
pWal
;
TdFilePtr
pReadLogTFile
;
TdFilePtr
pReadIdxTFile
;
int64_t
curFileFirstVer
;
int64_t
curVersion
;
int64_t
capacity
;
int64_t
status
;
// if cursor valid
TdThreadMutex
mutex
;
SWalCkHead
*
pHead
;
}
SWalReadHandle
;
typedef
struct
{
int8_t
scanUncommited
;
int8_t
scanMeta
;
}
SWalFilterCond
;
typedef
struct
{
SWal
*
pWal
;
TdFilePtr
pLogFile
;
TdFilePtr
pIdxFile
;
int64_t
curFileFirstVer
;
int64_t
curVersion
;
int64_t
capacity
;
TdThreadMutex
mutex
;
SWalFilterCond
cond
;
SWalCkHead
*
pHead
;
}
SWalReader
;
// module initialization
int32_t
walInit
();
...
...
@@ -178,7 +183,6 @@ void walFsync(SWal *, bool force);
// apis for lifecycle management
int32_t
walCommit
(
SWal
*
,
int64_t
ver
);
// truncate after
int32_t
walRollback
(
SWal
*
,
int64_t
ver
);
// notify that previous logs can be pruned safely
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
);
...
...
@@ -187,15 +191,16 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);
// read
SWalReadHandle
*
walOpenReadHandle
(
SWal
*
);
void
walCloseReadHandle
(
SWalReadHandle
*
);
int32_t
walReadWithHandle
(
SWalReadHandle
*
pRead
,
int64_t
ver
);
SWalReader
*
walOpenReader
(
SWal
*
,
SWalFilterCond
*
pCond
);
void
walCloseReader
(
SWalReader
*
pRead
);
int32_t
walReadVer
(
SWalReader
*
pRead
,
int64_t
ver
);
int32_t
walNextValidMsg
(
SWalReader
*
pRead
);
// only for tq usage
void
walSetReaderCapacity
(
SWalRead
Handle
*
pRead
,
int32_t
capacity
);
int32_t
walFetchHead
(
SWalRead
Handle
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
);
int32_t
walFetchBody
(
SWalRead
Handle
*
pRead
,
SWalCkHead
**
ppHead
);
int32_t
walSkipFetchBody
(
SWalRead
Handle
*
pRead
,
const
SWalCkHead
*
pHead
);
void
walSetReaderCapacity
(
SWalRead
er
*
pRead
,
int32_t
capacity
);
int32_t
walFetchHead
(
SWalRead
er
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
);
int32_t
walFetchBody
(
SWalRead
er
*
pRead
,
SWalCkHead
**
ppHead
);
int32_t
walSkipFetchBody
(
SWalRead
er
*
pRead
,
const
SWalCkHead
*
pHead
);
typedef
struct
{
int64_t
refId
;
...
...
@@ -207,10 +212,11 @@ void walCloseRef(SWalRef *);
int32_t
walRefVer
(
SWalRef
*
,
int64_t
ver
);
int32_t
walUnrefVer
(
SWal
*
);
// help function for raft
bool
walLogExist
(
SWal
*
,
int64_t
ver
);
bool
walIsEmpty
(
SWal
*
);
// lifecycle check
bool
walIsEmpty
(
SWal
*
);
int64_t
walGetFirstVer
(
SWal
*
);
int64_t
walGetSnapshotVer
(
SWal
*
);
int64_t
walGetLastVer
(
SWal
*
);
...
...
include/util/tutil.h
浏览文件 @
71331d79
...
...
@@ -45,6 +45,7 @@ void taosIp2String(uint32_t ip, char *str);
void
taosIpPort2String
(
uint32_t
ip
,
uint16_t
port
,
char
*
str
);
void
*
tmemmem
(
const
char
*
haystack
,
int
hlen
,
const
char
*
needle
,
int
nlen
);
char
*
strDupUnquo
(
const
char
*
src
);
static
FORCE_INLINE
void
taosEncryptPass
(
uint8_t
*
inBuf
,
size_t
inLen
,
char
*
target
)
{
T_MD5_CTX
context
;
...
...
source/client/CMakeLists.txt
浏览文件 @
71331d79
...
...
@@ -26,6 +26,8 @@ if(TD_WINDOWS)
/DEF:
${
CMAKE_CURRENT_SOURCE_DIR
}
/src/taos.def
)
INCLUDE_DIRECTORIES
(
jni/windows
)
INCLUDE_DIRECTORIES
(
jni/windows/win32
)
INCLUDE_DIRECTORIES
(
jni/windows/win32/bridge
)
else
()
INCLUDE_DIRECTORIES
(
jni/linux
)
endif
()
...
...
source/client/src/tmq.c
浏览文件 @
71331d79
...
...
@@ -50,19 +50,18 @@ struct tmq_list_t {
};
struct
tmq_conf_t
{
char
clientId
[
256
];
char
groupId
[
TSDB_CGROUP_LEN
];
int8_t
autoCommit
;
int8_t
resetOffset
;
int8_t
withTbName
;
int8_t
spEnable
;
int32_t
spBatchSize
;
uint16_t
port
;
int32_t
autoCommitInterval
;
char
*
ip
;
char
*
user
;
char
*
pass
;
/*char* db;*/
char
clientId
[
256
];
char
groupId
[
TSDB_CGROUP_LEN
];
int8_t
autoCommit
;
int8_t
resetOffset
;
int8_t
withTbName
;
int8_t
spEnable
;
int32_t
spBatchSize
;
uint16_t
port
;
int32_t
autoCommitInterval
;
char
*
ip
;
char
*
user
;
char
*
pass
;
tmq_commit_cb
*
commitCb
;
void
*
commitCbUserParam
;
};
...
...
@@ -338,7 +337,7 @@ tmq_list_t* tmq_list_new() {
int32_t
tmq_list_append
(
tmq_list_t
*
list
,
const
char
*
src
)
{
SArray
*
container
=
&
list
->
container
;
char
*
topic
=
str
dup
(
src
);
char
*
topic
=
str
DupUnquo
(
src
);
if
(
taosArrayPush
(
container
,
&
topic
)
==
NULL
)
return
-
1
;
return
0
;
}
...
...
source/common/src/tdatablock.c
浏览文件 @
71331d79
...
...
@@ -1737,41 +1737,54 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t
len
=
0
;
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"
\n
%s |block type %d |child id %d|group id %lu|
\n
"
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
groupId
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"%s |"
,
flag
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
if
(
colDataIsNull
(
pColInfoData
,
rows
,
j
,
NULL
))
{
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15s |"
,
"NULL"
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
continue
;
}
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
formatTimestamp
(
pBuf
,
*
(
uint64_t
*
)
var
,
TSDB_TIME_PRECISION_MILLI
);
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %25s |"
,
pBuf
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_INT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15d |"
,
*
(
int32_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_UINT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15u |"
,
*
(
uint32_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15ld |"
,
*
(
int64_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15lu |"
,
*
(
uint64_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15f |"
,
*
(
float
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15lf |"
,
*
(
double
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
}
}
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"
\n
"
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
}
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"%s |end
\n
"
,
flag
);
return
dumpBuf
;
...
...
source/dnode/mgmt/test/sut/src/sut.cpp
浏览文件 @
71331d79
...
...
@@ -43,6 +43,9 @@ if (taosInitLog("taosdlog", 1) != 0) {
}
void
Testbase
::
Init
(
const
char
*
path
,
int16_t
port
)
{
#ifdef _TD_DARWIN_64
osDefaultInit
();
#endif
tsServerPort
=
port
;
strcpy
(
tsLocalFqdn
,
"localhost"
);
snprintf
(
tsLocalEp
,
TSDB_EP_LEN
,
"%s:%u"
,
tsLocalFqdn
,
tsServerPort
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
71331d79
...
...
@@ -40,15 +40,6 @@ extern "C" {
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on
typedef
struct
STqOffsetStore
STqOffsetStore
;
...
...
@@ -128,7 +119,7 @@ typedef struct {
int8_t
fetchMeta
;
// reader
SWalRead
Handle
*
pWalReader
;
SWalRead
er
*
pWalReader
;
// push
STqPushHandle
pushHandle
;
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
71331d79
...
...
@@ -439,6 +439,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
tDecoderInit
(
&
dc
,
pData
,
nData
);
tDecodeSSchemaWrapper
(
&
dc
,
pSchemaWrapper
);
tDecoderClear
(
&
dc
);
tdbFree
(
pData
);
// convert
STSchemaBuilder
sb
=
{
0
};
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
71331d79
...
...
@@ -332,7 +332,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
}
SReadHandle
handle
=
{
.
r
eader
=
pReadHandle
,
.
streamR
eader
=
pReadHandle
,
.
meta
=
pMeta
,
.
pMsgCb
=
pMsgCb
,
.
vnode
=
pVnode
,
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
71331d79
...
...
@@ -28,8 +28,12 @@ int32_t tqInit() {
atomic_store_8
(
&
tqMgmt
.
inited
,
0
);
return
-
1
;
}
if
(
streamInit
()
<
0
)
{
return
-
1
;
}
atomic_store_8
(
&
tqMgmt
.
inited
,
1
);
}
return
0
;
}
...
...
@@ -42,6 +46,7 @@ void tqCleanUp() {
if
(
old
==
1
)
{
taosTmrCleanUp
(
tqMgmt
.
timer
);
streamCleanUp
();
atomic_store_8
(
&
tqMgmt
.
inited
,
0
);
}
}
...
...
@@ -144,7 +149,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
abuf
,
len
);
tEncodeSMqDataRsp
(
&
encoder
,
pRsp
);
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
SRpcMsg
rsp
=
{
.
info
=
pMsg
->
info
,
...
...
@@ -361,8 +365,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT
(
IS_META_MSG
(
pHead
->
msgType
));
tqInfo
(
"fetch meta msg, ver: %ld, type: %d"
,
pHead
->
version
,
pHead
->
msgType
);
SMqMetaRsp
metaRsp
=
{
0
};
metaRsp
.
reqOffset
=
pReq
->
reqOffset
.
version
;
metaRsp
.
rspOffset
=
fetchVer
;
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.rspOffset = fetchVer;*/
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog
(
&
metaRsp
.
reqOffsetNew
,
pReq
->
reqOffset
.
version
);
tqOffsetResetToLog
(
&
metaRsp
.
rspOffsetNew
,
fetchVer
);
metaRsp
.
resMsgType
=
pHead
->
msgType
;
metaRsp
.
metaRspLen
=
pHead
->
bodyLen
;
metaRsp
.
metaRsp
=
pHead
->
body
;
...
...
@@ -439,7 +446,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
execHandle
.
subType
=
req
.
subType
;
pHandle
->
fetchMeta
=
req
.
withMeta
;
pHandle
->
pWalReader
=
walOpenRead
Handle
(
pTq
->
pVnode
->
pWal
);
pHandle
->
pWalReader
=
walOpenRead
er
(
pTq
->
pVnode
->
pWal
,
NULL
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
pHandle
->
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
...
...
@@ -448,10 +455,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req
.
qmsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
handle
=
{
.
r
eader
=
pHandle
->
execHandle
.
pExecReader
[
i
],
.
streamR
eader
=
pHandle
->
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
tq
Reader
=
true
,
.
initTable
Reader
=
true
,
};
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
ASSERT
(
pHandle
->
execHandle
.
execCol
.
task
[
i
]);
...
...
@@ -522,19 +529,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
// expand runners
if
(
pTask
->
isDataScan
)
{
SStreamReader
*
pStreamReader
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
SReadHandle
handle
=
{
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
initStreamReader
=
1
,
};
/*pTask->exec.inputHandle = pStreamReader;*/
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
ASSERT
(
pTask
->
exec
.
executor
);
}
else
{
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
NULL
);
ASSERT
(
pTask
->
exec
.
executor
);
}
ASSERT
(
pTask
->
exec
.
executor
);
}
// sink
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
71331d79
...
...
@@ -77,14 +77,14 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle
handle
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenRead
Handle
(
pTq
->
pVnode
->
pWal
);
handle
.
pWalReader
=
walOpenRead
er
(
pTq
->
pVnode
->
pWal
,
NULL
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
handle
.
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
reader
=
{
.
r
eader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
streamR
eader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
71331d79
...
...
@@ -309,9 +309,11 @@ static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t
if
(
code
)
goto
_err
;
// code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
pDel
Idx
=
taosArraySearch
(
pDelIdxArray
,
&
idx
,
tCmprDelIdx
,
TD_EQ
);
SDelIdx
*
p
Idx
=
taosArraySearch
(
pDelIdxArray
,
&
idx
,
tCmprDelIdx
,
TD_EQ
);
if
(
code
)
goto
_err
;
*
pDelIdx
=
*
pIdx
;
if
(
pDelIdxArray
)
{
taosArrayDestroy
(
pDelIdxArray
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
71331d79
...
...
@@ -168,8 +168,6 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
tb_uid_t
suid
;
tb_uid_t
uid
;
taosArrayClear
(
pCommitter
->
aDelData
);
if
(
pTbData
)
{
suid
=
pTbData
->
suid
;
uid
=
pTbData
->
uid
;
...
...
@@ -185,6 +183,8 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
code
=
tsdbReadDelData
(
pCommitter
->
pDelFReader
,
pDelIdx
,
pCommitter
->
aDelData
,
NULL
);
if
(
code
)
goto
_err
;
}
else
{
taosArrayClear
(
pCommitter
->
aDelData
);
}
if
(
pTbData
==
NULL
&&
pDelIdx
==
NULL
)
goto
_exit
;
...
...
@@ -205,7 +205,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
if
(
code
)
goto
_err
;
// put delIdx
if
(
taosArrayPush
(
pCommitter
->
aDelIdx
,
&
delIdx
)
==
NULL
)
{
if
(
taosArrayPush
(
pCommitter
->
aDelIdx
N
,
&
delIdx
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
...
...
@@ -854,7 +854,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if
(
pCommitter
->
pReader
)
{
code
=
tsdbDataFReaderClose
(
&
pCommitter
->
pReader
);
goto
_err
;
if
(
code
)
goto
_err
;
}
_exit:
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
71331d79
...
...
@@ -520,7 +520,7 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb can and try fix fs failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d tsdb
s
can and try fix fs failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
71331d79
...
...
@@ -105,7 +105,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
// check if table exists (todo: refact)
SMetaReader
mr
=
{
0
};
SMetaEntry
me
=
{
0
};
//
SMetaEntry me = {0};
metaReaderInit
(
&
mr
,
pTsdb
->
pVnode
->
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mr
,
pMsgIter
->
uid
)
<
0
)
{
metaReaderClear
(
&
mr
);
...
...
@@ -117,6 +117,8 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
if
(
mr
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
sverNew
=
mr
.
me
.
ntbEntry
.
schemaRow
.
version
;
}
else
{
tDecoderClear
(
&
mr
.
coder
);
metaGetTableEntryByUid
(
&
mr
,
mr
.
me
.
ctbEntry
.
suid
);
sverNew
=
mr
.
me
.
stbEntry
.
schemaRow
.
version
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
71331d79
...
...
@@ -49,7 +49,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
}
pDelFWriter
->
fDel
.
size
=
TSDB_FHDR_SIZE
;
pDelFWriter
->
fDel
.
size
=
0
;
pDelFWriter
->
fDel
.
offset
=
0
;
*
ppWriter
=
pDelFWriter
;
return
code
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
71331d79
...
...
@@ -282,8 +282,9 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
)
{
vTrace
(
"message in fetch queue is processing"
);
if
((
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_VND_TABLE_META
||
pMsg
->
msgType
==
TDMT_VND_TABLE_CFG
)
&&
!
vnodeIsLeader
(
pVnode
))
{
if
((
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_VND_TABLE_META
||
pMsg
->
msgType
==
TDMT_VND_TABLE_CFG
)
&&
!
vnodeIsLeader
(
pVnode
))
{
vnodeRedirectRpcMsg
(
pVnode
,
pMsg
);
return
0
;
}
...
...
@@ -349,7 +350,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
if
(
tbUids
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
t
=
ntohl
(
*
(
int32_t
*
)
pReq
);
v
Error
(
"rec ttl time:%d"
,
t
);
v
Debug
(
"rec ttl time:%d"
,
t
);
int32_t
ret
=
metaTtlDropTable
(
pVnode
->
pMeta
,
t
,
tbUids
);
if
(
ret
!=
0
)
{
goto
end
;
...
...
@@ -390,10 +391,14 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p
goto
_err
;
}
taosMemoryFree
(
req
.
schemaRow
.
pSchema
);
taosMemoryFree
(
req
.
schemaTag
.
pSchema
);
tDecoderClear
(
&
coder
);
return
0
;
_err:
taosMemoryFree
(
req
.
schemaRow
.
pSchema
);
taosMemoryFree
(
req
.
schemaTag
.
pSchema
);
tDecoderClear
(
&
coder
);
return
-
1
;
}
...
...
@@ -917,4 +922,4 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
_err:
return
code
;
}
\ No newline at end of file
}
source/libs/executor/inc/executorimpl.h
浏览文件 @
71331d79
...
...
@@ -348,7 +348,7 @@ typedef struct SessionWindowSupporter {
uint8_t
parentType
;
}
SessionWindowSupporter
;
typedef
struct
SStream
Block
ScanInfo
{
typedef
struct
SStreamScanInfo
{
uint64_t
tableUid
;
// queried super table uid
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
...
...
@@ -365,7 +365,7 @@ typedef struct SStreamBlockScanInfo {
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
void
*
stream
Block
Reader
;
// stream block reader handle
void
*
streamReader
;
// stream block reader handle
int32_t
tsArrayIndex
;
SArray
*
tsArray
;
...
...
@@ -374,7 +374,7 @@ typedef struct SStreamBlockScanInfo {
EStreamScanMode
scanMode
;
SOperatorInfo
*
pStreamScanOp
;
SOperatorInfo
*
p
SnapshotRead
Op
;
SOperatorInfo
*
p
TableScan
Op
;
SArray
*
childIds
;
SessionWindowSupporter
sessionSup
;
bool
assignBlockUid
;
// assign block uid to groupId, temporarily used for generating rollup SMA.
...
...
@@ -383,7 +383,7 @@ typedef struct SStreamBlockScanInfo {
SSDataBlock
*
pPullDataRes
;
// pull data SSDataBlock
SSDataBlock
*
pDeleteDataRes
;
// delete data SSDataBlock
int32_t
deleteDataIndex
;
}
SStream
Block
ScanInfo
;
}
SStreamScanInfo
;
typedef
struct
SSysTableScanInfo
{
SRetrieveMetaTableRsp
*
pRsp
;
...
...
@@ -518,6 +518,7 @@ typedef struct SIndefOperatorInfo {
SAggSupporter
aggSup
;
SArray
*
pPseudoColInfo
;
SExprSupp
scalarSup
;
SNode
*
pCondition
;
}
SIndefOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
...
...
@@ -527,6 +528,8 @@ typedef struct SFillOperatorInfo {
void
**
p
;
SSDataBlock
*
existNewGroupBlock
;
bool
multigroupResult
;
STimeWindow
win
;
SNode
*
pCondition
;
}
SFillOperatorInfo
;
typedef
struct
SGroupbyOperatorInfo
{
...
...
@@ -587,6 +590,7 @@ typedef struct SSessionAggOperatorInfo {
int64_t
gap
;
// session window gap
int32_t
tsSlotId
;
// primary timestamp slot id
STimeWindowAggSupp
twAggSup
;
SNode
*
pCondition
;
}
SSessionAggOperatorInfo
;
typedef
struct
SResultWindowInfo
{
...
...
@@ -648,6 +652,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t
tsSlotId
;
// primary timestamp column slot id
STimeWindowAggSupp
twAggSup
;
// bool reptScan;
const
SNode
*
pCondition
;
}
SStateWindowOperatorInfo
;
typedef
struct
SStreamStateAggOperatorInfo
{
...
...
@@ -805,7 +810,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
);
S
Node
*
pCondition
,
S
ExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SNode
*
pCondition
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -819,7 +824,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
STimeWindowAggSupp
*
pTwAggSupp
,
int32_t
tsSlotId
,
SColumn
*
pStateKeyCol
,
SExecTaskInfo
*
pTaskInfo
);
SSDataBlock
*
pResBlock
,
STimeWindowAggSupp
*
pTwAggSupp
,
int32_t
tsSlotId
,
SColumn
*
pStateKeyCol
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
71331d79
...
...
@@ -37,7 +37,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
else
{
pOperator
->
status
=
OP_NOT_OPENED
;
SStream
Block
ScanInfo
*
pInfo
=
pOperator
->
info
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
pInfo
->
assignBlockUid
=
assignUid
;
// TODO: if a block was set but not consumed,
...
...
@@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo
->
blockType
=
type
;
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
if
(
tqReadHandleSetMsg
(
pInfo
->
stream
Block
Reader
,
input
,
0
)
<
0
)
{
if
(
tqReadHandleSetMsg
(
pInfo
->
streamReader
,
input
,
0
)
<
0
)
{
qError
(
"submit msg messed up when initing stream block, %s"
PRIx64
,
id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
...
...
@@ -130,7 +130,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return
pTaskInfo
;
}
static
SArray
*
filterQualifiedChildTables
(
const
SStream
Block
ScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
)
{
static
SArray
*
filterQualifiedChildTables
(
const
SStreamScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
// let's discard the tables those are not created according to the queried super table.
...
...
@@ -168,17 +168,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo
=
pInfo
->
pDownstream
[
0
];
}
int32_t
code
=
0
;
SStream
Block
ScanInfo
*
pScanInfo
=
pInfo
->
info
;
int32_t
code
=
0
;
SStreamScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
stream
Block
Reader
,
qa
);
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamReader
,
qa
);
taosArrayDestroy
(
qa
);
}
else
{
// remove the table id in current list
qDebug
(
" %d remove child tables from the stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
));
code
=
tqReadHandleRemoveTbUidList
(
pScanInfo
->
stream
Block
Reader
,
tableIdList
);
code
=
tqReadHandleRemoveTbUidList
(
pScanInfo
->
streamReader
,
tableIdList
);
}
return
code
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
71331d79
...
...
@@ -1340,6 +1340,8 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
extractQualifiedTupleByFilterResult
(
pBlock
,
rowRes
,
keep
);
blockDataUpdateTsWindow
(
pBlock
,
0
);
taosMemoryFree
(
rowRes
);
}
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
)
{
...
...
@@ -2847,12 +2849,12 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStream
Block
ScanInfo
*
pScanInfo
=
pOperator
->
info
;
SStreamScanInfo
*
pScanInfo
=
pOperator
->
info
;
pScanInfo
->
blockType
=
STREAM_INPUT__DATA_SCAN
;
pScanInfo
->
p
SnapshotRead
Op
->
status
=
OP_OPENED
;
pScanInfo
->
p
TableScan
Op
->
status
=
OP_OPENED
;
STableScanInfo
*
pInfo
=
pScanInfo
->
p
SnapshotRead
Op
->
info
;
STableScanInfo
*
pInfo
=
pScanInfo
->
p
TableScan
Op
->
info
;
ASSERT
(
pInfo
->
scanMode
==
TABLE_SCAN__TABLE_ORDER
);
if
(
uid
==
0
)
{
...
...
@@ -2912,8 +2914,8 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t
doGetScanStatus
(
SOperatorInfo
*
pOperator
,
uint64_t
*
uid
,
int64_t
*
ts
)
{
int32_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStream
Block
ScanInfo
*
pScanInfo
=
pOperator
->
info
;
STableScanInfo
*
pSnapShotScanInfo
=
pScanInfo
->
pSnapshotRead
Op
->
info
;
SStreamScanInfo
*
pScanInfo
=
pOperator
->
info
;
STableScanInfo
*
pSnapShotScanInfo
=
pScanInfo
->
pTableScan
Op
->
info
;
*
uid
=
pSnapShotScanInfo
->
lastStatus
.
uid
;
*
ts
=
pSnapShotScanInfo
->
lastStatus
.
ts
;
}
else
{
...
...
@@ -3334,7 +3336,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
SExecTaskInfo
*
pTaskInfo
)
{
pInfo
->
totalInputRows
=
pInfo
->
existNewGroupBlock
->
info
.
rows
;
int64_t
ekey
=
Q_STATUS_EQUAL
(
pTaskInfo
->
status
,
TASK_COMPLETED
)
?
p
TaskInfo
->
window
.
ekey
int64_t
ekey
=
Q_STATUS_EQUAL
(
pTaskInfo
->
status
,
TASK_COMPLETED
)
?
p
Info
->
win
.
ekey
:
pInfo
->
existNewGroupBlock
->
info
.
window
.
ekey
;
taosResetFillInfo
(
pInfo
->
pFillInfo
,
getFillInfoStart
(
pInfo
->
pFillInfo
));
...
...
@@ -3362,7 +3364,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
}
}
static
SSDataBlock
*
doFill
(
SOperatorInfo
*
pOperator
)
{
static
SSDataBlock
*
doFill
Impl
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -3370,9 +3372,6 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SSDataBlock
*
pResBlock
=
pInfo
->
pRes
;
blockDataCleanup
(
pResBlock
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
// todo handle different group data interpolation
bool
n
=
false
;
...
...
@@ -3395,7 +3394,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
p
TaskInfo
->
window
.
ekey
);
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
p
Info
->
win
.
ekey
);
}
else
{
if
(
pBlock
==
NULL
)
{
if
(
pInfo
->
totalInputRows
==
0
)
{
...
...
@@ -3403,7 +3402,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
return
NULL
;
}
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
p
TaskInfo
->
window
.
ekey
);
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
p
Info
->
win
.
ekey
);
}
else
{
pInfo
->
totalInputRows
+=
pBlock
->
info
.
rows
;
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
pBlock
->
info
.
rows
,
pBlock
->
info
.
window
.
ekey
);
...
...
@@ -3438,6 +3437,39 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
}
}
static
SSDataBlock
*
doFill
(
SOperatorInfo
*
pOperator
)
{
SFillOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SSDataBlock
*
fillResult
=
NULL
;
while
(
true
)
{
fillResult
=
doFillImpl
(
pOperator
);
if
(
fillResult
!=
NULL
)
{
doFilter
(
pInfo
->
pCondition
,
fillResult
);
}
if
(
fillResult
==
NULL
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
}
if
(
fillResult
!=
NULL
)
{
size_t
rows
=
fillResult
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
}
return
fillResult
;
}
static
void
destroyExprInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfExprs
)
{
for
(
int32_t
i
=
0
;
i
<
numOfExprs
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pExpr
[
i
];
...
...
@@ -3830,6 +3862,8 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
}
doFilter
(
pIndefInfo
->
pCondition
,
pInfo
->
pRes
);
size_t
rows
=
pInfo
->
pRes
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
...
...
@@ -3883,6 +3917,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pSup
->
pCtx
,
numOfExpr
);
pInfo
->
pCondition
=
pPhyNode
->
node
.
pConditions
;
pOperator
->
name
=
"IndefinitOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PROJECT
;
...
...
@@ -3920,7 +3955,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
int32_t
order
=
TSDB_ORDER_ASC
;
pInfo
->
pFillInfo
=
taosCreateFillInfo
(
order
,
w
.
skey
,
0
,
capacity
,
numOfCols
,
pInterval
,
fillType
,
pColInfo
,
id
);
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
pInfo
->
win
=
win
;
pInfo
->
p
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
if
(
pInfo
->
pFillInfo
==
NULL
||
pInfo
->
p
==
NULL
)
{
taosMemoryFree
(
pInfo
->
pFillInfo
);
taosMemoryFree
(
pInfo
->
p
);
...
...
@@ -3955,6 +3991,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo
->
pRes
=
pResBlock
;
pInfo
->
multigroupResult
=
multigroupResult
;
pInfo
->
pCondition
=
pPhyFillNode
->
node
.
pConditions
;
pOperator
->
name
=
"FillOperator"
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
...
...
@@ -4452,7 +4489,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t
tsSlotId
=
((
SColumnNode
*
)
pSessionNode
->
window
.
pTspk
)
->
slotId
;
pOptr
=
createSessionAggOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pSessionNode
->
gap
,
tsSlotId
,
&
as
,
pTaskInfo
);
createSessionAggOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pSessionNode
->
gap
,
tsSlotId
,
&
as
,
p
PhyNode
->
pConditions
,
p
TaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
==
type
)
{
pOptr
=
createStreamSessionAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION
==
type
)
{
...
...
@@ -4474,7 +4511,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode
*
pColNode
=
(
SColumnNode
*
)((
STargetNode
*
)
pStateNode
->
pStateKey
)
->
pExpr
;
SColumn
col
=
extractColumnFromColumnNode
(
pColNode
);
pOptr
=
createStatewindowOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
as
,
tsSlotId
,
&
col
,
pTaskInfo
);
pOptr
=
createStatewindowOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
as
,
tsSlotId
,
&
col
,
p
PhyNode
->
pConditions
,
p
TaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
==
type
)
{
pOptr
=
createStreamStateAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
==
type
)
{
...
...
@@ -4584,9 +4621,9 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI
}
return
extractTbscanInStreamOpTree
(
pOperator
->
pDownstream
[
0
],
ppInfo
);
}
else
{
SStream
Block
ScanInfo
*
pInfo
=
pOperator
->
info
;
ASSERT
(
pInfo
->
p
SnapshotRead
Op
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
*
ppInfo
=
pInfo
->
p
SnapshotRead
Op
->
info
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
ASSERT
(
pInfo
->
p
TableScan
Op
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
);
*
ppInfo
=
pInfo
->
p
TableScan
Op
->
info
;
return
0
;
}
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
71331d79
...
...
@@ -302,14 +302,21 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
);
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
size_t
rows
=
pRes
->
info
.
rows
;
if
(
rows
==
0
||
!
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
if
(
pRes
->
info
.
rows
>
0
)
{
break
;
}
}
pOperator
->
resultInfo
.
totalRows
+=
rows
;
pOperator
->
resultInfo
.
totalRows
+=
pRes
->
info
.
rows
;
return
(
pRes
->
info
.
rows
==
0
)
?
NULL
:
pRes
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
71331d79
此差异已折叠。
点击以展开。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
71331d79
...
...
@@ -1146,13 +1146,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo
*
pBInfo
=
&
pInfo
->
binfo
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
return
pBInfo
->
pRes
;
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
int32_t
order
=
TSDB_ORDER_ASC
;
...
...
@@ -1178,15 +1187,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
size_t
rows
=
pBInfo
->
pRes
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
return
(
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
static
SSDataBlock
*
doBuildIntervalResult
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -1880,12 +1896,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
return
pBInfo
->
pRes
->
info
.
rows
>
0
?
pBInfo
->
pRes
:
NULL
;
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
int64_t
st
=
taosGetTimestampUs
();
...
...
@@ -1914,15 +1940,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pBInfo
->
pRes
);
size_t
rows
=
pBInfo
->
pRes
->
info
.
rows
;
pOperator
->
resultInfo
.
totalRows
+=
rows
;
bool
hasRemain
=
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
doSetOperatorCompleted
(
pOperator
);
break
;
}
return
(
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
}
}
pOperator
->
resultInfo
.
totalRows
+=
pBInfo
->
pRes
->
info
.
rows
;
return
(
pBInfo
->
pRes
->
info
.
rows
==
0
)
?
NULL
:
pBInfo
->
pRes
;
}
static
void
doKeepPrevRows
(
STimeSliceOperatorInfo
*
pSliceInfo
,
const
SSDataBlock
*
pBlock
,
int32_t
rowIndex
)
{
...
...
@@ -2235,7 +2268,7 @@ _error:
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
STimeWindowAggSupp
*
pTwAggSup
,
int32_t
tsSlotId
,
SColumn
*
pStateKeyCol
,
SExecTaskInfo
*
pTaskInfo
)
{
SColumn
*
pStateKeyCol
,
S
Node
*
pCondition
,
S
ExecTaskInfo
*
pTaskInfo
)
{
SStateWindowOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStateWindowOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -2246,6 +2279,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pInfo
->
stateKey
.
type
=
pInfo
->
stateCol
.
type
;
pInfo
->
stateKey
.
bytes
=
pInfo
->
stateCol
.
bytes
;
pInfo
->
stateKey
.
pData
=
taosMemoryCalloc
(
1
,
pInfo
->
stateCol
.
bytes
);
pInfo
->
pCondition
=
pCondition
;
if
(
pInfo
->
stateKey
.
pData
==
NULL
)
{
goto
_error
;
}
...
...
@@ -2289,7 +2323,7 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
int32_t
tsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
)
{
STimeWindowAggSupp
*
pTwAggSupp
,
S
Node
*
pCondition
,
S
ExecTaskInfo
*
pTaskInfo
)
{
SSessionAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSessionAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -2315,6 +2349,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
winSup
.
prevTs
=
INT64_MIN
;
pInfo
->
reptScan
=
false
;
pInfo
->
pCondition
=
pCondition
;
pOperator
->
name
=
"SessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION
;
pOperator
->
blocking
=
true
;
...
...
@@ -2988,7 +3023,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
void
initDownStream
(
SOperatorInfo
*
downstream
,
SStreamAggSupporter
*
pAggSup
,
int64_t
gap
,
int64_t
waterMark
,
uint8_t
type
)
{
ASSERT
(
downstream
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
);
SStream
Block
ScanInfo
*
pScanInfo
=
downstream
->
info
;
SStreamScanInfo
*
pScanInfo
=
downstream
->
info
;
pScanInfo
->
sessionSup
=
(
SessionWindowSupporter
){.
pStreamAggSup
=
pAggSup
,
.
gap
=
gap
,
.
parentType
=
type
};
pScanInfo
->
pUpdateInfo
=
updateInfoInit
(
60000
,
TSDB_TIME_PRECISION_MILLI
,
waterMark
);
}
...
...
@@ -3545,8 +3580,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo
*
getResWinForSession
(
void
*
pData
)
{
return
(
SResultWindowInfo
*
)
pData
;
}
SResultWindowInfo
*
getResWinForState
(
void
*
pData
)
{
return
&
((
SStateWindowInfo
*
)
pData
)
->
winInfo
;
}
int32_t
closeSessionWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pTwSup
,
SArray
*
pClosed
,
__get_win_info_
fn
,
bool
delete
)
{
int32_t
closeSessionWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pTwSup
,
SArray
*
pClosed
,
__get_win_info_
fn
,
bool
delete
)
{
// Todo(liuyao) save window to tdb
void
**
pIte
=
NULL
;
size_t
keyLen
=
0
;
...
...
@@ -3708,8 +3743,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForSession
,
pInfo
->
ignoreExpiredData
);
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForSession
,
pInfo
->
ignoreExpiredData
);
closeChildSessionWindow
(
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
,
pInfo
->
ignoreExpiredData
);
copyUpdateResult
(
pStUpdated
,
pUpdated
);
taosHashCleanup
(
pStUpdated
);
...
...
@@ -4210,8 +4245,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForState
,
pInfo
->
ignoreExpiredData
);
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getResWinForState
,
pInfo
->
ignoreExpiredData
);
closeChildSessionWindow
(
pInfo
->
pChildren
,
pInfo
->
twAggSup
.
maxTs
,
pInfo
->
ignoreExpiredData
);
copyUpdateResult
(
pSeUpdated
,
pUpdated
);
taosHashCleanup
(
pSeUpdated
);
...
...
source/libs/executor/test/CMakeLists.txt
浏览文件 @
71331d79
MESSAGE
(
STATUS
"build parser unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
executorTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
executorTest
PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode
)
ADD_EXECUTABLE
(
executorTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
executorTest
PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode
)
TARGET_INCLUDE_DIRECTORIES
(
executorTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/executor/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/executor/inc"
)
TARGET_INCLUDE_DIRECTORIES
(
executorTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/executor/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/executor/inc"
)
ENDIF
()
\ No newline at end of file
source/libs/function/src/tudf.c
浏览文件 @
71331d79
...
...
@@ -75,6 +75,10 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
#ifdef WINDOWS
GetModuleFileName
(
NULL
,
path
,
PATH_MAX
);
taosDirName
(
path
);
#elif defined(_TD_DARWIN_64)
uint32_t
pathSize
=
sizeof
(
path
);
_NSGetExecutablePath
(
path
,
&
pathSize
);
taosDirName
(
path
);
#endif
}
else
{
strncpy
(
path
,
tsProcPath
,
strlen
(
tsProcPath
));
...
...
source/libs/index/test/CMakeLists.txt
浏览文件 @
71331d79
add_executable
(
idxTest
""
)
add_executable
(
idxFstTest
""
)
add_executable
(
idxFstUT
""
)
add_executable
(
idxUtilUT
""
)
add_executable
(
idxJsonUT
""
)
IF
(
NOT TD_DARWIN
)
add_executable
(
idxTest
""
)
add_executable
(
idxFstTest
""
)
add_executable
(
idxFstUT
""
)
add_executable
(
idxUtilUT
""
)
add_executable
(
idxJsonUT
""
)
target_sources
(
idxTest
PRIVATE
"indexTests.cc"
)
target_sources
(
idxFstTest
PRIVATE
"fstTest.cc"
)
target_sources
(
idxTest
PRIVATE
"indexTests.cc"
)
target_sources
(
idxFstTest
PRIVATE
"fstTest.cc"
)
target_sources
(
idxFstUT
PRIVATE
"fstUT.cc"
)
target_sources
(
idxUtilUT
PRIVATE
"utilUT.cc"
)
target_sources
(
idxFstUT
PRIVATE
"fstUT.cc"
)
target_sources
(
idxUtilUT
PRIVATE
"utilUT.cc"
)
target_sources
(
idxJsonUT
PRIVATE
"jsonUT.cc"
)
target_include_directories
(
idxTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
idxFstTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_sources
(
idxJsonUT
PRIVATE
"jsonUT.cc"
)
target_include_directories
(
idxTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
idxFstTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
idxFstUT
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
idxFstUT
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
idxUtilUT
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
idxUtilUT
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
idxJsonUT
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
idxTest
os
util
common
gtest_main
index
)
target_link_libraries
(
idxFstTest
os
util
common
gtest_main
index
)
target_link_libraries
(
idxFstUT
os
util
common
gtest_main
index
)
target_include_directories
(
idxJsonUT
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/index"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
idxTest
os
util
common
gtest_main
index
)
target_link_libraries
(
idxFstTest
os
util
common
gtest_main
index
)
target_link_libraries
(
idxFstUT
os
util
common
gtest_main
index
)
target_link_libraries
(
idxUtilUT
os
util
common
gtest_main
index
)
target_link_libraries
(
idxUtilUT
os
util
common
gtest_main
index
)
target_link_libraries
(
idxJsonUT
os
util
common
gtest_main
index
)
target_link_libraries
(
idxJsonUT
os
util
common
gtest_main
index
)
add_test
(
NAME idxtest
COMMAND idxTest
)
add_test
(
NAME idxJsonUT
COMMAND idxJsonUT
)
add_test
(
NAME idxUtilUT
COMMAND idxUtilUT
)
add_test
(
NAME idxFstUT
COMMAND idxFstUT
)
add_test
(
NAME idxtest
COMMAND idxTest
)
add_test
(
NAME idxJsonUT
COMMAND idxJsonUT
)
add_test
(
NAME idxUtilUT
COMMAND idxUtilUT
)
add_test
(
NAME idxFstUT
COMMAND idxFstUT
)
ENDIF
()
\ No newline at end of file
source/libs/planner/src/planOptimizer.c
浏览文件 @
71331d79
...
...
@@ -890,7 +890,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
return
code
;
}
static
int32_t
pushDownCondOpt
DealLogicNode
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
static
int32_t
pushDownCondOpt
TrivialPushDown
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
if
(
NULL
==
pLogicNode
->
pConditions
||
OPTIMIZE_FLAG_TEST_MASK
(
pLogicNode
->
optimizedFlag
,
OPTIMIZE_FLAG_PUSH_DOWN_CONDE
))
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -921,7 +921,7 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
break
;
case
QUERY_NODE_LOGIC_PLAN_SORT
:
case
QUERY_NODE_LOGIC_PLAN_PARTITION
:
code
=
pushDownCondOpt
DealLogicNode
(
pCxt
,
pLogicNode
);
code
=
pushDownCondOpt
TrivialPushDown
(
pCxt
,
pLogicNode
);
break
;
default:
break
;
...
...
source/libs/planner/test/CMakeLists.txt
浏览文件 @
71331d79
MESSAGE
(
STATUS
"build planner unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
plannerTest
ADD_EXECUTABLE
(
plannerTest
${
SOURCE_LIST
}
"
${
SOURCE_LIST
}
/../../../parser/test/mockCatalog.cpp"
"
${
SOURCE_LIST
}
/../../../parser/test/mockCatalogService.cpp"
)
)
TARGET_LINK_LIBRARIES
(
plannerTest
PUBLIC os util common nodes planner parser catalog transport gtest function qcom
)
TARGET_LINK_LIBRARIES
(
plannerTest
PUBLIC os util common nodes planner parser catalog transport gtest function qcom
)
TARGET_INCLUDE_DIRECTORIES
(
plannerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/planner/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/planner/inc"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/parser/test"
)
TARGET_INCLUDE_DIRECTORIES
(
plannerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/planner/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/planner/inc"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/parser/test"
)
if
(
${
BUILD_WINGETOPT
}
)
target_include_directories
(
plannerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/wingetopt/src"
)
target_link_libraries
(
plannerTest PUBLIC wingetopt
)
endif
()
if
(
${
BUILD_WINGETOPT
}
)
target_include_directories
(
plannerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/wingetopt/src"
)
target_link_libraries
(
plannerTest PUBLIC wingetopt
)
endif
()
add_test
(
NAME plannerTest
COMMAND plannerTest
)
add_test
(
NAME plannerTest
COMMAND plannerTest
)
ENDIF
()
\ No newline at end of file
source/libs/stream/inc/streamInc.h
浏览文件 @
71331d79
...
...
@@ -17,6 +17,7 @@
#define _STREAM_INC_H_
#include "executor.h"
#include "tref.h"
#include "tstream.h"
#ifdef __cplusplus
...
...
@@ -24,8 +25,9 @@ extern "C" {
#endif
typedef
struct
{
int8_t
inited
;
void
*
timer
;
int8_t
inited
;
int32_t
refPool
;
void
*
timer
;
}
SStreamGlobalEnv
;
static
SStreamGlobalEnv
streamEnv
;
...
...
source/libs/stream/src/stream.c
浏览文件 @
71331d79
...
...
@@ -76,9 +76,6 @@ void streamTriggerByTimer(void* param, void* tmrId) {
int32_t
streamSetupTrigger
(
SStreamTask
*
pTask
)
{
if
(
pTask
->
triggerParam
!=
0
)
{
if
(
streamInit
()
<
0
)
{
return
-
1
;
}
pTask
->
timer
=
taosTmrStart
(
streamTriggerByTimer
,
(
int32_t
)
pTask
->
triggerParam
,
pTask
,
streamEnv
.
timer
);
pTask
->
triggerStatus
=
TASK_TRIGGER_STATUS__IN_ACTIVE
;
}
...
...
source/libs/sync/inc/syncRaftLog.h
浏览文件 @
71331d79
...
...
@@ -32,8 +32,8 @@ typedef struct SSyncLogStoreData {
SSyncNode
*
pSyncNode
;
SWal
*
pWal
;
TdThreadMutex
mutex
;
SWalRead
Handle
*
pWalHandle
;
TdThreadMutex
mutex
;
SWalRead
er
*
pWalHandle
;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
}
SSyncLogStoreData
;
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
71331d79
...
...
@@ -62,7 +62,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
ASSERT
(
pData
->
pWal
!=
NULL
);
taosThreadMutexInit
(
&
(
pData
->
mutex
),
NULL
);
pData
->
pWalHandle
=
walOpenRead
Handle
(
pData
->
pWal
);
pData
->
pWalHandle
=
walOpenRead
er
(
pData
->
pWal
,
NULL
);
ASSERT
(
pData
->
pWalHandle
!=
NULL
);
pLogStore
->
appendEntry
=
logStoreAppendEntry
;
...
...
@@ -95,7 +95,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
taosThreadMutexLock
(
&
(
pData
->
mutex
));
if
(
pData
->
pWalHandle
!=
NULL
)
{
walCloseRead
Handle
(
pData
->
pWalHandle
);
walCloseRead
er
(
pData
->
pWalHandle
);
pData
->
pWalHandle
=
NULL
;
}
taosThreadMutexUnlock
(
&
(
pData
->
mutex
));
...
...
@@ -255,7 +255,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*
ppEntry
=
NULL
;
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalRead
Handle
*
pWalHandle
=
pData
->
pWalHandle
;
SWalRead
er
*
pWalHandle
=
pData
->
pWalHandle
;
if
(
pWalHandle
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
...
...
@@ -263,7 +263,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
taosThreadMutexLock
(
&
(
pData
->
mutex
));
code
=
walRead
WithHandle
(
pWalHandle
,
index
);
code
=
walRead
Ver
(
pWalHandle
,
index
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
...
...
@@ -398,10 +398,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
taosThreadMutexLock
(
&
(
pData
->
mutex
));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalRead
Handle
*
pWalHandle
=
pData
->
pWalHandle
;
SWalRead
er
*
pWalHandle
=
pData
->
pWalHandle
;
ASSERT
(
pWalHandle
!=
NULL
);
int32_t
code
=
walRead
WithHandle
(
pWalHandle
,
index
);
int32_t
code
=
walRead
Ver
(
pWalHandle
,
index
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
...
...
source/libs/tfs/test/tfsTest.cpp
浏览文件 @
71331d79
...
...
@@ -16,7 +16,11 @@
class
TfsTest
:
public
::
testing
::
Test
{
protected:
#ifdef _TD_DARWIN_64
static
void
SetUpTestSuite
()
{
root
=
"/private"
TD_TMP_DIR_PATH
"tfsTest"
;
}
#else
static
void
SetUpTestSuite
()
{
root
=
TD_TMP_DIR_PATH
"tfsTest"
;
}
#endif
static
void
TearDownTestSuite
()
{}
public:
...
...
@@ -299,6 +303,17 @@ TEST_F(TfsTest, 04_File) {
TEST_F
(
TfsTest
,
05
_MultiDisk
)
{
int32_t
code
=
0
;
#ifdef _TD_DARWIN_64
const
char
*
root00
=
"/private"
TD_TMP_DIR_PATH
"tfsTest00"
;
const
char
*
root01
=
"/private"
TD_TMP_DIR_PATH
"tfsTest01"
;
const
char
*
root10
=
"/private"
TD_TMP_DIR_PATH
"tfsTest10"
;
const
char
*
root11
=
"/private"
TD_TMP_DIR_PATH
"tfsTest11"
;
const
char
*
root12
=
"/private"
TD_TMP_DIR_PATH
"tfsTest12"
;
const
char
*
root20
=
"/private"
TD_TMP_DIR_PATH
"tfsTest20"
;
const
char
*
root21
=
"/private"
TD_TMP_DIR_PATH
"tfsTest21"
;
const
char
*
root22
=
"/private"
TD_TMP_DIR_PATH
"tfsTest22"
;
const
char
*
root23
=
"/private"
TD_TMP_DIR_PATH
"tfsTest23"
;
#else
const
char
*
root00
=
TD_TMP_DIR_PATH
"tfsTest00"
;
const
char
*
root01
=
TD_TMP_DIR_PATH
"tfsTest01"
;
const
char
*
root10
=
TD_TMP_DIR_PATH
"tfsTest10"
;
...
...
@@ -308,6 +323,7 @@ TEST_F(TfsTest, 05_MultiDisk) {
const
char
*
root21
=
TD_TMP_DIR_PATH
"tfsTest21"
;
const
char
*
root22
=
TD_TMP_DIR_PATH
"tfsTest22"
;
const
char
*
root23
=
TD_TMP_DIR_PATH
"tfsTest23"
;
#endif
SDiskCfg
dCfg
[
9
]
=
{
0
};
tstrncpy
(
dCfg
[
0
].
dir
,
root01
,
TSDB_FILENAME_LEN
);
...
...
source/libs/wal/inc/walInt.h
浏览文件 @
71331d79
...
...
@@ -19,6 +19,7 @@
#include "taoserror.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"
#include "wal.h"
...
...
source/libs/wal/src/walRead.c
浏览文件 @
71331d79
...
...
@@ -16,20 +16,29 @@
#include "taoserror.h"
#include "walInt.h"
SWalReadHandle
*
walOpenReadHandle
(
SWal
*
pWal
)
{
SWalReadHandle
*
pRead
=
taosMemoryMalloc
(
sizeof
(
SWalReadHandle
));
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
);
static
int32_t
walFetchBodyNew
(
SWalReader
*
pRead
);
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
);
SWalReader
*
walOpenReader
(
SWal
*
pWal
,
SWalFilterCond
*
cond
)
{
SWalReader
*
pRead
=
taosMemoryMalloc
(
sizeof
(
SWalReader
));
if
(
pRead
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pRead
->
pWal
=
pWal
;
pRead
->
p
ReadIdxT
File
=
NULL
;
pRead
->
p
ReadLogT
File
=
NULL
;
pRead
->
p
Idx
File
=
NULL
;
pRead
->
p
Log
File
=
NULL
;
pRead
->
curVersion
=
-
1
;
pRead
->
curFileFirstVer
=
-
1
;
pRead
->
capacity
=
0
;
pRead
->
status
=
0
;
if
(
cond
)
pRead
->
cond
=
*
cond
;
else
{
pRead
->
cond
.
scanMeta
=
0
;
pRead
->
cond
.
scanUncommited
=
0
;
}
taosThreadMutexInit
(
&
pRead
->
mutex
,
NULL
);
...
...
@@ -39,26 +48,46 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
taosMemoryFree
(
pRead
);
return
NULL
;
}
return
pRead
;
}
void
walCloseRead
Handle
(
SWalReadHandle
*
pRead
)
{
taosCloseFile
(
&
pRead
->
p
ReadIdxT
File
);
taosCloseFile
(
&
pRead
->
p
ReadLogT
File
);
void
walCloseRead
er
(
SWalReader
*
pRead
)
{
taosCloseFile
(
&
pRead
->
p
Idx
File
);
taosCloseFile
(
&
pRead
->
p
Log
File
);
taosMemoryFreeClear
(
pRead
->
pHead
);
taosMemoryFree
(
pRead
);
}
int32_t
walRegisterRead
(
SWalReadHandle
*
pRead
,
int64_t
ver
)
{
// TODO
return
0
;
int32_t
walNextValidMsg
(
SWalReader
*
pRead
)
{
int64_t
fetchVer
=
pRead
->
curVersion
;
int64_t
endVer
=
pRead
->
cond
.
scanUncommited
?
walGetLastVer
(
pRead
->
pWal
)
:
walGetCommittedVer
(
pRead
->
pWal
);
while
(
fetchVer
<=
endVer
)
{
if
(
walFetchHeadNew
(
pRead
,
fetchVer
)
<
0
)
{
return
-
1
;
}
if
(
pRead
->
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
||
(
IS_META_MSG
(
pRead
->
pHead
->
head
.
msgType
)
&&
pRead
->
cond
.
scanMeta
))
{
if
(
walFetchBodyNew
(
pRead
)
<
0
)
{
return
-
1
;
}
return
0
;
}
else
{
if
(
walSkipFetchBodyNew
(
pRead
)
<
0
)
{
return
-
1
;
}
fetchVer
++
;
ASSERT
(
fetchVer
==
pRead
->
curVersion
);
}
}
return
-
1
;
}
static
int64_t
walReadSeekFilePos
(
SWalRead
Handle
*
pRead
,
int64_t
fileFirstVer
,
int64_t
ver
)
{
static
int64_t
walReadSeekFilePos
(
SWalRead
er
*
pRead
,
int64_t
fileFirstVer
,
int64_t
ver
)
{
int64_t
ret
=
0
;
TdFilePtr
pIdxTFile
=
pRead
->
p
ReadIdxT
File
;
TdFilePtr
pLogTFile
=
pRead
->
p
ReadLogT
File
;
TdFilePtr
pIdxTFile
=
pRead
->
p
Idx
File
;
TdFilePtr
pLogTFile
=
pRead
->
p
Log
File
;
// seek position
int64_t
offset
=
(
ver
-
fileFirstVer
)
*
sizeof
(
SWalIdxEntry
);
...
...
@@ -90,11 +119,11 @@ static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
return
ret
;
}
static
int32_t
walReadChangeFile
(
SWalRead
Handle
*
pRead
,
int64_t
fileFirstVer
)
{
static
int32_t
walReadChangeFile
(
SWalRead
er
*
pRead
,
int64_t
fileFirstVer
)
{
char
fnameStr
[
WAL_FILE_LEN
];
taosCloseFile
(
&
pRead
->
p
ReadIdxT
File
);
taosCloseFile
(
&
pRead
->
p
ReadLogT
File
);
taosCloseFile
(
&
pRead
->
p
Idx
File
);
taosCloseFile
(
&
pRead
->
p
Log
File
);
walBuildLogName
(
pRead
->
pWal
,
fileFirstVer
,
fnameStr
);
TdFilePtr
pLogTFile
=
taosOpenFile
(
fnameStr
,
TD_FILE_READ
);
...
...
@@ -104,7 +133,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return
-
1
;
}
pRead
->
p
ReadLogT
File
=
pLogTFile
;
pRead
->
p
Log
File
=
pLogTFile
;
walBuildIdxName
(
pRead
->
pWal
,
fileFirstVer
,
fnameStr
);
TdFilePtr
pIdxTFile
=
taosOpenFile
(
fnameStr
,
TD_FILE_READ
);
...
...
@@ -114,11 +143,11 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return
-
1
;
}
pRead
->
p
ReadIdxT
File
=
pIdxTFile
;
pRead
->
p
Idx
File
=
pIdxTFile
;
return
0
;
}
static
int32_t
walReadSeekVer
(
SWalRead
Handle
*
pRead
,
int64_t
ver
)
{
static
int32_t
walReadSeekVer
(
SWalRead
er
*
pRead
,
int64_t
ver
)
{
SWal
*
pWal
=
pRead
->
pWal
;
if
(
ver
==
pRead
->
curVersion
)
{
return
0
;
...
...
@@ -153,9 +182,94 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return
0
;
}
void
walSetReaderCapacity
(
SWalReadHandle
*
pRead
,
int32_t
capacity
)
{
pRead
->
capacity
=
capacity
;
}
void
walSetReaderCapacity
(
SWalReader
*
pRead
,
int32_t
capacity
)
{
pRead
->
capacity
=
capacity
;
}
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
)
{
int64_t
contLen
;
if
(
pRead
->
curVersion
!=
fetchVer
)
{
if
(
walReadSeekVer
(
pRead
,
fetchVer
)
<
0
)
return
-
1
;
}
contLen
=
taosReadFile
(
pRead
->
pLogFile
,
pRead
->
pHead
,
sizeof
(
SWalCkHead
));
if
(
contLen
!=
sizeof
(
SWalCkHead
))
{
if
(
contLen
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
}
else
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
pRead
->
curVersion
=
-
1
;
return
-
1
;
}
return
0
;
}
static
int32_t
walFetchBodyNew
(
SWalReader
*
pRead
)
{
SWalCont
*
pReadHead
=
&
pRead
->
pHead
->
head
;
int64_t
ver
=
pReadHead
->
version
;
if
(
pRead
->
capacity
<
pReadHead
->
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalCkHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
pRead
->
pHead
=
ptr
;
pReadHead
=
&
pRead
->
pHead
->
head
;
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pRead
->
pLogFile
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
if
(
pReadHead
->
bodyLen
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"wal fetch body error: %"
PRId64
", read request version:%"
PRId64
", since %s"
,
pRead
->
pHead
->
head
.
version
,
ver
,
tstrerror
(
terrno
));
}
else
{
wError
(
"wal fetch body error: %"
PRId64
", read request version:%"
PRId64
", since file corrupted"
,
pRead
->
pHead
->
head
.
version
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
pRead
->
curVersion
=
-
1
;
ASSERT
(
0
);
return
-
1
;
}
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"wal fetch body error: %"
PRId64
", read request version:%"
PRId64
""
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
}
if
(
walValidBodyCksum
(
pRead
->
pHead
)
!=
0
)
{
wError
(
"wal fetch body error: % "
PRId64
", since body checksum not passed"
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
}
pRead
->
curVersion
=
ver
+
1
;
return
0
;
}
static
int32_t
walSkipFetchBodyNew
(
SWalReader
*
pRead
)
{
int64_t
code
;
ASSERT
(
pRead
->
curVersion
==
pRead
->
pHead
->
head
.
version
);
int32_t
walFetchHead
(
SWalReadHandle
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
)
{
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pRead
->
curVersion
=
-
1
;
return
-
1
;
}
pRead
->
curVersion
++
;
return
0
;
}
int32_t
walFetchHead
(
SWalReader
*
pRead
,
int64_t
ver
,
SWalCkHead
*
pHead
)
{
int64_t
code
;
// TODO: valid ver
...
...
@@ -168,9 +282,9 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
if
(
code
<
0
)
return
-
1
;
}
ASSERT
(
taosValidFile
(
pRead
->
p
ReadLogT
File
)
==
true
);
ASSERT
(
taosValidFile
(
pRead
->
p
Log
File
)
==
true
);
code
=
taosReadFile
(
pRead
->
p
ReadLogT
File
,
pHead
,
sizeof
(
SWalCkHead
));
code
=
taosReadFile
(
pRead
->
p
Log
File
,
pHead
,
sizeof
(
SWalCkHead
));
if
(
code
!=
sizeof
(
SWalCkHead
))
{
return
-
1
;
}
...
...
@@ -186,12 +300,12 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
return
0
;
}
int32_t
walSkipFetchBody
(
SWalRead
Handle
*
pRead
,
const
SWalCkHead
*
pHead
)
{
int32_t
walSkipFetchBody
(
SWalRead
er
*
pRead
,
const
SWalCkHead
*
pHead
)
{
int64_t
code
;
ASSERT
(
pRead
->
curVersion
==
pHead
->
head
.
version
);
code
=
taosLSeekFile
(
pRead
->
p
ReadLogT
File
,
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
code
=
taosLSeekFile
(
pRead
->
p
Log
File
,
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pRead
->
curVersion
=
-
1
;
...
...
@@ -203,7 +317,7 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
return
0
;
}
int32_t
walFetchBody
(
SWalRead
Handle
*
pRead
,
SWalCkHead
**
ppHead
)
{
int32_t
walFetchBody
(
SWalRead
er
*
pRead
,
SWalCkHead
**
ppHead
)
{
SWalCont
*
pReadHead
=
&
((
*
ppHead
)
->
head
);
int64_t
ver
=
pReadHead
->
version
;
...
...
@@ -218,7 +332,7 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pRead
->
p
ReadLogT
File
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pRead
->
p
Log
File
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -241,9 +355,9 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
return
0
;
}
int32_t
walReadWithHandle_s
(
SWalRead
Handle
*
pRead
,
int64_t
ver
,
SWalCont
**
ppHead
)
{
int32_t
walReadWithHandle_s
(
SWalRead
er
*
pRead
,
int64_t
ver
,
SWalCont
**
ppHead
)
{
taosThreadMutexLock
(
&
pRead
->
mutex
);
if
(
walRead
WithHandle
(
pRead
,
ver
)
<
0
)
{
if
(
walRead
Ver
(
pRead
,
ver
)
<
0
)
{
taosThreadMutexUnlock
(
&
pRead
->
mutex
);
return
-
1
;
}
...
...
@@ -257,7 +371,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHea
return
0
;
}
int32_t
walRead
WithHandle
(
SWalReadHandle
*
pRead
,
int64_t
ver
)
{
int32_t
walRead
Ver
(
SWalReader
*
pRead
,
int64_t
ver
)
{
int64_t
code
;
if
(
pRead
->
pWal
->
vers
.
firstVer
==
-
1
)
{
...
...
@@ -280,9 +394,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return
-
1
;
}
ASSERT
(
taosValidFile
(
pRead
->
p
ReadLogT
File
)
==
true
);
ASSERT
(
taosValidFile
(
pRead
->
p
Log
File
)
==
true
);
code
=
taosReadFile
(
pRead
->
p
ReadLogT
File
,
pRead
->
pHead
,
sizeof
(
SWalCkHead
));
code
=
taosReadFile
(
pRead
->
p
Log
File
,
pRead
->
pHead
,
sizeof
(
SWalCkHead
));
if
(
code
!=
sizeof
(
SWalCkHead
))
{
if
(
code
<
0
)
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -310,7 +424,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead
->
capacity
=
pRead
->
pHead
->
head
.
bodyLen
;
}
if
((
code
=
taosReadFile
(
pRead
->
p
ReadLogT
File
,
pRead
->
pHead
->
head
.
body
,
pRead
->
pHead
->
head
.
bodyLen
))
!=
if
((
code
=
taosReadFile
(
pRead
->
p
Log
File
,
pRead
->
pHead
->
head
.
body
,
pRead
->
pHead
->
head
.
bodyLen
))
!=
pRead
->
pHead
->
head
.
bodyLen
)
{
if
(
code
<
0
)
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -340,46 +454,3 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return
0
;
}
#if 0
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
if (code != 0) {
return code;
}
if (*ppHead == NULL) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
if (ptr == NULL) {
return -1;
}
*ppHead = ptr;
}
if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidHeadCksum(*ppHead) != 0) {
return -1;
}
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
if (ptr == NULL) {
taosMemoryFree(*ppHead);
*ppHead = NULL;
return -1;
}
if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidBodyCksum(*ppHead) != 0) {
return -1;
}
return 0;
}
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif
source/libs/wal/src/walWrite.c
浏览文件 @
71331d79
...
...
@@ -79,19 +79,21 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
}
int32_t
walRollback
(
SWal
*
pWal
,
int64_t
ver
)
{
taosThreadMutexLock
(
&
pWal
->
mutex
);
int64_t
code
;
char
fnameStr
[
WAL_FILE_LEN
];
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
commitVer
)
{
terrno
=
TSDB_CODE_WAL_INVALID_VER
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
taosThreadMutexLock
(
&
pWal
->
mutex
);
// find correct file
if
(
ver
<
walGetLastFileFirstVer
(
pWal
))
{
// change current files
code
=
walChangeWrite
(
pWal
,
ver
);
if
(
code
<
0
)
{
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
...
...
@@ -146,6 +148,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT
(
taosValidFile
(
pLogTFile
));
int64_t
size
=
taosReadFile
(
pLogTFile
,
&
head
,
sizeof
(
SWalCkHead
));
if
(
size
!=
sizeof
(
SWalCkHead
))
{
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
code
=
walValidHeadCksum
(
&
head
);
...
...
@@ -154,11 +157,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
if
(
head
.
head
.
version
!=
ver
)
{
ASSERT
(
0
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
...
...
@@ -167,12 +172,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if
(
code
<
0
)
{
ASSERT
(
0
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
code
=
taosFtruncateFile
(
pIdxTFile
,
idxOff
);
if
(
code
<
0
)
{
ASSERT
(
0
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
-
1
;
}
pWal
->
vers
.
lastVer
=
ver
-
1
;
...
...
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
71331d79
...
...
@@ -292,8 +292,8 @@ TEST_F(WalCleanDeleteEnv, roll) {
TEST_F
(
WalKeepEnv
,
readHandleRead
)
{
walResetEnv
();
int
code
;
SWalRead
Handle
*
pRead
=
walOpenReadHandle
(
pWal
);
int
code
;
SWalRead
er
*
pRead
=
walOpenReader
(
pWal
,
NULL
);
ASSERT
(
pRead
!=
NULL
);
int
i
;
...
...
@@ -306,7 +306,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ver
=
taosRand
()
%
100
;
code
=
walRead
WithHandle
(
pRead
,
ver
);
code
=
walRead
Ver
(
pRead
,
ver
);
ASSERT_EQ
(
code
,
0
);
// printf("rrbody: \n");
...
...
@@ -325,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
EXPECT_EQ
(
newStr
[
j
],
pRead
->
pHead
->
head
.
body
[
j
]);
}
}
walCloseRead
Handle
(
pRead
);
walCloseRead
er
(
pRead
);
}
TEST_F
(
WalRetentionEnv
,
repairMeta1
)
{
...
...
@@ -354,12 +354,12 @@ TEST_F(WalRetentionEnv, repairMeta1) {
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
99
);
SWalRead
Handle
*
pRead
=
walOpenReadHandle
(
pWal
);
SWalRead
er
*
pRead
=
walOpenReader
(
pWal
,
NULL
);
ASSERT
(
pRead
!=
NULL
);
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ver
=
taosRand
()
%
100
;
code
=
walRead
WithHandle
(
pRead
,
ver
);
code
=
walRead
Ver
(
pRead
,
ver
);
ASSERT_EQ
(
code
,
0
);
// printf("rrbody: \n");
...
...
@@ -389,7 +389,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ver
=
taosRand
()
%
200
;
code
=
walRead
WithHandle
(
pRead
,
ver
);
code
=
walRead
Ver
(
pRead
,
ver
);
ASSERT_EQ
(
code
,
0
);
// printf("rrbody: \n");
...
...
source/os/src/osTimezone.c
浏览文件 @
71331d79
...
...
@@ -857,19 +857,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return
;
}
buf
[
n
]
=
'\0'
;
for
(
int
i
=
n
-
1
;
i
>=
0
;
--
i
)
{
if
(
buf
[
i
]
==
'/'
)
{
if
(
tz
)
{
tz
=
buf
+
i
+
1
;
break
;
}
tz
=
buf
+
i
+
1
;
}
}
if
(
!
tz
||
0
==
strchr
(
tz
,
'/'
))
{
char
*
zi
=
strstr
(
buf
,
"zoneinfo"
);
if
(
!
zi
)
{
printf
(
"parsing /etc/localtime failed"
);
return
;
}
tz
=
zi
+
strlen
(
"zoneinfo"
)
+
1
;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv
(
"TZ"
,
tz
,
1
);
tzset
();
...
...
@@ -900,7 +908,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
int
n
=
readlink
(
"/etc/localtime"
,
buf
,
sizeof
(
buf
));
if
(
n
<
0
)
{
printf
(
"read /etc/localtime error, reason:%s"
,
strerror
(
errno
));
if
(
taosCheckExistFile
(
"/etc/timezone"
))
{
/*
* NOTE: do not remove it.
...
...
@@ -962,19 +970,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return
;
}
buf
[
n
]
=
'\0'
;
for
(
int
i
=
n
-
1
;
i
>=
0
;
--
i
)
{
if
(
buf
[
i
]
==
'/'
)
{
if
(
tz
)
{
tz
=
buf
+
i
+
1
;
break
;
}
tz
=
buf
+
i
+
1
;
}
}
if
(
!
tz
||
0
==
strchr
(
tz
,
'/'
))
{
char
*
zi
=
strstr
(
buf
,
"zoneinfo"
);
if
(
!
zi
)
{
printf
(
"parsing /etc/localtime failed"
);
return
;
}
tz
=
zi
+
strlen
(
"zoneinfo"
)
+
1
;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv
(
"TZ"
,
tz
,
1
);
tzset
();
...
...
source/util/src/tutil.c
浏览文件 @
71331d79
...
...
@@ -64,6 +64,20 @@ int32_t strdequote(char *z) {
return
j
+
1
;
// only one quote, do nothing
}
char
*
strDupUnquo
(
const
char
*
src
)
{
if
(
src
==
NULL
)
return
NULL
;
if
(
src
[
0
]
!=
'`'
)
return
strdup
(
src
);
int32_t
len
=
(
int32_t
)
strlen
(
src
);
if
(
src
[
len
-
1
]
!=
'`'
)
return
NULL
;
char
*
ret
=
taosMemoryMalloc
(
len
);
if
(
ret
==
NULL
)
return
NULL
;
for
(
int32_t
i
=
0
;
i
<
len
-
1
;
i
++
)
{
ret
[
i
]
=
src
[
i
+
1
];
}
ret
[
len
-
1
]
=
0
;
return
ret
;
}
size_t
strtrim
(
char
*
z
)
{
int32_t
i
=
0
;
int32_t
j
=
0
;
...
...
tests/script/tsim/stream/session0.sim
浏览文件 @
71331d79
...
...
@@ -141,7 +141,7 @@ if $data01 != 7 then
goto loop1
endi
if $data02 !=
9
then
if $data02 !=
18
then
print =====data02=$data02
goto loop1
endi
...
...
@@ -151,22 +151,22 @@ if $data03 != 4 then
goto loop1
endi
if $data04 != 1.
1
00000000 then
if $data04 != 1.
0
00000000 then
print ======$data04
return -1
endi
if $data05 !=
0.816496581
then
if $data05 !=
1.154700538
then
print ======$data05
return -1
endi
if $data06 !=
3
then
if $data06 !=
4
then
print ======$data06
return -1
endi
if $data07 != 1.
1
00000000 then
if $data07 != 1.
0
00000000 then
print ======$data07
return -1
endi
...
...
@@ -235,7 +235,7 @@ sql create stream streams4 trigger at_once watermark 1d into streamt4 as select
# sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
# sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
#
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4);
sql insert into t1 values(1648791213003,4,9,3,4.8);
...
...
@@ -288,10 +288,10 @@ if $rows == 0 then
goto loop3
endi
sql select * from streamt8;
if $rows == 0 then
print ======$rows
goto loop3
endi
#
sql select * from streamt8;
#
if $rows == 0 then
#
print ======$rows
#
goto loop3
#
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/valgrind/basic1.sim
浏览文件 @
71331d79
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
#system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
...
...
@@ -18,44 +19,29 @@ if $rows != 1 then
return -1
endi
print =============== step2: create alter drop show user
sql create user u1 pass 'taosdata'
sql show users
sql alter user u1 sysinfo 1
sql alter user u1 enable 1
sql alter user u1 pass 'taosdata'
sql drop user u1
sql_error alter user u2 sysinfo 0
print =============== step3: create drop dnode
sql create dnode $hostname port 7200
sql drop dnode 2
sql alter dnode 1 'debugflag 143'
print =============== step4: create show database
sql create database d1 vgroups 1
print =============== step2: create show database
sql create database d1 vgroups 1 buffer 3
sql show databases
sql show d1.vgroups
sql use d1
sql show vgroups
print =============== step
5
: create show stable
print =============== step
3
: create show stable
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
goto _OVER
print =============== step6: create show table
print =============== step4: create show table
sql create table ct1 using stb tags(1000)
sql show tables
if $rows != 1 then
return -1
endi
print =============== step
7
: insert data
print =============== step
5
: insert data
print =============== step
7
: select data
print =============== step
6
: select data
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/valgrind/basic2.sim
浏览文件 @
71331d79
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
...
...
@@ -18,10 +19,14 @@ if $rows != 1 then
return -1
endi
print =============== step2
: create db
sql create database db vgroups 1
print =============== step2
sql create database db vgroups 1
buffer 3
sql use db
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql create table ct1 using stb tags(1000)
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/tsim/valgrind/checkError1.sim
浏览文件 @
71331d79
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
...
...
@@ -28,7 +29,10 @@ sql alter user u1 pass 'taosdata'
sql drop user u1
sql_error alter user u2 sysinfo 0
print =============== step3:
print =============== step3: create drop dnode
sql create dnode $hostname port 7200
sql drop dnode 2
sql alter dnode 1 'debugflag 131'
print =============== stop
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...
...
tests/script/tsim/valgrind/checkError2.sim
浏览文件 @
71331d79
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
...
...
@@ -19,7 +20,17 @@ if $rows != 1 then
endi
print =============== step2: create db
sql create database db vgroups 1
sql create database d1 vgroups 1 buffer 3
sql show databases
sql use d1
sql show vgroups
print =============== step3: create show stable
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...
...
@@ -29,7 +40,7 @@ print ----> start to check if there are ERRORS in vagrind log file for each dnod
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <=
8
then
if $system_content <=
0
then
return 0
endi
...
...
tests/system-test/7-tmq/stbTagFilter.py
浏览文件 @
71331d79
...
...
@@ -25,7 +25,7 @@ class TDTestCase:
paraDict
=
{
'dbName'
:
'db2'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
...
...
@@ -44,7 +44,7 @@ class TDTestCase:
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
4
,
replica
=
1
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
1
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb"
)
...
...
tests/system-test/failed.txt
0 → 100644
浏览文件 @
71331d79
#python3 ./test.py -f 2-query/last.py -Q 3
tests/system-test/fulltest.sh
浏览文件 @
71331d79
...
...
@@ -295,7 +295,7 @@ python3 ./test.py -f 2-query/Today.py -Q 3
python3 ./test.py
-f
2-query/max.py
-Q
3
python3 ./test.py
-f
2-query/min.py
-Q
3
python3 ./test.py
-f
2-query/count.py
-Q
3
python3 ./test.py
-f
2-query/last.py
-Q
3
#
python3 ./test.py -f 2-query/last.py -Q 3
python3 ./test.py
-f
2-query/first.py
-Q
3
python3 ./test.py
-f
2-query/To_iso8601.py
-Q
3
python3 ./test.py
-f
2-query/To_unixtimestamp.py
-Q
3
...
...
taosadapter
@
389047db
比较
c885e967
...
389047db
Subproject commit
c885e967e490105999b84d009a15168728dfafaf
Subproject commit
389047db713a3dddfbce292c3260b0864b17d936
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录