Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5fd4452e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5fd4452e
编写于
7月 08, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into fix/tsdb_merge_bug
上级
74501e8f
d7956b6e
变更
39
隐藏空白更改
内联
并排
Showing
39 changed file
with
718 addition
and
215 deletion
+718
-215
include/common/tcommon.h
include/common/tcommon.h
+14
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+6
-3
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+0
-5
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+2
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+2
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+41
-23
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+2
-21
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+84
-1
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+6
-4
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+18
-6
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+4
-4
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+2
-2
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+115
-57
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+6
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+5
-5
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+31
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+10
-10
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+18
-23
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+11
-5
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+0
-1
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+9
-8
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+44
-4
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+8
-4
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+1
-1
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+4
-4
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+17
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+2
-2
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+1
-1
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+2
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+2
-2
tests/script/loop.sh
tests/script/loop.sh
+7
-1
tests/script/tsim/sync/vnodesnapshot.sim
tests/script/tsim/sync/vnodesnapshot.sim
+2
-2
tests/script/tsim/user/basic.sim
tests/script/tsim/user/basic.sim
+1
-1
tests/script/tsim/valgrind/basic1.sim
tests/script/tsim/valgrind/basic1.sim
+1
-1
tests/script/tsim/valgrind/checkError1.sim
tests/script/tsim/valgrind/checkError1.sim
+5
-3
tests/script/tsim/valgrind/checkError2.sim
tests/script/tsim/valgrind/checkError2.sim
+20
-2
tests/script/tsim/valgrind/checkError3.sim
tests/script/tsim/valgrind/checkError3.sim
+156
-0
tests/system-test/2-query/json_tag.py
tests/system-test/2-query/json_tag.py
+4
-4
tests/system-test/loop.sh
tests/system-test/loop.sh
+55
-0
未找到文件。
include/common/tcommon.h
浏览文件 @
5fd4452e
...
...
@@ -117,6 +117,20 @@ typedef struct SSDataBlock {
SDataBlockInfo
info
;
}
SSDataBlock
;
enum
{
FETCH_TYPE__DATA
=
1
,
FETCH_TYPE__META
,
FETCH_TYPE__NONE
,
};
typedef
struct
{
int8_t
fetchType
;
union
{
SSDataBlock
data
;
void
*
meta
;
};
}
SFetchRet
;
typedef
struct
SVarColAttr
{
int32_t
*
offset
;
// start position for each entry in the list
uint32_t
length
;
// used buffer size that contain the valid data
...
...
include/libs/executor/executor.h
浏览文件 @
5fd4452e
...
...
@@ -30,7 +30,7 @@ struct SRpcMsg;
struct
SSubplan
;
typedef
struct
SReadHandle
{
void
*
stream
Reader
;
void
*
tq
Reader
;
void
*
meta
;
void
*
config
;
void
*
vnode
;
...
...
@@ -38,7 +38,7 @@ typedef struct SReadHandle {
SMsgCb
*
pMsgCb
;
bool
initMetaReader
;
bool
initTableReader
;
bool
init
Stream
Reader
;
bool
init
Tq
Reader
;
}
SReadHandle
;
typedef
enum
{
...
...
@@ -52,7 +52,7 @@ typedef enum {
* @param streamReadHandle
* @return
*/
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
void
*
streamReadHandle
);
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
);
/**
* Switch the stream scan to snapshot mode
...
...
@@ -176,6 +176,9 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
);
void
*
qExtractReaderFromStreamScanner
(
void
*
scanner
);
int32_t
qExtractStreamScanner
(
qTaskInfo_t
tinfo
,
void
**
scanner
);
#ifdef __cplusplus
}
#endif
...
...
include/libs/stream/tstream.h
浏览文件 @
5fd4452e
...
...
@@ -235,11 +235,6 @@ typedef struct SStreamTask {
int8_t
taskStatus
;
int8_t
execStatus
;
// exec info
int64_t
enqueueVer
;
int64_t
processedVer
;
int64_t
checkpointVer
;
// node info
int32_t
selfChildId
;
int32_t
nodeId
;
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
5fd4452e
...
...
@@ -238,10 +238,12 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST_BATCH
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_BATCH
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_SEND
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_RSP
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
5fd4452e
...
...
@@ -377,10 +377,12 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING_REPLY
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST_BATCH
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST_REPLY
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE_REPLY
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_BATCH
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SET_VNODE_STANDBY
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
5fd4452e
...
...
@@ -64,8 +64,8 @@ int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32
int32_t
vnodeProcessCreateTSma
(
SVnode
*
pVnode
,
void
*
pCont
,
uint32_t
contLen
);
int32_t
vnodeGetAllTableList
(
SVnode
*
pVnode
,
uint64_t
uid
,
SArray
*
list
);
int32_t
vnodeGetCtbIdList
(
SVnode
*
pVnode
,
int64_t
suid
,
SArray
*
list
);
void
*
vnodeGetIdx
(
SVnode
*
pVnode
);
void
*
vnodeGetIvtIdx
(
SVnode
*
pVnode
);
void
*
vnodeGetIdx
(
SVnode
*
pVnode
);
void
*
vnodeGetIvtIdx
(
SVnode
*
pVnode
);
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
);
int32_t
vnodeValidateTableHash
(
SVnode
*
pVnode
,
char
*
tableFName
);
...
...
@@ -95,7 +95,7 @@ typedef struct SMetaFltParam {
tb_uid_t
suid
;
int16_t
cid
;
int16_t
type
;
char
*
val
;
char
*
val
;
bool
reverse
;
int
(
*
filterFunc
)(
void
*
a
,
void
*
b
,
int16_t
type
);
...
...
@@ -136,8 +136,8 @@ SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdLis
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
,
int32_t
tWinIdx
);
int32_t
tsdbGetFileBlocksDistInfo
(
STsdbReader
*
pReader
,
STableBlockDistInfo
*
pTableBlockInfo
);
int64_t
tsdbGetNumOfRowsInMemTable
(
STsdbReader
*
pHandle
);
void
*
tsdbGetIdx
(
SMeta
*
pMeta
);
void
*
tsdbGetIvtIdx
(
SMeta
*
pMeta
);
void
*
tsdbGetIdx
(
SMeta
*
pMeta
);
void
*
tsdbGetIvtIdx
(
SMeta
*
pMeta
);
int32_t
tsdbLastRowReaderOpen
(
void
*
pVnode
,
int32_t
type
,
SArray
*
pTableIdList
,
int32_t
*
colId
,
int32_t
numOfCols
,
void
**
pReader
);
...
...
@@ -146,19 +146,37 @@ int32_t tsdbLastrowReaderClose(void *pReader);
// tq
typedef
struct
STqReadHandle
SStreamReader
;
typedef
struct
STqReader
{
int64_t
ver
;
const
SSubmitReq
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SStreamReader
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
)
;
SWalReader
*
pWalReader
;
void
tqReadHandleSetColIdList
(
SStreamReader
*
pReadHandle
,
SArray
*
pColIdList
);
int32_t
tqReadHandleSetTbUidList
(
SStreamReader
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleAddTbUidList
(
SStreamReader
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleRemoveTbUidList
(
SStreamReader
*
pHandle
,
const
SArray
*
tbUidList
);
SMeta
*
pVnodeMeta
;
SHashObj
*
tbIdHash
;
SArray
*
pColIdList
;
// SArray<int16_t>
int32_t
tqReadHandleSetMsg
(
SStreamReader
*
pHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
SStreamReader
*
pHandle
);
bool
tqNextDataBlockFilterOut
(
SStreamReader
*
pHandle
,
SHashObj
*
filterOutUids
);
int32_t
tqRetrieveDataBlock
(
SSDataBlock
*
pBlock
,
SStreamReader
*
pHandle
);
int32_t
cachedSchemaVer
;
int64_t
cachedSchemaSuid
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
}
STqReader
;
STqReader
*
tqOpenReader
(
SVnode
*
pVnode
);
void
tqCloseReader
(
STqReader
*
);
void
tqReaderSetColIdList
(
STqReader
*
pReader
,
SArray
*
pColIdList
);
int32_t
tqReaderSetTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
int32_t
tqReaderAddTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
int32_t
tqReaderRemoveTbUidList
(
STqReader
*
pReader
,
const
SArray
*
tbUidList
);
int32_t
tqReaderSetDataMsg
(
STqReader
*
pReader
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReader
*
pReader
);
bool
tqNextDataBlockFilterOut
(
STqReader
*
pReader
,
SHashObj
*
filterOutUids
);
int32_t
tqRetrieveDataBlock
(
SSDataBlock
*
pBlock
,
STqReader
*
pReader
);
// sma
int32_t
smaGetTSmaDays
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
);
...
...
@@ -214,7 +232,7 @@ struct SMetaEntry {
int8_t
type
;
int8_t
flags
;
// TODO: need refactor?
tb_uid_t
uid
;
char
*
name
;
char
*
name
;
union
{
struct
{
SSchemaWrapper
schemaRow
;
...
...
@@ -225,7 +243,7 @@ struct SMetaEntry {
int64_t
ctime
;
int32_t
ttlDays
;
int32_t
commentLen
;
char
*
comment
;
char
*
comment
;
tb_uid_t
suid
;
uint8_t
*
pTags
;
}
ctbEntry
;
...
...
@@ -233,7 +251,7 @@ struct SMetaEntry {
int64_t
ctime
;
int32_t
ttlDays
;
int32_t
commentLen
;
char
*
comment
;
char
*
comment
;
int32_t
ncid
;
// next column id
SSchemaWrapper
schemaRow
;
}
ntbEntry
;
...
...
@@ -247,17 +265,17 @@ struct SMetaEntry {
struct
SMetaReader
{
int32_t
flags
;
SMeta
*
pMeta
;
SMeta
*
pMeta
;
SDecoder
coder
;
SMetaEntry
me
;
void
*
pBuf
;
void
*
pBuf
;
int32_t
szBuf
;
};
struct
SMTbCursor
{
TBC
*
pDbc
;
void
*
pKey
;
void
*
pVal
;
TBC
*
pDbc
;
void
*
pKey
;
void
*
pVal
;
int32_t
kLen
;
int32_t
vLen
;
SMetaReader
mr
;
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
5fd4452e
...
...
@@ -44,25 +44,6 @@ extern "C" {
typedef
struct
STqOffsetStore
STqOffsetStore
;
// tqRead
struct
STqReadHandle
{
int64_t
ver
;
const
SSubmitReq
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SHashObj
*
tbIdHash
;
SArray
*
pColIdList
;
// SArray<int16_t>
int32_t
cachedSchemaVer
;
int64_t
cachedSchemaSuid
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
};
// tqPush
typedef
struct
{
...
...
@@ -102,7 +83,7 @@ typedef struct {
typedef
struct
{
int8_t
subType
;
S
Stream
Reader
*
pExecReader
[
5
];
S
Tq
Reader
*
pExecReader
[
5
];
union
{
STqExecCol
execCol
;
STqExecTb
execTb
;
...
...
@@ -118,7 +99,7 @@ typedef struct {
int32_t
epoch
;
int8_t
fetchMeta
;
//
reader
//
TODO remove
SWalReader
*
pWalReader
;
// push
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
5fd4452e
...
...
@@ -16,6 +16,7 @@
#include "meta.h"
static
int
metaSaveJsonVarToIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pCtbEntry
,
const
SSchema
*
pSchema
);
static
int
metaDelJsonVarFromIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pCtbEntry
,
const
SSchema
*
pSchema
);
static
int
metaHandleEntry
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaSaveToTbDb
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaUpdateUidIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
...
...
@@ -92,7 +93,65 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
}
else
if
(
type
==
TSDB_DATA_TYPE_BOOL
)
{
int
val
=
*
(
int
*
)(
&
pTagVal
->
i64
);
int
len
=
sizeof
(
val
);
term
=
indexTermCreate
(
suid
,
ADD_VALUE
,
TSDB_DATA_TYPE_INT
,
key
,
nKey
,
(
const
char
*
)
&
val
,
len
);
term
=
indexTermCreate
(
suid
,
ADD_VALUE
,
TSDB_DATA_TYPE_BOOL
,
key
,
nKey
,
(
const
char
*
)
&
val
,
len
);
}
if
(
term
!=
NULL
)
{
indexMultiTermAdd
(
terms
,
term
);
}
}
indexJsonPut
(
pMeta
->
pTagIvtIdx
,
terms
,
tuid
);
indexMultiTermDestroy
(
terms
);
#endif
return
0
;
}
int
metaDelJsonVarFromIdx
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pCtbEntry
,
const
SSchema
*
pSchema
)
{
#ifdef USE_INVERTED_INDEX
if
(
pMeta
->
pTagIvtIdx
==
NULL
||
pCtbEntry
==
NULL
)
{
return
-
1
;
}
void
*
data
=
pCtbEntry
->
ctbEntry
.
pTags
;
const
char
*
tagName
=
pSchema
->
name
;
tb_uid_t
suid
=
pCtbEntry
->
ctbEntry
.
suid
;
tb_uid_t
tuid
=
pCtbEntry
->
uid
;
const
void
*
pTagData
=
pCtbEntry
->
ctbEntry
.
pTags
;
int32_t
nTagData
=
0
;
SArray
*
pTagVals
=
NULL
;
if
(
tTagToValArray
((
const
STag
*
)
data
,
&
pTagVals
)
!=
0
)
{
return
-
1
;
}
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
int16_t
nCols
=
taosArrayGetSize
(
pTagVals
);
for
(
int
i
=
0
;
i
<
nCols
;
i
++
)
{
STagVal
*
pTagVal
=
(
STagVal
*
)
taosArrayGet
(
pTagVals
,
i
);
char
type
=
pTagVal
->
type
;
char
*
key
=
pTagVal
->
pKey
;
int32_t
nKey
=
strlen
(
key
);
SIndexTerm
*
term
=
NULL
;
if
(
type
==
TSDB_DATA_TYPE_NULL
)
{
term
=
indexTermCreate
(
suid
,
DEL_VALUE
,
TSDB_DATA_TYPE_VARCHAR
,
key
,
nKey
,
NULL
,
0
);
}
else
if
(
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
pTagVal
->
nData
>
0
)
{
char
*
val
=
taosMemoryCalloc
(
1
,
pTagVal
->
nData
+
VARSTR_HEADER_SIZE
);
int32_t
len
=
taosUcs4ToMbs
((
TdUcs4
*
)
pTagVal
->
pData
,
pTagVal
->
nData
,
val
+
VARSTR_HEADER_SIZE
);
memcpy
(
val
,
(
uint16_t
*
)
&
len
,
VARSTR_HEADER_SIZE
);
type
=
TSDB_DATA_TYPE_VARCHAR
;
term
=
indexTermCreate
(
suid
,
DEL_VALUE
,
type
,
key
,
nKey
,
val
,
len
);
}
else
if
(
pTagVal
->
nData
==
0
)
{
term
=
indexTermCreate
(
suid
,
DEL_VALUE
,
TSDB_DATA_TYPE_VARCHAR
,
key
,
nKey
,
pTagVal
->
pData
,
0
);
}
}
else
if
(
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
double
val
=
*
(
double
*
)(
&
pTagVal
->
i64
);
int
len
=
sizeof
(
val
);
term
=
indexTermCreate
(
suid
,
DEL_VALUE
,
type
,
key
,
nKey
,
(
const
char
*
)
&
val
,
len
);
}
else
if
(
type
==
TSDB_DATA_TYPE_BOOL
)
{
int
val
=
*
(
int
*
)(
&
pTagVal
->
i64
);
int
len
=
sizeof
(
val
);
term
=
indexTermCreate
(
suid
,
DEL_VALUE
,
TSDB_DATA_TYPE_BOOL
,
key
,
nKey
,
(
const
char
*
)
&
val
,
len
);
}
if
(
term
!=
NULL
)
{
indexMultiTermAdd
(
terms
,
term
);
...
...
@@ -434,9 +493,33 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
if
(
type
)
*
type
=
e
.
type
;
if
(
e
.
type
==
TSDB_CHILD_TABLE
)
{
void
*
tData
=
NULL
;
int
tLen
=
0
;
if
(
tdbTbGet
(
pMeta
->
pUidIdx
,
&
e
.
ctbEntry
.
suid
,
sizeof
(
tb_uid_t
),
&
tData
,
&
tLen
)
==
0
)
{
version
=
*
(
int64_t
*
)
tData
;
STbDbKey
tbDbKey
=
{.
uid
=
e
.
ctbEntry
.
suid
,
.
version
=
version
};
if
(
tdbTbGet
(
pMeta
->
pTbDb
,
&
tbDbKey
,
sizeof
(
tbDbKey
),
&
tData
,
&
tLen
)
==
0
)
{
SDecoder
tdc
=
{
0
};
SMetaEntry
stbEntry
=
{
0
};
tDecoderInit
(
&
tdc
,
tData
,
tLen
);
metaDecodeEntry
(
&
tdc
,
&
stbEntry
);
const
SSchema
*
pTagColumn
=
&
stbEntry
.
stbEntry
.
schemaTag
.
pSchema
[
0
];
if
(
pTagColumn
->
type
==
TSDB_DATA_TYPE_JSON
)
{
metaDelJsonVarFromIdx
(
pMeta
,
&
e
,
pTagColumn
);
}
tDecoderClear
(
&
tdc
);
}
tdbFree
(
tData
);
}
}
tdbTbDelete
(
pMeta
->
pTbDb
,
&
(
STbDbKey
){.
version
=
version
,
.
uid
=
uid
},
sizeof
(
STbDbKey
),
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pNameIdx
,
e
.
name
,
strlen
(
e
.
name
)
+
1
,
&
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
&
pMeta
->
txn
);
if
(
e
.
type
!=
TSDB_SUPER_TABLE
)
metaDeleteTtlIdx
(
pMeta
,
&
e
);
if
(
e
.
type
==
TSDB_CHILD_TABLE
)
{
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
5fd4452e
...
...
@@ -277,6 +277,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
pItem
->
maxDelay
=
TSDB_MAX_ROLLUP_MAX_DELAY
;
}
pItem
->
level
=
(
idx
==
0
?
TSDB_RETENTION_L1
:
TSDB_RETENTION_L2
);
smaInfo
(
"vgId:%d table:%"
PRIi64
" level:%"
PRIi8
" maxdelay:%"
PRIi64
" watermark:%"
PRIi64
", finally maxdelay:%"
PRIi32
,
SMA_VID
(
pSma
),
pRSmaInfo
->
suid
,
idx
+
1
,
param
->
maxdelay
[
idx
],
param
->
watermark
[
idx
],
pItem
->
maxDelay
);
}
return
TSDB_CODE_SUCCESS
;
_err:
...
...
@@ -325,14 +327,14 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return
TSDB_CODE_FAILED
;
}
S
StreamReader
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pMeta
);
if
(
!
pRead
Handle
)
{
S
TqReader
*
pReader
=
tqOpenReader
(
pVnode
);
if
(
!
pRead
er
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
SReadHandle
handle
=
{
.
streamReader
=
pReadHandle
,
.
tqReader
=
pReader
,
.
meta
=
pMeta
,
.
pMsgCb
=
pMsgCb
,
.
vnode
=
pVnode
,
...
...
@@ -364,7 +366,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return
TSDB_CODE_SUCCESS
;
_err:
tdFreeRSmaInfo
(
pRSmaInfo
);
taosMemoryFree
(
pRead
Handle
);
taosMemoryFree
(
pRead
er
);
return
TSDB_CODE_FAILED
;
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
5fd4452e
...
...
@@ -447,26 +447,38 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
fetchMeta
=
req
.
withMeta
;
pHandle
->
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
pHandle
->
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
/*for (int32_t i = 0; i < 5; i++) {*/
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
pHandle
->
execHandle
.
execCol
.
qmsg
=
req
.
qmsg
;
req
.
qmsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
handle
=
{
.
stream
Reader
=
pHandle
->
execHandle
.
pExecReader
[
i
],
.
tq
Reader
=
pHandle
->
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
initTableReader
=
true
,
.
initTqReader
=
true
,
};
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
ASSERT
(
pHandle
->
execHandle
.
execCol
.
task
[
i
]);
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
pHandle
->
execHandle
.
execCol
.
task
[
i
],
&
scanner
);
ASSERT
(
scanner
);
pHandle
->
execHandle
.
pExecReader
[
i
]
=
qExtractReaderFromStreamScanner
(
scanner
);
ASSERT
(
pHandle
->
execHandle
.
pExecReader
[
i
]);
}
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
pHandle
->
execHandle
.
pExecReader
[
i
]
=
tqOpenReader
(
pTq
->
pVnode
);
}
pHandle
->
execHandle
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
pHandle
->
execHandle
.
pExecReader
[
i
]
=
tqOpenReader
(
pTq
->
pVnode
);
}
pHandle
->
execHandle
.
execTb
.
suid
=
req
.
suid
;
SArray
*
tbUidList
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
vnodeGetCtbIdList
(
pTq
->
pVnode
,
req
.
suid
,
tbUidList
);
...
...
@@ -476,7 +488,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
tqDebug
(
"vg %d, idx %d, uid: %ld"
,
TD_VID
(
pTq
->
pVnode
),
i
,
tbUid
);
}
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
tqRead
Handle
SetTbUidList
(
pHandle
->
execHandle
.
pExecReader
[
i
],
tbUidList
);
tqRead
er
SetTbUidList
(
pHandle
->
execHandle
.
pExecReader
[
i
],
tbUidList
);
}
taosArrayDestroy
(
tbUidList
);
}
...
...
@@ -532,7 +544,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
SReadHandle
handle
=
{
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
init
Stream
Reader
=
1
,
.
init
Tq
Reader
=
1
,
};
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
}
else
{
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
5fd4452e
...
...
@@ -135,8 +135,8 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
}
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pRsp
->
withSchema
=
1
;
S
Stream
Reader
*
pReader
=
pExec
->
pExecReader
[
workerId
];
tqRead
HandleSet
Msg
(
pReader
,
pReq
,
0
);
S
Tq
Reader
*
pReader
=
pExec
->
pExecReader
[
workerId
];
tqRead
erSetData
Msg
(
pReader
,
pReq
,
0
);
while
(
tqNextDataBlock
(
pReader
))
{
SSDataBlock
block
=
{
0
};
if
(
tqRetrieveDataBlock
(
&
block
,
pReader
)
<
0
)
{
...
...
@@ -153,8 +153,8 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
}
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
pRsp
->
withSchema
=
1
;
S
Stream
Reader
*
pReader
=
pExec
->
pExecReader
[
workerId
];
tqRead
HandleSet
Msg
(
pReader
,
pReq
,
0
);
S
Tq
Reader
*
pReader
=
pExec
->
pExecReader
[
workerId
];
tqRead
erSetData
Msg
(
pReader
,
pReq
,
0
);
while
(
tqNextDataBlockFilterOut
(
pReader
,
pExec
->
execDb
.
pFilterOutTbUid
))
{
SSDataBlock
block
=
{
0
};
if
(
tqRetrieveDataBlock
(
&
block
,
pReader
)
<
0
)
{
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
5fd4452e
...
...
@@ -79,12 +79,12 @@ int32_t tqMetaOpen(STQ* pTq) {
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
handle
.
execHandle
.
pExecReader
[
i
]
=
tq
InitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
handle
.
execHandle
.
pExecReader
[
i
]
=
tq
OpenReader
(
pTq
->
pVnode
);
}
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
reader
=
{
.
stream
Reader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
tq
Reader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
5fd4452e
...
...
@@ -15,6 +15,11 @@
#include "tq.h"
int64_t
tqScanLog
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
offset
)
{
/*if ()*/
return
0
;
}
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
ppCkHead
)
{
int32_t
code
=
0
;
taosThreadMutexLock
(
&
pHandle
->
pWalReader
->
mutex
);
...
...
@@ -73,53 +78,107 @@ END:
return
code
;
}
S
StreamReader
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
)
{
S
StreamReader
*
pReadHandle
=
taosMemoryMalloc
(
sizeof
(
SStream
Reader
));
if
(
pRead
Handle
==
NULL
)
{
S
TqReader
*
tqOpenReader
(
SVnode
*
pVnode
)
{
S
TqReader
*
pReader
=
taosMemoryMalloc
(
sizeof
(
STq
Reader
));
if
(
pRead
er
==
NULL
)
{
return
NULL
;
}
pReadHandle
->
pVnodeMeta
=
pMeta
;
pReadHandle
->
pMsg
=
NULL
;
pReadHandle
->
ver
=
-
1
;
pReadHandle
->
pColIdList
=
NULL
;
pReadHandle
->
cachedSchemaVer
=
0
;
pReadHandle
->
cachedSchemaSuid
=
0
;
pReadHandle
->
pSchema
=
NULL
;
pReadHandle
->
pSchemaWrapper
=
NULL
;
pReadHandle
->
tbIdHash
=
NULL
;
return
pReadHandle
;
// TODO open
/*pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);*/
pReader
->
pVnodeMeta
=
pVnode
->
pMeta
;
pReader
->
pMsg
=
NULL
;
pReader
->
ver
=
-
1
;
pReader
->
pColIdList
=
NULL
;
pReader
->
cachedSchemaVer
=
0
;
pReader
->
cachedSchemaSuid
=
0
;
pReader
->
pSchema
=
NULL
;
pReader
->
pSchemaWrapper
=
NULL
;
pReader
->
tbIdHash
=
NULL
;
return
pReader
;
}
void
tqCloseReader
(
STqReader
*
pReader
)
{
// close wal reader
// free cached schema
// free hash
taosMemoryFree
(
pReader
);
}
int32_t
tqNextBlock
(
STqReader
*
pReader
,
SFetchRet
*
ret
)
{
bool
fromProcessedMsg
=
pReader
->
pMsg
!=
NULL
;
while
(
1
)
{
if
(
!
fromProcessedMsg
)
{
if
(
walNextValidMsg
(
pReader
->
pWalReader
)
<
0
)
{
ret
->
fetchType
=
FETCH_TYPE__NONE
;
return
-
1
;
}
void
*
body
=
pReader
->
pWalReader
->
pHead
->
head
.
body
;
if
(
pReader
->
pWalReader
->
pHead
->
head
.
msgType
!=
TDMT_VND_SUBMIT
)
{
// TODO do filter
ret
->
fetchType
=
FETCH_TYPE__META
;
ret
->
meta
=
pReader
->
pWalReader
->
pHead
->
head
.
body
;
return
0
;
}
else
{
tqReaderSetDataMsg
(
pReader
,
body
,
pReader
->
pWalReader
->
pHead
->
head
.
version
);
}
}
while
(
tqNextDataBlock
(
pReader
))
{
memset
(
&
ret
->
data
,
0
,
sizeof
(
SSDataBlock
));
int32_t
code
=
tqRetrieveDataBlock
(
&
ret
->
data
,
pReader
);
if
(
code
!=
0
||
ret
->
data
.
info
.
rows
==
0
)
{
if
(
fromProcessedMsg
)
{
ret
->
fetchType
=
FETCH_TYPE__NONE
;
return
0
;
}
else
{
break
;
}
}
ret
->
fetchType
=
FETCH_TYPE__DATA
;
return
0
;
}
if
(
fromProcessedMsg
)
{
ret
->
fetchType
=
FETCH_TYPE__NONE
;
return
0
;
}
}
}
int32_t
tqRead
HandleSetMsg
(
SStreamReader
*
pReadHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
)
{
pRead
Handle
->
pMsg
=
pMsg
;
int32_t
tqRead
erSetDataMsg
(
STqReader
*
pReader
,
SSubmitReq
*
pMsg
,
int64_t
ver
)
{
pRead
er
->
pMsg
=
pMsg
;
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pRead
Handle
->
msgIter
)
<
0
)
return
-
1
;
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pRead
er
->
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
if
(
tGetSubmitMsgNext
(
&
pRead
Handle
->
msgIter
,
&
pReadHandle
->
pBlock
)
<
0
)
return
-
1
;
if
(
pRead
Handle
->
pBlock
==
NULL
)
break
;
if
(
tGetSubmitMsgNext
(
&
pRead
er
->
msgIter
,
&
pReader
->
pBlock
)
<
0
)
return
-
1
;
if
(
pRead
er
->
pBlock
==
NULL
)
break
;
}
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pRead
Handle
->
msgIter
)
<
0
)
return
-
1
;
pRead
Handle
->
ver
=
ver
;
memset
(
&
pRead
Handle
->
blkIter
,
0
,
sizeof
(
SSubmitBlkIter
));
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pRead
er
->
msgIter
)
<
0
)
return
-
1
;
pRead
er
->
ver
=
ver
;
memset
(
&
pRead
er
->
blkIter
,
0
,
sizeof
(
SSubmitBlkIter
));
return
0
;
}
bool
tqNextDataBlock
(
S
StreamReader
*
pHandle
)
{
if
(
p
Handle
->
pMsg
==
NULL
)
return
false
;
bool
tqNextDataBlock
(
S
TqReader
*
pReader
)
{
if
(
p
Reader
->
pMsg
==
NULL
)
return
false
;
while
(
1
)
{
if
(
tGetSubmitMsgNext
(
&
p
Handle
->
msgIter
,
&
pHandle
->
pBlock
)
<
0
)
{
if
(
tGetSubmitMsgNext
(
&
p
Reader
->
msgIter
,
&
pReader
->
pBlock
)
<
0
)
{
return
false
;
}
if
(
p
Handle
->
pBlock
==
NULL
)
{
p
Handle
->
pMsg
=
NULL
;
if
(
p
Reader
->
pBlock
==
NULL
)
{
p
Reader
->
pMsg
=
NULL
;
return
false
;
}
if
(
p
Handle
->
tbIdHash
==
NULL
)
{
if
(
p
Reader
->
tbIdHash
==
NULL
)
{
return
true
;
}
void
*
ret
=
taosHashGet
(
p
Handle
->
tbIdHash
,
&
pHandle
->
msgIter
.
uid
,
sizeof
(
int64_t
));
void
*
ret
=
taosHashGet
(
p
Reader
->
tbIdHash
,
&
pReader
->
msgIter
.
uid
,
sizeof
(
int64_t
));
/*tqDebug("search uid %ld", pHandle->msgIter.uid);*/
if
(
ret
!=
NULL
)
{
/*tqDebug("find uid %ld", pHandle->msgIter.uid);*/
...
...
@@ -129,7 +188,7 @@ bool tqNextDataBlock(SStreamReader* pHandle) {
return
false
;
}
bool
tqNextDataBlockFilterOut
(
S
Stream
Reader
*
pHandle
,
SHashObj
*
filterOutUids
)
{
bool
tqNextDataBlockFilterOut
(
S
Tq
Reader
*
pHandle
,
SHashObj
*
filterOutUids
)
{
while
(
1
)
{
if
(
tGetSubmitMsgNext
(
&
pHandle
->
msgIter
,
&
pHandle
->
pBlock
)
<
0
)
{
return
false
;
...
...
@@ -145,38 +204,38 @@ bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) {
return
false
;
}
int32_t
tqRetrieveDataBlock
(
SSDataBlock
*
pBlock
,
S
StreamReader
*
pHandle
)
{
int32_t
tqRetrieveDataBlock
(
SSDataBlock
*
pBlock
,
S
TqReader
*
pReader
)
{
// TODO: cache multiple schema
int32_t
sversion
=
htonl
(
p
Handle
->
pBlock
->
sversion
);
if
(
p
Handle
->
cachedSchemaSuid
==
0
||
pHandle
->
cachedSchemaVer
!=
sversion
||
p
Handle
->
cachedSchemaSuid
!=
pHandle
->
msgIter
.
suid
)
{
if
(
p
Handle
->
pSchema
)
taosMemoryFree
(
pHandle
->
pSchema
);
p
Handle
->
pSchema
=
metaGetTbTSchema
(
pHandle
->
pVnodeMeta
,
pHandle
->
msgIter
.
uid
,
sversion
);
if
(
p
Handle
->
pSchema
==
NULL
)
{
int32_t
sversion
=
htonl
(
p
Reader
->
pBlock
->
sversion
);
if
(
p
Reader
->
cachedSchemaSuid
==
0
||
pReader
->
cachedSchemaVer
!=
sversion
||
p
Reader
->
cachedSchemaSuid
!=
pReader
->
msgIter
.
suid
)
{
if
(
p
Reader
->
pSchema
)
taosMemoryFree
(
pReader
->
pSchema
);
p
Reader
->
pSchema
=
metaGetTbTSchema
(
pReader
->
pVnodeMeta
,
pReader
->
msgIter
.
uid
,
sversion
);
if
(
p
Reader
->
pSchema
==
NULL
)
{
tqWarn
(
"cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table"
,
p
Handle
->
msgIter
.
uid
,
pHandle
->
msgIter
.
suid
,
pHandle
->
cachedSchemaVer
);
p
Reader
->
msgIter
.
uid
,
pReader
->
msgIter
.
suid
,
pReader
->
cachedSchemaVer
);
/*ASSERT(0);*/
terrno
=
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
;
return
-
1
;
}
if
(
p
Handle
->
pSchemaWrapper
)
tDeleteSSchemaWrapper
(
pHandle
->
pSchemaWrapper
);
p
Handle
->
pSchemaWrapper
=
metaGetTableSchema
(
pHandle
->
pVnodeMeta
,
pHandle
->
msgIter
.
uid
,
sversion
,
true
);
if
(
p
Handle
->
pSchemaWrapper
==
NULL
)
{
if
(
p
Reader
->
pSchemaWrapper
)
tDeleteSSchemaWrapper
(
pReader
->
pSchemaWrapper
);
p
Reader
->
pSchemaWrapper
=
metaGetTableSchema
(
pReader
->
pVnodeMeta
,
pReader
->
msgIter
.
uid
,
sversion
,
true
);
if
(
p
Reader
->
pSchemaWrapper
==
NULL
)
{
tqWarn
(
"cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table"
,
p
Handle
->
msgIter
.
uid
,
pHandle
->
cachedSchemaVer
);
p
Reader
->
msgIter
.
uid
,
pReader
->
cachedSchemaVer
);
/*ASSERT(0);*/
terrno
=
TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND
;
return
-
1
;
}
p
Handle
->
cachedSchemaVer
=
sversion
;
p
Handle
->
cachedSchemaSuid
=
pHandle
->
msgIter
.
suid
;
p
Reader
->
cachedSchemaVer
=
sversion
;
p
Reader
->
cachedSchemaSuid
=
pReader
->
msgIter
.
suid
;
}
STSchema
*
pTschema
=
p
Handle
->
pSchema
;
SSchemaWrapper
*
pSchemaWrapper
=
p
Handle
->
pSchemaWrapper
;
STSchema
*
pTschema
=
p
Reader
->
pSchema
;
SSchemaWrapper
*
pSchemaWrapper
=
p
Reader
->
pSchemaWrapper
;
int32_t
colNumNeed
=
taosArrayGetSize
(
p
Handle
->
pColIdList
);
int32_t
colNumNeed
=
taosArrayGetSize
(
p
Reader
->
pColIdList
);
if
(
colNumNeed
==
0
)
{
int32_t
colMeta
=
0
;
...
...
@@ -199,7 +258,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
while
(
colMeta
<
pSchemaWrapper
->
nCols
&&
colNeed
<
colNumNeed
)
{
SSchema
*
pColSchema
=
&
pSchemaWrapper
->
pSchema
[
colMeta
];
col_id_t
colIdSchema
=
pColSchema
->
colId
;
col_id_t
colIdNeed
=
*
(
col_id_t
*
)
taosArrayGet
(
p
Handle
->
pColIdList
,
colNeed
);
col_id_t
colIdNeed
=
*
(
col_id_t
*
)
taosArrayGet
(
p
Reader
->
pColIdList
,
colNeed
);
if
(
colIdSchema
<
colIdNeed
)
{
colMeta
++
;
}
else
if
(
colIdSchema
>
colIdNeed
)
{
...
...
@@ -216,7 +275,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
}
}
if
(
blockDataEnsureCapacity
(
pBlock
,
p
Handle
->
msgIter
.
numOfRows
)
<
0
)
{
if
(
blockDataEnsureCapacity
(
pBlock
,
p
Reader
->
msgIter
.
numOfRows
)
<
0
)
{
goto
FAIL
;
}
...
...
@@ -227,13 +286,12 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
STSRow
*
row
;
int32_t
curRow
=
0
;
tInitSubmitBlkIter
(
&
p
Handle
->
msgIter
,
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
tInitSubmitBlkIter
(
&
p
Reader
->
msgIter
,
pReader
->
pBlock
,
&
pReader
->
blkIter
);
pBlock
->
info
.
groupId
=
0
;
pBlock
->
info
.
uid
=
pHandle
->
msgIter
.
uid
;
pBlock
->
info
.
rows
=
pHandle
->
msgIter
.
numOfRows
;
pBlock
->
info
.
uid
=
pReader
->
msgIter
.
uid
;
pBlock
->
info
.
rows
=
pReader
->
msgIter
.
numOfRows
;
while
((
row
=
tGetSubmitBlkNext
(
&
p
Handle
->
blkIter
))
!=
NULL
)
{
while
((
row
=
tGetSubmitBlkNext
(
&
p
Reader
->
blkIter
))
!=
NULL
)
{
tdSTSRowIterReset
(
&
iter
,
row
);
// get all wanted col of that block
for
(
int32_t
i
=
0
;
i
<
colActual
;
i
++
)
{
...
...
@@ -255,9 +313,9 @@ FAIL:
return
-
1
;
}
void
tqRead
HandleSetColIdList
(
SStream
Reader
*
pReadHandle
,
SArray
*
pColIdList
)
{
pReadHandle
->
pColIdList
=
pColIdList
;
}
void
tqRead
erSetColIdList
(
STq
Reader
*
pReadHandle
,
SArray
*
pColIdList
)
{
pReadHandle
->
pColIdList
=
pColIdList
;
}
int
tqRead
HandleSetTbUidList
(
SStream
Reader
*
pHandle
,
const
SArray
*
tbUidList
)
{
int
tqRead
erSetTbUidList
(
STq
Reader
*
pHandle
,
const
SArray
*
tbUidList
)
{
if
(
pHandle
->
tbIdHash
)
{
taosHashClear
(
pHandle
->
tbIdHash
);
}
...
...
@@ -276,7 +334,7 @@ int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
return
0
;
}
int
tqRead
HandleAddTbUidList
(
SStream
Reader
*
pHandle
,
const
SArray
*
tbUidList
)
{
int
tqRead
erAddTbUidList
(
STq
Reader
*
pHandle
,
const
SArray
*
tbUidList
)
{
if
(
pHandle
->
tbIdHash
==
NULL
)
{
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
...
...
@@ -293,7 +351,7 @@ int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
return
0
;
}
int
tqRead
HandleRemoveTbUidList
(
SStream
Reader
*
pHandle
,
const
SArray
*
tbUidList
)
{
int
tqRead
erRemoveTbUidList
(
STq
Reader
*
pHandle
,
const
SArray
*
tbUidList
)
{
ASSERT
(
pHandle
->
tbIdHash
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
5fd4452e
...
...
@@ -365,7 +365,7 @@ typedef struct SStreamScanInfo {
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
void
*
streamReader
;
// stream block reader handle
STqReader
*
tqReader
;
int32_t
tsArrayIndex
;
SArray
*
tsArray
;
...
...
@@ -383,6 +383,11 @@ typedef struct SStreamScanInfo {
SSDataBlock
*
pPullDataRes
;
// pull data SSDataBlock
SSDataBlock
*
pDeleteDataRes
;
// delete data SSDataBlock
int32_t
deleteDataIndex
;
// status for tmq
//SSchemaWrapper schema;
STqOffset
offset
;
}
SStreamScanInfo
;
typedef
struct
SSysTableScanInfo
{
...
...
source/libs/executor/src/executor.c
浏览文件 @
5fd4452e
...
...
@@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo
->
blockType
=
type
;
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
if
(
tqRead
HandleSetMsg
(
pInfo
->
stream
Reader
,
input
,
0
)
<
0
)
{
if
(
tqRead
erSetDataMsg
(
pInfo
->
tq
Reader
,
input
,
0
)
<
0
)
{
qError
(
"submit msg messed up when initing stream block, %s"
PRIx64
,
id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
...
...
@@ -105,7 +105,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return
code
;
}
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
void
*
streamReadHandle
)
{
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
)
{
if
(
msg
==
NULL
)
{
return
NULL
;
}
...
...
@@ -120,7 +120,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t
pTaskInfo
=
NULL
;
code
=
qCreateExecTask
(
streamReadHandle
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
,
NULL
,
OPTR_EXEC_MODEL_STREAM
);
code
=
qCreateExecTask
(
readers
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
,
NULL
,
OPTR_EXEC_MODEL_STREAM
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO: destroy SSubplan & pTaskInfo
terrno
=
code
;
...
...
@@ -174,11 +174,11 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
code
=
tqRead
HandleAddTbUidList
(
pScanInfo
->
stream
Reader
,
qa
);
code
=
tqRead
erAddTbUidList
(
pScanInfo
->
tq
Reader
,
qa
);
taosArrayDestroy
(
qa
);
}
else
{
// remove the table id in current list
qDebug
(
" %d remove child tables from the stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
));
code
=
tqRead
HandleRemoveTbUidList
(
pScanInfo
->
stream
Reader
,
tableIdList
);
code
=
tqRead
erRemoveTbUidList
(
pScanInfo
->
tq
Reader
,
tableIdList
);
}
return
code
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
5fd4452e
...
...
@@ -236,6 +236,37 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
return
decodeOperator
(
pTaskInfo
->
pRoot
,
pInput
,
len
);
}
int32_t
qExtractStreamScanner
(
qTaskInfo_t
tinfo
,
void
**
scanner
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
*
scanner
=
pOperator
->
info
;
return
0
;
}
else
{
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
}
}
void
*
qExtractReaderFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
(
void
*
)
pInfo
->
tqReader
;
}
const
SSchemaWrapper
*
qExtractSchemaFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
pInfo
->
tqReader
->
pSchemaWrapper
;
}
const
STqOffset
*
qExtractStatusFromStreamScanner
(
void
*
scanner
)
{
SStreamScanInfo
*
pInfo
=
scanner
;
return
&
pInfo
->
offset
;
}
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
5fd4452e
...
...
@@ -2844,7 +2844,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
int32_t
doPrepareScan
(
SOperatorInfo
*
pOperator
,
uint64_t
uid
,
int64_t
ts
)
{
int32
_t
type
=
pOperator
->
operatorType
;
uint8
_t
type
=
pOperator
->
operatorType
;
pOperator
->
status
=
OP_OPENED
;
...
...
@@ -4346,11 +4346,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
==
type
)
{
SLastRowScanPhysiNode
*
pScanNode
=
(
SLastRowScanPhysiNode
*
)
pPhyNode
;
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
// if (code) {
// pTaskInfo->code = code;
// return NULL;
// }
// int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
// if (code) {
// pTaskInfo->code = code;
// return NULL;
// }
int32_t
code
=
extractTableSchemaVersion
(
pHandle
,
pScanNode
->
uid
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4407,8 +4407,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr
=
createGroupOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pColList
,
pAggNode
->
node
.
pConditions
,
pScalarExprInfo
,
numOfScalarExpr
,
pTaskInfo
);
}
else
{
pOptr
=
createAggregateOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pAggNode
->
node
.
pConditions
,
pScalarExprInfo
,
numOfScalarExpr
,
pTaskInfo
);
pOptr
=
createAggregateOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
pAggNode
->
node
.
pConditions
,
pScalarExprInfo
,
numOfScalarExpr
,
pTaskInfo
);
}
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
==
type
||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
...
...
@@ -4575,7 +4575,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return
pList
;
}
STsdbReader
*
doCreateDataReader
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
const
char
*
idstr
)
{
STsdbReader
*
doCreateDataReader
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
const
char
*
idstr
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
&
pTableScanNode
->
scan
,
pTableListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -4819,7 +4820,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
(
*
pTaskInfo
)
->
tableqinfoList
,
pPlan
->
user
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
code
=
(
*
pTaskInfo
)
->
code
;
goto
_complete
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
5fd4452e
...
...
@@ -1232,38 +1232,33 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
blockDataCleanup
(
pInfo
->
pRes
);
while
(
tqNextDataBlock
(
pInfo
->
stream
Reader
))
{
while
(
tqNextDataBlock
(
pInfo
->
tq
Reader
))
{
SSDataBlock
block
=
{
0
};
// todo refactor
int32_t
code
=
tqRetrieveDataBlock
(
&
block
,
pInfo
->
stream
Reader
);
int32_t
code
=
tqRetrieveDataBlock
(
&
block
,
pInfo
->
tq
Reader
);
uint64_t
groupId
=
block
.
info
.
groupId
;
uint64_t
uid
=
block
.
info
.
uid
;
int32_t
numOfRows
=
block
.
info
.
rows
;
if
(
code
!=
TSDB_CODE_SUCCESS
||
numOfRows
==
0
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
||
block
.
info
.
rows
==
0
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
}
pInfo
->
pRes
->
info
.
groupId
=
groupId
;
pInfo
->
pRes
->
info
.
rows
=
numOfRows
;
pInfo
->
pRes
->
info
.
uid
=
uid
;
pInfo
->
pRes
->
info
.
rows
=
block
.
info
.
rows
;
pInfo
->
pRes
->
info
.
uid
=
block
.
info
.
uid
;
pInfo
->
pRes
->
info
.
type
=
STREAM_NORMAL
;
pInfo
->
pRes
->
info
.
capacity
=
numOfR
ows
;
pInfo
->
pRes
->
info
.
capacity
=
block
.
info
.
r
ows
;
// for generating rollup SMA result, each time is an independent time serie.
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
if
(
pInfo
->
assignBlockUid
)
{
pInfo
->
pRes
->
info
.
groupId
=
uid
;
}
else
{
pInfo
->
pRes
->
info
.
groupId
=
groupId
;
pInfo
->
pRes
->
info
.
groupId
=
block
.
info
.
uid
;
}
uint64_t
*
groupIdPre
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
uid
,
sizeof
(
int64_t
));
uint64_t
*
groupIdPre
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
block
.
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupIdPre
)
{
pInfo
->
pRes
->
info
.
groupId
=
*
groupIdPre
;
}
else
{
pInfo
->
pRes
->
info
.
groupId
=
0
;
}
// todo extract method
...
...
@@ -1413,13 +1408,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
}
if
(
pHandle
->
init
Stream
Reader
)
{
ASSERT
(
pHandle
->
stream
Reader
==
NULL
);
pInfo
->
streamReader
=
tqInitSubmitMsgScanner
(
pHandle
->
meta
);
ASSERT
(
pInfo
->
stream
Reader
);
if
(
pHandle
->
init
Tq
Reader
)
{
ASSERT
(
pHandle
->
tq
Reader
==
NULL
);
pInfo
->
tqReader
=
tqOpenReader
(
pHandle
->
vnode
);
ASSERT
(
pInfo
->
tq
Reader
);
}
else
{
ASSERT
(
pHandle
->
stream
Reader
);
pInfo
->
streamReader
=
pHandle
->
stream
Reader
;
ASSERT
(
pHandle
->
tq
Reader
);
pInfo
->
tqReader
=
pHandle
->
tq
Reader
;
}
if
(
pSTInfo
->
interval
.
interval
>
0
)
{
...
...
@@ -1435,9 +1430,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
tableUid
=
pScanPhyNode
->
uid
;
// set the extract column id to streamHandle
tqRead
HandleSetColIdList
(
pInfo
->
stream
Reader
,
pColIds
);
tqRead
erSetColIdList
(
pInfo
->
tq
Reader
,
pColIds
);
SArray
*
tableIdList
=
extractTableIdList
(
&
pTaskInfo
->
tableqinfoList
);
int32_t
code
=
tqRead
HandleSetTbUidList
(
pInfo
->
stream
Reader
,
tableIdList
);
int32_t
code
=
tqRead
erSetTbUidList
(
pInfo
->
tq
Reader
,
tableIdList
);
if
(
code
!=
0
)
{
taosArrayDestroy
(
tableIdList
);
goto
_error
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
5fd4452e
...
...
@@ -3189,7 +3189,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
*
pIndex
=
index
+
1
;
return
pWin
;
}
else
if
(
endTs
!=
INT64_MIN
&&
isInWindow
(
pWin
,
endTs
,
gap
))
{
*
pIndex
=
index
;
*
pIndex
=
index
+
1
;
return
pWin
;
}
}
...
...
@@ -3442,7 +3442,7 @@ void deleteWindow(SArray* pWinInfos, int32_t index) {
taosArrayRemove
(
pWinInfos
,
index
);
}
static
void
doDelete
Session
Windows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int64_t
gap
,
SArray
*
result
)
{
static
void
doDelete
Time
Windows
(
SStreamAggSupporter
*
pAggSup
,
SSDataBlock
*
pBlock
,
int64_t
gap
,
SArray
*
result
)
{
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
TSKEY
*
startDatas
=
(
TSKEY
*
)
pStartTsCol
->
pData
;
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
...
...
@@ -3700,13 +3700,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
// gap must be 0
doDelete
Session
Windows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
pWins
);
doDelete
Time
Windows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
pWins
);
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
// gap must be 0
doDelete
Session
Windows
(
&
pChildInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
);
doDelete
Time
Windows
(
&
pChildInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
pBlock
->
info
.
groupId
,
pOperator
->
exprSupp
.
numOfExprs
,
pOperator
);
}
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pStDeleted
);
...
...
@@ -3840,7 +3840,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
break
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
||
pBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
// gap must be 0
doDelete
Session
Windows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
);
doDelete
Time
Windows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
NULL
);
copyDataBlock
(
pInfo
->
pDelRes
,
pBlock
);
pInfo
->
pDelRes
->
info
.
type
=
STREAM_DELETE_RESULT
;
break
;
...
...
@@ -4232,6 +4232,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
doClearStateWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
pInfo
->
primaryTsIndex
,
&
pInfo
->
stateCol
,
pInfo
->
stateCol
.
slotId
,
pSeUpdated
,
pInfo
->
pSeDeleted
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_DELETE_DATA
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
doDeleteTimeWindows
(
&
pInfo
->
streamAggSup
,
pBlock
,
0
,
pWins
);
copyDeleteWindowInfo
(
pWins
,
pInfo
->
pSeDeleted
);
taosArrayDestroy
(
pWins
);
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
)
{
getAllSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
pUpdated
,
getResWinForState
);
continue
;
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
5fd4452e
...
...
@@ -67,7 +67,6 @@ typedef struct SSyncNode {
char
path
[
TSDB_FILENAME_LEN
];
char
raftStorePath
[
TSDB_FILENAME_LEN
*
2
];
char
configPath
[
TSDB_FILENAME_LEN
*
2
];
int32_t
batchSize
;
// sync io
SWal
*
pWal
;
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
5fd4452e
...
...
@@ -842,8 +842,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries-batch, fake match2,
pre-index:%ld, pre-term:%lu, datalen:%d
"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
"recv sync-append-entries-batch, fake match2,
{pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}
"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
,
pMsg
->
dataCount
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
...
...
@@ -876,7 +876,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
code
=
syncNodePreCommit
(
ths
,
pAppendEntry
);
ASSERT
(
code
==
0
);
syncEntryDestory
(
pAppendEntry
);
//
syncEntryDestory(pAppendEntry);
}
// fsync once
...
...
@@ -931,8 +931,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries-batch, not match,
pre-index:%ld, pre-term:%lu, datalen:%d
"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
"recv sync-append-entries-batch, not match,
{pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}
"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
,
pMsg
->
dataCount
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
...
...
@@ -976,8 +976,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries, match, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
,
pMsg
->
dataCount
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
...
...
@@ -999,7 +1000,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
code
=
syncNodePreCommit
(
ths
,
pAppendEntry
);
ASSERT
(
code
==
0
);
syncEntryDestory
(
pAppendEntry
);
//
syncEntryDestory(pAppendEntry);
}
// fsync once
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
5fd4452e
...
...
@@ -174,8 +174,12 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
SyncIndex
newNextIndex
=
pMsg
->
matchIndex
+
1
;
SyncIndex
newMatchIndex
=
pMsg
->
matchIndex
;
if
(
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
newNextIndex
)
&&
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
newNextIndex
-
1
))
{
bool
needStartSnapshot
=
false
;
if
(
newMatchIndex
>=
SYNC_INDEX_BEGIN
&&
!
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
newMatchIndex
))
{
needStartSnapshot
=
true
;
}
if
(
!
needStartSnapshot
)
{
// update next-index, match-index
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
newNextIndex
);
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
newMatchIndex
);
...
...
@@ -197,15 +201,36 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
newMatchIndex
);
}
// event log, update next-index
do
{
char
host
[
64
];
int16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"reset next-index:%ld, match-index:%ld for %s:%d"
,
newNextIndex
,
newMatchIndex
,
host
,
port
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
}
else
{
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
if
(
nextIndex
>
SYNC_INDEX_BEGIN
)
{
--
nextIndex
;
if
(
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
nextIndex
)
&&
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
nextIndex
-
1
))
{
bool
needStartSnapshot
=
false
;
if
(
nextIndex
>=
SYNC_INDEX_BEGIN
&&
!
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
nextIndex
))
{
needStartSnapshot
=
true
;
}
if
(
nextIndex
-
1
>=
SYNC_INDEX_BEGIN
&&
!
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
nextIndex
-
1
))
{
needStartSnapshot
=
true
;
}
if
(
!
needStartSnapshot
)
{
// do nothing
}
else
{
SSyncRaftEntry
*
pEntry
;
int32_t
code
=
ths
->
pLogStore
->
syncLogGetEntry
(
ths
->
pLogStore
,
nextIndex
,
&
pEntry
);
...
...
@@ -227,6 +252,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
nextIndex
=
SYNC_INDEX_BEGIN
;
}
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
nextIndex
);
// event log, update next-index
do
{
char
host
[
64
];
int16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
SyncIndex
newNextIndex
=
nextIndex
;
SyncIndex
newMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"reset2 next-index:%ld, match-index:%ld for %s:%d"
,
newNextIndex
,
newMatchIndex
,
host
,
port
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
}
return
0
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
5fd4452e
...
...
@@ -1526,12 +1526,14 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pRaftCfg
!=
NULL
&&
pSyncNode
->
pRaftStore
!=
NULL
)
{
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"strategy:%d, batch:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d, restore:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
,
printStr
);
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
,
printStr
);
}
else
{
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"%s"
,
str
);
}
...
...
@@ -1543,12 +1545,14 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
if
(
pSyncNode
!=
NULL
&&
pSyncNode
->
pRaftCfg
!=
NULL
&&
pSyncNode
->
pRaftStore
!=
NULL
)
{
snprintf
(
s
,
len
,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"strategy:%d, batch:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d, restore:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
,
printStr
);
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
,
printStr
);
}
else
{
snprintf
(
s
,
len
,
"%s"
,
str
);
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
5fd4452e
...
...
@@ -1605,7 +1605,7 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
SyncAppendEntriesBatch
*
syncAppendEntriesBatchBuild
(
SSyncRaftEntry
**
entryPArr
,
int32_t
arrSize
,
int32_t
vgId
)
{
ASSERT
(
entryPArr
!=
NULL
);
ASSERT
(
arrSize
>
0
);
ASSERT
(
arrSize
>
=
0
);
int32_t
dataLen
=
0
;
int32_t
metaArrayLen
=
sizeof
(
SOffsetAndContLen
)
*
arrSize
;
// <offset, contLen>
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
5fd4452e
...
...
@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
)
{
cJSON
*
pJson
=
syncCfg2Json
(
pSyncCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
)
{
if
(
pSyncCfg
!=
NULL
)
{
int32_t
len
=
512
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
memset
(
s
,
0
,
len
);
snprintf
(
s
,
len
,
"{replica-num:%d, my-index:%d, "
,
pSyncCfg
->
replicaNum
,
pSyncCfg
->
myIndex
);
...
...
@@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
)
{
cJSON
*
pJson
=
raftCfg2Json
(
pRaftCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(
pRaftCfg
->
configIndexArr
)[
i
]
=
atoll
(
pIndex
->
valuestring
);
}
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
int32_t
code
=
syncCfgFromJson
(
pJsonSyncCfg
,
&
(
pRaftCfg
->
cfg
));
ASSERT
(
code
==
0
);
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
5fd4452e
...
...
@@ -148,9 +148,17 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
// get entry batch
int32_t
getCount
=
0
;
SyncIndex
getEntryIndex
=
nextIndex
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
batchSize
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
getEntryIndex
,
&
pEntry
);
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"get index:%d, code:%d, %s"
,
getEntryIndex
,
code
,
tstrerror
(
terrno
));
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
entryPArr
[
i
]
=
pEntry
;
...
...
@@ -162,12 +170,19 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
}
}
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"build batch:%d"
,
getCount
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
// build msg
SyncAppendEntriesBatch
*
pMsg
=
syncAppendEntriesBatchBuild
(
entryPArr
,
getCount
,
pSyncNode
->
vgId
);
ASSERT
(
pMsg
!=
NULL
);
// free entries
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
batchSize
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
entryPArr
[
i
];
if
(
pEntry
!=
NULL
)
{
syncEntryDestory
(
pEntry
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
5fd4452e
...
...
@@ -573,8 +573,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return
;
}
if
(
nread
<
0
)
{
t
Error
(
"%s conn %p read error: %s, ref: %d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
uv_err_name
(
nread
),
T_REF_VAL_GET
(
conn
));
t
Warn
(
"%s conn %p read error: %s, ref: %d"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
uv_err_name
(
nread
),
T_REF_VAL_GET
(
conn
));
conn
->
broken
=
true
;
cliHandleExcept
(
conn
);
}
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
5fd4452e
...
...
@@ -305,7 +305,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return
;
}
t
Error
(
"%s conn %p read error: %s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
t
Warn
(
"%s conn %p read error: %s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
if
(
nread
<
0
)
{
conn
->
broken
=
true
;
if
(
conn
->
status
==
ConnAcquire
)
{
...
...
source/util/src/tpagedbuf.c
浏览文件 @
5fd4452e
...
...
@@ -518,8 +518,8 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_releaseBufPageInfo pageId:%d, used:%d, offset:%"
PRId64
,
pi
->
pageId
,
pi
->
used
,
pi
->
offset
);
#endif
assert
(
pi
->
pData
!=
NULL
&&
pi
->
used
==
true
);
//
assert(pi->pData != NULL);
//
assert(pi->pData != NULL && pi->used == true);
assert
(
pi
->
pData
!=
NULL
);
pi
->
used
=
false
;
pBuf
->
statis
.
releasePages
+=
1
;
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
5fd4452e
...
...
@@ -163,8 +163,8 @@
# --- sma
./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
#
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
#
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
# --- valgrind
./test.sh -f tsim/valgrind/checkError1.sim
...
...
tests/script/loop.sh
浏览文件 @
5fd4452e
...
...
@@ -13,7 +13,7 @@ CMD_NAME=
LOOP_TIMES
=
5
SLEEP_TIME
=
0
while
getopts
"f:t:s:"
arg
while
getopts
"
h
f:t:s:"
arg
do
case
$arg
in
f
)
...
...
@@ -25,6 +25,12 @@ do
s
)
SLEEP_TIME
=
$OPTARG
;;
h
)
echo
"Usage:
$(
basename
$0
)
-f [cmd name] "
echo
" -t [loop times] "
echo
" -s [sleep time] "
exit
0
;;
?
)
echo
"unknow argument"
;;
...
...
tests/script/tsim/sync/vnodesnapshot.sim
浏览文件 @
5fd4452e
...
...
@@ -145,9 +145,9 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
sql flush database db;
#
sql flush database db;
system sh/exec.sh -n dnode4 -s start
#
system sh/exec.sh -n dnode4 -s start
sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3)
...
...
tests/script/tsim/user/basic.sim
浏览文件 @
5fd4452e
...
...
@@ -22,7 +22,7 @@ sql_error ALTER USER root SYSINFO 1
sql_error ALTER USER root enable 0
sql_error ALTER USER root enable 1
sql_error create database db vgroups 1;
#
sql_error create database db vgroups 1;
sql_error GRANT read ON db.* to root;
sql_error GRANT read ON *.* to root;
sql_error REVOKE read ON db.* from root;
...
...
tests/script/tsim/valgrind/basic1.sim
浏览文件 @
5fd4452e
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
#
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
...
...
tests/script/tsim/valgrind/checkError1.sim
浏览文件 @
5fd4452e
...
...
@@ -5,20 +5,22 @@ system sh/exec.sh -n dnode1 -s start -v
sql connect
print =============== step1: show dnodes
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ---
-
> dnode not ready!
print ---> dnode not ready!
return -1
endi
sql show dnodes
print ---
-
> $data00 $data01 $data02 $data03 $data04 $data05
print ---> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
print =============== step2: create alter drop show user
sql create user u1 pass 'taosdata'
...
...
tests/script/tsim/valgrind/checkError2.sim
浏览文件 @
5fd4452e
...
...
@@ -10,14 +10,17 @@ step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ---
-
> dnode not ready!
print ---> dnode not ready!
return -1
endi
sql show dnodes
print ---
-
> $data00 $data01 $data02 $data03 $data04 $data05
print ---> $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
print =============== step2: create db
sql create database d1 vgroups 1 buffer 3
...
...
@@ -32,6 +35,21 @@ if $rows != 1 then
return -1
endi
print =============== step3: create show table
sql create table ct1 using stb tags(1000)
#sql show tables
#if $rows != 1 then
# return -1
#endi
print =============== step5: insert data
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
print =============== step6: select data
#sql select * from ct1
#sql select * from stb
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...
...
tests/script/tsim/valgrind/checkError3.sim
0 → 100644
浏览文件 @
5fd4452e
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start -v
system sh/exec.sh -n dnode2 -s start -v
sql connect
print =============== add dnode2 into cluster
sql create dnode $hostname port 7200
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ---> dnode not ready!
return -1
endi
sql show dnodes
print ---> $data00 $data01 $data02 $data03 $data04 $data05
print ---> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print =============== create database, stable, table
sql create database db vgroups 3
sql use db
sql create table stb (ts timestamp, c int) tags (t int)
sql create table t0 using stb tags (0)
sql create table tba (ts timestamp, c1 binary(10), c2 nchar(10));
print =============== run show xxxx
sql show dnodes
if $rows != 2 then
return -1
endi
sql show mnodes
if $rows != 1 then
return -1
endi
sql show databases
if $rows != 3 then
return -1
endi
sql show stables
if $rows != 1 then
return -1
endi
sql show tables
if $rows != 2 then
return -1
endi
sql show users
if $rows != 1 then
return -1
endi
sql show vgroups
if $rows != 3 then
return -1
endi
print =============== run select * from information_schema.xxxx
sql select * from information_schema.`dnodes`
if $rows != 2 then
return -1
endi
sql select * from information_schema.`mnodes`
if $rows != 1 then
return -1
endi
sql select * from information_schema.user_databases
if $rows != 3 then
return -1
endi
sql select * from information_schema.user_stables
if $rows != 1 then
return -1
endi
sql select * from information_schema.user_tables
if $rows != 31 then
return -1
endi
sql select * from information_schema.user_users
if $rows != 1 then
return -1
endi
sql select * from information_schema.`vgroups`
if $rows != 3 then
return -1
endi
sql show variables;
if $rows != 4 then
return -1
endi
sql show dnode 1 variables;
if $rows <= 0 then
return -1
endi
sql show local variables;
if $rows <= 0 then
return -1
endi
print ==== stop dnode1 and dnode2, and restart dnodes
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
print =============== check dnode1
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null=
if $system_content == $null then
return 0
endi
return -1
print =============== check dnode2
system_content sh/checkValgrind.sh -n dnode2
print cmd return result ----> [ $system_content ]
if $system_content <= 0 then
return 0
endi
$null=
if $system_content == $null then
return 0
endi
return -1
\ No newline at end of file
tests/system-test/2-query/json_tag.py
浏览文件 @
5fd4452e
...
...
@@ -323,12 +323,12 @@ class TDTestCase:
# where json value is bool
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'=true"
)
tdSql
.
checkRows
(
0
)
#
tdSql.query("select * from jsons1 where jtag->'tag1'=false")
#
tdSql.checkRows(1)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'=false"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'!=false"
)
tdSql
.
checkRows
(
0
)
#
tdSql.query("select * from jsons1 where jtag->'tag1'>false")
#
tdSql.checkRows(0)
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'>false"
)
tdSql
.
checkRows
(
0
)
# where json value is null
tdSql
.
query
(
"select * from jsons1 where jtag->'tag1'=null"
)
...
...
tests/system-test/loop.sh
0 → 100755
浏览文件 @
5fd4452e
#!/bin/bash
##################################################
#
# Do simulation test
#
##################################################
set
-e
#set -x
CMD_NAME
=
LOOP_TIMES
=
5
SLEEP_TIME
=
0
while
getopts
"hf:t:s:"
arg
do
case
$arg
in
f
)
CMD_NAME
=
$OPTARG
;;
t
)
LOOP_TIMES
=
$OPTARG
;;
s
)
SLEEP_TIME
=
$OPTARG
;;
h
)
echo
"Usage:
$(
basename
$0
)
-f [cmd name] "
echo
" -t [loop times] "
echo
" -s [sleep time] "
exit
0
;;
?
)
echo
"unknow argument"
;;
esac
done
echo
LOOP_TIMES
${
LOOP_TIMES
}
echo
CMD_NAME
${
CMD_NAME
}
echo
SLEEP_TIME
${
SLEEP_TIME
}
GREEN
=
'\033[1;32m'
GREEN_DARK
=
'\033[0;32m'
GREEN_UNDERLINE
=
'\033[4;32m'
NC
=
'\033[0m'
for
((
i
=
0
;
i<
$LOOP_TIMES
;
i++
))
do
echo
-e
$GREEN
loop
$i
$NC
echo
-e
$GREEN
cmd
$CMD_NAME
$NC
$CMD_NAME
sleep
${
SLEEP_TIME
}
done
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录