Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
53a13db8
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
53a13db8
编写于
7月 10, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feat/tsdb_snapshot
上级
699c9572
644963b7
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
509 addition
and
387 deletion
+509
-387
include/common/tmsg.h
include/common/tmsg.h
+32
-31
include/libs/executor/executor.h
include/libs/executor/executor.h
+1
-1
include/libs/wal/wal.h
include/libs/wal/wal.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+3
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+2
-2
source/common/src/tmsg.c
source/common/src/tmsg.c
+23
-30
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+35
-14
source/dnode/mnode/impl/test/stb/stb.cpp
source/dnode/mnode/impl/test/stb/stb.cpp
+1
-3
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+2
-3
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+14
-17
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+151
-135
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+11
-23
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+1
-5
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+13
-13
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+16
-3
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+50
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+37
-25
source/libs/qworker/src/qwDbg.c
source/libs/qworker/src/qwDbg.c
+1
-1
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+39
-37
source/libs/scheduler/src/schJob.c
source/libs/scheduler/src/schJob.c
+1
-1
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+1
-1
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+32
-10
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+31
-28
source/util/src/terror.c
source/util/src/terror.c
+2
-0
tests/test/c/sdbDump.c
tests/test/c/sdbDump.c
+7
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
53a13db8
...
...
@@ -169,6 +169,9 @@ typedef enum _mgmt_table {
#define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
#define TD_REQ_FROM_APP 0
#define TD_REQ_FROM_TAOX 1
typedef
struct
{
int32_t
vgId
;
char
*
dbFName
;
...
...
@@ -432,30 +435,30 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
STSchema
*
tdGetSTSChemaFromSSChema
(
SSchema
**
pSchema
,
int32_t
nCols
);
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igExists
;
int
64_t
delay1
;
int
64_t
delay2
;
int64_t
watermark1
;
int64_t
watermark2
;
int
32_t
ttl
;
int
32_t
numOfColumns
;
int
32_t
numOfTags
;
int32_t
numOfFuncs
;
int32_t
commentLen
;
int32_t
ast1Len
;
int32_t
ast2Len
;
SArray
*
pColumns
;
// array of SField
int32_t
cVersion
;
SArray
*
pTags
;
// array of SField
int32_t
tVersio
n
;
SArray
*
pFuncs
;
char
*
pComment
;
char
*
pAst1
;
char
*
pAst2
;
tb_uid_t
suid
;
int8_t
source
;
// 1-taosX or 0-taosClient
int8_t
reserved
[
8
]
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igExists
;
int
8_t
source
;
// 1-taosX or 0-taosClient
int
8_t
reserved
[
6
]
;
tb_uid_t
suid
;
int64_t
delay1
;
int
64_t
delay2
;
int
64_t
watermark1
;
int
64_t
watermark2
;
int32_t
ttl
;
int32_t
colVer
;
int32_t
tagVer
;
int32_t
numOfColumns
;
int32_t
numOfTags
;
int32_t
numOfFuncs
;
int32_t
commentLen
;
int32_t
ast1Le
n
;
int32_t
ast2Len
;
SArray
*
pColumns
;
// array of SField
SArray
*
pTags
;
// array of SField
SArray
*
pFuncs
;
char
*
pComment
;
char
*
pAst1
;
char
*
pAst2
;
}
SMCreateStbReq
;
int32_t
tSerializeSMCreateStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateStbReq
*
pReq
);
...
...
@@ -463,11 +466,11 @@ int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pR
void
tFreeSMCreateStbReq
(
SMCreateStbReq
*
pReq
);
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igNotExists
;
tb_uid_t
suid
;
int8_t
source
;
// 1-taosX or 0-taosClient
int8_t
reserved
[
8
]
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
igNotExists
;
int8_t
source
;
// 1-taosX or 0-taosClient
int8_t
reserved
[
6
];
tb_uid_t
suid
;
}
SMDropStbReq
;
int32_t
tSerializeSMDropStbReq
(
void
*
buf
,
int32_t
bufLen
,
SMDropStbReq
*
pReq
);
...
...
@@ -476,8 +479,6 @@ int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
int8_t
alterType
;
int32_t
tagVer
;
int32_t
colVer
;
int32_t
numOfFields
;
SArray
*
pFields
;
int32_t
ttl
;
...
...
include/libs/executor/executor.h
浏览文件 @
53a13db8
...
...
@@ -176,7 +176,7 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t
qStreamPrepareTsdbScan
(
qTaskInfo_t
tinfo
,
uint64_t
uid
,
int64_t
ts
);
int32_t
qStreamPrepareScan
1
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
);
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
);
int32_t
qStreamExtractOffset
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
);
...
...
include/libs/wal/wal.h
浏览文件 @
53a13db8
...
...
@@ -133,6 +133,7 @@ typedef struct {
int64_t
curFileFirstVer
;
int64_t
curVersion
;
int64_t
capacity
;
int8_t
curInvalid
;
TdThreadMutex
mutex
;
SWalFilterCond
cond
;
SWalCkHead
*
pHead
;
...
...
include/util/taoserror.h
浏览文件 @
53a13db8
...
...
@@ -247,9 +247,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
#define TSDB_CODE_MND_INVALID_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x03B1)
#define TSDB_CODE_MND_STABLE_UID_NOT_MATCH TAOS_DEF_ERROR_CODE(0, 0x03B2)
// mnode-infoSchema
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME
TAOS_DEF_ERROR_CODE(0, 0x03BA)
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x03BA)
// mnode-func
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C0)
...
...
source/client/src/clientImpl.c
浏览文件 @
53a13db8
...
...
@@ -325,11 +325,13 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
if
(
pInfo
->
pQnodeList
)
{
taosArrayDestroy
(
pInfo
->
pQnodeList
);
pInfo
->
pQnodeList
=
NULL
;
tscDebug
(
"QnodeList cleared in cluster 0x%"
PRIx64
,
pInfo
->
clusterId
);
}
if
(
pNodeList
)
{
pInfo
->
pQnodeList
=
taosArrayDup
(
pNodeList
);
taosArraySort
(
pInfo
->
pQnodeList
,
compareQueryNodeLoad
);
tscDebug
(
"QnodeList updated in cluster 0x%"
PRIx64
", num:%d"
,
pInfo
->
clusterId
,
taosArrayGetSize
(
pInfo
->
pQnodeList
));
}
taosThreadMutexUnlock
(
&
pInfo
->
qnodeMutex
);
...
...
source/client/src/tmq.c
浏览文件 @
53a13db8
...
...
@@ -2305,8 +2305,8 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
strcpy
(
field
.
name
,
pSchema
->
name
);
taosArrayPush
(
pReq
.
pTags
,
&
field
);
}
pReq
.
c
Version
=
req
.
schemaRow
.
version
;
pReq
.
t
Version
=
req
.
schemaTag
.
version
;
pReq
.
c
olVer
=
req
.
schemaRow
.
version
;
pReq
.
t
agVer
=
req
.
schemaTag
.
version
;
pReq
.
numOfColumns
=
req
.
schemaRow
.
nCols
;
pReq
.
numOfTags
=
req
.
schemaTag
.
nCols
;
pReq
.
commentLen
=
-
1
;
...
...
source/common/src/tmsg.c
浏览文件 @
53a13db8
...
...
@@ -381,9 +381,11 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
if
(
pQnodeNum
>
0
)
{
pRsp
->
query
->
pQnodeList
=
taosArrayInit
(
pQnodeNum
,
sizeof
(
SQueryNodeLoad
));
if
(
NULL
==
pRsp
->
query
->
pQnodeList
)
return
-
1
;
SQueryNodeLoad
load
=
{
0
};
if
(
tDecodeSQueryNodeLoad
(
pDecoder
,
&
load
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
query
->
pQnodeList
,
&
load
);
for
(
int32_t
i
=
0
;
i
<
pQnodeNum
;
++
i
)
{
SQueryNodeLoad
load
=
{
0
};
if
(
tDecodeSQueryNodeLoad
(
pDecoder
,
&
load
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
query
->
pQnodeList
,
&
load
);
}
}
}
...
...
@@ -496,11 +498,18 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
source
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
if
(
tEncodeI8
(
&
encoder
,
pReq
->
reserved
[
i
])
<
0
)
return
-
1
;
}
if
(
tEncodeI64
(
&
encoder
,
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
delay1
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
delay2
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
watermark1
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
watermark2
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ttl
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
colVer
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
tagVer
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfFuncs
)
<
0
)
return
-
1
;
...
...
@@ -516,8 +525,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
}
if
(
tEncodeI32
(
&
encoder
,
pReq
->
cVersion
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfTags
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pTags
,
i
);
if
(
tEncodeI8
(
&
encoder
,
pField
->
type
)
<
0
)
return
-
1
;
...
...
@@ -526,8 +533,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
tEncodeCStr
(
&
encoder
,
pField
->
name
)
<
0
)
return
-
1
;
}
if
(
tEncodeI32
(
&
encoder
,
pReq
->
tVersion
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFuncs
;
++
i
)
{
const
char
*
pFunc
=
taosArrayGet
(
pReq
->
pFuncs
,
i
);
if
(
tEncodeCStr
(
&
encoder
,
pFunc
)
<
0
)
return
-
1
;
...
...
@@ -542,11 +547,6 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if
(
pReq
->
ast2Len
>
0
)
{
if
(
tEncodeBinary
(
&
encoder
,
pReq
->
pAst2
,
pReq
->
ast2Len
)
<
0
)
return
-
1
;
}
if
(
tEncodeI64
(
&
encoder
,
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
source
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
if
(
tEncodeI8
(
&
encoder
,
pReq
->
reserved
[
i
])
<
0
)
return
-
1
;
}
tEndEncode
(
&
encoder
);
...
...
@@ -562,11 +562,18 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
source
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
reserved
[
i
])
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
delay1
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
delay2
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
watermark1
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
watermark2
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ttl
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
colVer
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
tagVer
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfColumns
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfTags
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfFuncs
)
<
0
)
return
-
1
;
...
...
@@ -594,8 +601,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
}
}
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
cVersion
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfTags
;
++
i
)
{
SField
field
=
{
0
};
if
(
tDecodeI8
(
&
decoder
,
&
field
.
type
)
<
0
)
return
-
1
;
...
...
@@ -608,8 +613,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
}
}
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
tVersion
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFuncs
;
++
i
)
{
char
pFunc
[
TSDB_FUNC_NAME_LEN
]
=
{
0
};
if
(
tDecodeCStrTo
(
&
decoder
,
pFunc
)
<
0
)
return
-
1
;
...
...
@@ -637,12 +640,6 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
pAst2
)
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
source
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
reserved
[
i
])
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
...
...
@@ -664,11 +661,11 @@ int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igNotExists
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
source
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
if
(
tEncodeI8
(
&
encoder
,
pReq
->
reserved
[
i
])
<
0
)
return
-
1
;
}
if
(
tEncodeI64
(
&
encoder
,
pReq
->
suid
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
...
...
@@ -683,11 +680,11 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
igNotExists
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
suid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
source
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
sizeof
(
pReq
->
reserved
)
/
sizeof
(
int8_t
);
++
i
)
{
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
reserved
[
i
])
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
suid
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
...
...
@@ -702,8 +699,6 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
alterType
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
tagVer
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
colVer
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfFields
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfFields
;
++
i
)
{
SField
*
pField
=
taosArrayGet
(
pReq
->
pFields
,
i
);
...
...
@@ -730,8 +725,6 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
name
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
alterType
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
tagVer
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
colVer
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfFields
)
<
0
)
return
-
1
;
pReq
->
pFields
=
taosArrayInit
(
pReq
->
numOfFields
,
sizeof
(
SField
));
if
(
pReq
->
pFields
==
NULL
)
{
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
53a13db8
...
...
@@ -707,8 +707,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst
->
updateTime
=
pDst
->
createdTime
;
pDst
->
uid
=
(
pCreate
->
source
==
1
)
?
pCreate
->
suid
:
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
pDst
->
dbUid
=
pDb
->
uid
;
pDst
->
tagVer
=
(
pCreate
->
source
==
1
)
?
pCreate
->
tVersion
:
1
;
pDst
->
colVer
=
(
pCreate
->
source
==
1
)
?
pCreate
->
cVersion
:
1
;
pDst
->
tagVer
=
(
pCreate
->
source
!=
TD_REQ_FROM_APP
)
?
pCreate
->
tagVer
:
1
;
pDst
->
colVer
=
(
pCreate
->
source
!=
TD_REQ_FROM_APP
)
?
pCreate
->
colVer
:
1
;
pDst
->
smaVer
=
1
;
pDst
->
nextColId
=
1
;
pDst
->
maxdelay
[
0
]
=
pCreate
->
delay1
;
...
...
@@ -869,9 +869,38 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
pStb
=
mndAcquireStb
(
pMnode
,
createReq
.
name
);
if
(
pStb
!=
NULL
)
{
if
(
createReq
.
igExists
)
{
mDebug
(
"stb:%s, already exist, ignore exist is set"
,
createReq
.
name
);
code
=
0
;
goto
_OVER
;
if
(
createReq
.
source
==
TD_REQ_FROM_APP
)
{
mDebug
(
"stb:%s, already exist, ignore exist is set"
,
createReq
.
name
);
code
=
0
;
goto
_OVER
;
}
else
if
(
pStb
->
uid
!=
createReq
.
suid
)
{
mError
(
"stb:%s, already exist while create, input suid:%"
PRId64
" not match with exist suid:%"
PRId64
,
createReq
.
name
,
createReq
.
suid
,
pStb
->
uid
);
terrno
=
TSDB_CODE_MND_STABLE_UID_NOT_MATCH
;
goto
_OVER
;
}
else
if
(
createReq
.
tagVer
>
0
||
createReq
.
colVer
>
0
)
{
int32_t
tagDelta
=
pStb
->
tagVer
-
createReq
.
tagVer
;
int32_t
colDelta
=
pStb
->
colVer
-
createReq
.
colVer
;
int32_t
verDelta
=
tagDelta
+
verDelta
;
mInfo
(
"stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d"
,
createReq
.
name
,
createReq
.
tagVer
,
createReq
.
colVer
,
pStb
->
tagVer
,
pStb
->
colVer
);
if
(
tagDelta
<=
0
&&
colDelta
<=
0
)
{
mInfo
(
"stb:%s, schema version is not incremented and nothing needs to be done"
,
createReq
.
name
);
code
=
0
;
goto
_OVER
;
}
else
if
((
tagDelta
==
1
||
colDelta
==
1
)
&&
(
verDelta
==
1
))
{
mInfo
(
"stb:%s, schema version is only increased by 1 number, do alter operation"
,
createReq
.
name
);
}
else
{
mError
(
"stb:%s, schema version increase more than 1 number, error is returned"
,
createReq
.
name
);
terrno
=
TSDB_CODE_MND_INVALID_SCHEMA_VER
;
goto
_OVER
;
}
}
else
{
mError
(
"stb:%s, already exist while create, input tagVer:%d colVer:%d is invalid"
,
createReq
.
name
,
createReq
.
tagVer
,
createReq
.
colVer
,
pStb
->
tagVer
,
pStb
->
colVer
);
terrno
=
TSDB_CODE_MND_INVALID_SCHEMA_VER
;
goto
_OVER
;
}
}
else
{
terrno
=
TSDB_CODE_MND_STB_ALREADY_EXIST
;
goto
_OVER
;
...
...
@@ -1614,14 +1643,6 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
goto
_OVER
;
}
if
((
alterReq
.
tagVer
>
0
&&
alterReq
.
colVer
>
0
)
&&
(
alterReq
.
tagVer
<=
pStb
->
tagVer
||
alterReq
.
colVer
<=
pStb
->
colVer
))
{
mDebug
(
"stb:%s, already exist, tagVer:%d colVer:%d smaller than in mnode, tagVer:%d colVer:%d, alter success"
,
alterReq
.
name
,
alterReq
.
tagVer
,
alterReq
.
colVer
,
pStb
->
tagVer
,
pStb
->
colVer
);
code
=
0
;
goto
_OVER
;
}
if
(
mndCheckDbPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_WRITE_DB
,
pDb
)
!=
0
)
{
goto
_OVER
;
}
...
...
@@ -1752,7 +1773,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
}
}
if
(
dropReq
.
source
==
1
&&
pStb
->
uid
!=
dropReq
.
suid
)
{
if
(
dropReq
.
source
!=
TD_REQ_FROM_APP
&&
pStb
->
uid
!=
dropReq
.
suid
)
{
terrno
=
TSDB_CODE_MND_STB_NOT_EXIST
;
goto
_OVER
;
}
...
...
source/dnode/mnode/impl/test/stb/stb.cpp
浏览文件 @
53a13db8
...
...
@@ -277,8 +277,6 @@ void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const c
req
.
numOfFields
=
1
;
req
.
pFields
=
taosArrayInit
(
1
,
sizeof
(
SField
));
req
.
alterType
=
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
;
req
.
tagVer
=
verInBlock
;
req
.
colVer
=
verInBlock
;
SField
field
=
{
0
};
field
.
bytes
=
bytes
;
...
...
@@ -818,7 +816,7 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) {
{
void
*
pReq
=
BuildAlterStbUpdateColumnBytesReq
(
stbname
,
"col_not_exist"
,
20
,
&
contLen
,
1
);
SRpcMsg
*
pRsp
=
test
.
SendReq
(
TDMT_MND_ALTER_STB
,
pReq
,
contLen
);
ASSERT_EQ
(
pRsp
->
code
,
0
);
ASSERT_EQ
(
pRsp
->
code
,
TSDB_CODE_MND_COLUMN_NOT_EXIST
);
test
.
SendShowReq
(
TSDB_MGMT_TABLE_STB
,
"user_stables"
,
dbname
);
EXPECT_EQ
(
test
.
GetShowRows
(),
1
);
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
53a13db8
...
...
@@ -49,6 +49,7 @@ struct SSmaEnv {
typedef
struct
{
int8_t
inited
;
int32_t
rsetId
;
void
*
tmrHandle
;
// shared by all fetch tasks
}
SSmaMgmt
;
#define SMA_ENV_LOCK(env) ((env)->lock)
...
...
@@ -65,7 +66,6 @@ struct SRSmaStat {
SSma
*
pSma
;
int64_t
submitVer
;
int64_t
refId
;
// shared by fetch tasks
void
*
tmrHandle
;
// shared by fetch tasks
int8_t
triggerStat
;
// shared by fetch tasks
int8_t
runningStat
;
// for persistence task
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
...
...
@@ -82,7 +82,6 @@ struct SSmaStat {
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
#define RSMA_TMR_HANDLE(r) ((r)->tmrHandle)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_RUNNING_STAT(r) (&(r)->runningStat)
#define RSMA_REF_ID(r) ((r)->refId)
...
...
@@ -189,7 +188,7 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
static
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeRSmaInfo
(
SRSmaInfo
*
pInfo
);
void
*
tdFreeRSmaInfo
(
S
Sma
*
pSma
,
S
RSmaInfo
*
pInfo
);
int32_t
tdRSmaPersistExecImpl
(
SRSmaStat
*
pRSmaStat
);
int32_t
tdProcessRSmaCreateImpl
(
SSma
*
pSma
,
SRSmaParam
*
param
,
int64_t
suid
,
const
char
*
tbName
);
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
53a13db8
...
...
@@ -49,16 +49,26 @@ int32_t smaInit() {
}
if
(
old
==
0
)
{
// init tref rset
smaMgmt
.
rsetId
=
taosOpenRef
(
SMA_MGMT_REF_NUM
,
tdDestroyRSmaStat
);
if
(
smaMgmt
.
rsetId
<
0
)
{
atomic_store_8
(
&
smaMgmt
.
inited
,
0
);
smaError
(
"failed to init sma rset since %s"
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
// init fetch timer handle
smaMgmt
.
tmrHandle
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA"
);
if
(
!
smaMgmt
.
tmrHandle
)
{
taosCloseRef
(
smaMgmt
.
rsetId
);
atomic_store_8
(
&
smaMgmt
.
inited
,
0
);
smaError
(
"failed to init sma tmr hanle since %s"
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
smaInfo
(
"sma rset is initialized, rsetId:%d"
,
smaMgmt
.
rsetId
);
atomic_store_8
(
&
smaMgmt
.
inited
,
1
);
smaInfo
(
"sma mgmt env is initialized, rsetId:%d, tmrHandle:%p"
,
smaMgmt
.
rsetId
,
smaMgmt
.
tmrHandle
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -81,8 +91,9 @@ void smaCleanUp() {
}
if
(
old
==
1
)
{
smaInfo
(
"sma rset is cleaned up, resetId:%d"
,
smaMgmt
.
rsetId
);
taosCloseRef
(
smaMgmt
.
rsetId
);
taosTmrCleanUp
(
smaMgmt
.
tmrHandle
);
smaInfo
(
"sma mgmt env is cleaned up, rsetId:%d, tmrHandle:%p"
,
smaMgmt
.
rsetId
,
smaMgmt
.
tmrHandle
);
atomic_store_8
(
&
smaMgmt
.
inited
,
0
);
}
}
...
...
@@ -203,20 +214,11 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
}
pRSmaStat
->
refId
=
refId
;
// init timer
RSMA_TMR_HANDLE
(
pRSmaStat
)
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA"
);
if
(
!
RSMA_TMR_HANDLE
(
pRSmaStat
))
{
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
}
// init hash
RSMA_INFO_HASH
(
pRSmaStat
)
=
taosHashInit
(
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
RSMA_INFO_HASH
(
pRSmaStat
))
{
if
(
RSMA_TMR_HANDLE
(
pRSmaStat
))
{
taosTmrCleanUp
(
RSMA_TMR_HANDLE
(
pRSmaStat
));
}
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -277,7 +279,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pStat
),
NULL
);
while
(
infoHash
)
{
SRSmaInfo
*
pSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
tdFreeRSmaInfo
(
pSmaInfo
);
tdFreeRSmaInfo
(
pSma
,
pSma
Info
);
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pStat
),
infoHash
);
}
}
...
...
@@ -298,11 +300,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
nLoops
=
0
;
}
}
// step 6: cleanup the timer handle
if
(
RSMA_TMR_HANDLE
(
pStat
))
{
taosTmrCleanUp
(
RSMA_TMR_HANDLE
(
pStat
));
}
}
}
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
53a13db8
...
...
@@ -27,16 +27,19 @@ SSmaMgmt smaMgmt = {
typedef
struct
SRSmaQTaskInfoItem
SRSmaQTaskInfoItem
;
typedef
struct
SRSmaQTaskInfoIter
SRSmaQTaskInfoIter
;
static
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSmaInfo
*
pRSmaInfo
,
SReadHandle
*
handle
,
int8_t
idx
);
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
rsmaItem
,
tb_uid_t
suid
,
int8_t
level
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
*
tdRSmaPersistExec
(
void
*
param
);
static
void
tdRSmaQTaskInfoGetFName
(
int32_t
vid
,
int64_t
version
,
char
*
outputName
);
static
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSmaStat
*
pStat
,
SRSmaInfo
*
pRSmaInfo
,
SReadHandle
*
handle
,
int8_t
idx
);
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
rsmaItem
,
STSchema
*
pTSchema
,
tb_uid_t
suid
,
int8_t
level
);
static
SRSmaInfo
*
tdGetRSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
);
static
int32_t
tdRSmaFetchAndSubmitResult
(
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
,
SRSmaStat
*
pStat
,
int8_t
blkType
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
*
tdRSmaPersistExec
(
void
*
param
);
static
void
tdRSmaQTaskInfoGetFName
(
int32_t
vid
,
int64_t
version
,
char
*
outputName
);
static
int32_t
tdRSmaQTaskInfoIterInit
(
SRSmaQTaskInfoIter
*
pIter
,
STFile
*
pTFile
);
static
int32_t
tdRSmaQTaskInfoIterNextBlock
(
SRSmaQTaskInfoIter
*
pIter
,
bool
*
isFinish
);
...
...
@@ -48,25 +51,26 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed);
static
int32_t
tdRSmaRestoreTSDataReload
(
SSma
*
pSma
,
int64_t
committed
);
struct
SRSmaInfoItem
{
SRSmaInfo
*
pRsmaInfo
;
int64_t
refId
;
void
*
taskInfo
;
// qTaskInfo_t
tmr_h
tmrId
;
int8_t
level
;
int8_t
tmrInitFlag
;
int8_t
triggerStat
;
int32_t
maxDelay
;
void
*
taskInfo
;
// qTaskInfo_t
int64_t
refId
;
tmr_h
tmrId
;
int32_t
maxDelay
;
int8_t
level
;
int8_t
triggerStat
;
};
struct
SRSmaInfo
{
STSchema
*
pTSchema
;
SRSmaStat
*
pStat
;
int64_t
suid
;
SRSmaInfoItem
items
[
TSDB_RETENTION_L2
];
};
#define RSMA_INFO_SMA(r) ((r)->pStat->pSma)
#define RSMA_INFO_STAT(r) ((r)->pStat)
static
SRSmaInfo
*
tdGetRSmaInfoByItem
(
SRSmaInfoItem
*
pItem
)
{
// adapt accordingly if definition of SRSmaInfo update
int32_t
rsmaInfoHeadLen
=
sizeof
(
int64_t
)
+
sizeof
(
STSchema
*
);
ASSERT
(
pItem
->
level
==
1
||
pItem
->
level
==
2
);
return
(
SRSmaInfo
*
)
POINTER_SHIFT
(
pItem
,
-
sizeof
(
SRSmaInfoItem
)
*
(
pItem
->
level
-
1
)
-
rsmaInfoHeadLen
);
}
struct
SRSmaQTaskInfoItem
{
int32_t
len
;
...
...
@@ -108,9 +112,8 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId,
}
}
void
*
tdFreeRSmaInfo
(
SRSmaInfo
*
pInfo
)
{
void
*
tdFreeRSmaInfo
(
S
Sma
*
pSma
,
S
RSmaInfo
*
pInfo
)
{
if
(
pInfo
)
{
SSma
*
pSma
=
RSMA_INFO_SMA
(
pInfo
);
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
SRSmaInfoItem
*
pItem
=
&
pInfo
->
items
[
i
];
if
(
pItem
->
taskInfo
)
{
...
...
@@ -143,8 +146,6 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
}
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
if
(
!
suid
||
!
tbUids
)
{
...
...
@@ -153,8 +154,9 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return
TSDB_CODE_FAILED
;
}
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
pRSmaInfo
=
tdGetRSmaInfoBySuid
(
pSma
,
*
suid
);
if
(
!
pRSmaInfo
)
{
smaError
(
"vgId:%d, failed to get rsma info for uid:%"
PRIi64
,
SMA_VID
(
pSma
),
*
suid
);
terrno
=
TSDB_CODE_RSMA_INVALID_STAT
;
return
TSDB_CODE_FAILED
;
...
...
@@ -252,15 +254,14 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSma
Info
*
pRSmaInfo
,
SReadHandle
*
pReadHandle
,
int8_t
idx
)
{
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSma
Stat
*
pStat
,
SRSmaInfo
*
pRSmaInfo
,
SReadHandle
*
pReadHandle
,
int8_t
idx
)
{
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
STsdbCfg
*
pTsdbCfg
=
SMA_TSDB_CFG
(
pSma
);
if
(
param
->
qmsg
[
idx
])
{
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
pItem
->
refId
=
RSMA_REF_ID
(
pRSmaInfo
->
pStat
);
pItem
->
pRsmaInfo
=
pRSmaInfo
;
pItem
->
refId
=
RSMA_REF_ID
(
pStat
);
pItem
->
taskInfo
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
idx
],
pReadHandle
);
if
(
!
pItem
->
taskInfo
)
{
terrno
=
TSDB_CODE_RSMA_QTASKINFO_CREATE
;
...
...
@@ -348,14 +349,13 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto
_err
;
}
pRSmaInfo
->
pTSchema
=
pTSchema
;
pRSmaInfo
->
pStat
=
pStat
;
pRSmaInfo
->
suid
=
suid
;
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
0
)
<
0
)
{
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
p
Stat
,
p
RSmaInfo
,
&
handle
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
1
)
<
0
)
{
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
p
Stat
,
p
RSmaInfo
,
&
handle
,
1
)
<
0
)
{
goto
_err
;
}
...
...
@@ -367,7 +367,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
return
TSDB_CODE_SUCCESS
;
_err:
tdFreeRSmaInfo
(
pRSmaInfo
);
tdFreeRSmaInfo
(
p
Sma
,
p
RSmaInfo
);
taosMemoryFree
(
pReader
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -538,10 +538,10 @@ int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
return
atomic_load_64
(
&
pRSmaStat
->
submitVer
);
}
static
int32_t
td
FetchAndSubmitRSmaResult
(
SRSmaInfoItem
*
pItem
,
int8_t
blkType
)
{
SArray
*
pResult
=
NULL
;
S
RSmaInfo
*
pRSmaInfo
=
pItem
->
pRsmaInfo
;
SSma
*
pSma
=
RSMA_INFO_SMA
(
pRSmaInfo
)
;
static
int32_t
td
RSmaFetchAndSubmitResult
(
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
int64_t
suid
,
SRSmaStat
*
pStat
,
int8_t
blkType
)
{
S
Array
*
pResult
=
NULL
;
SSma
*
pSma
=
pStat
->
pSma
;
while
(
1
)
{
SSDataBlock
*
output
=
NULL
;
...
...
@@ -573,16 +573,16 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
STsdb
*
sinkTsdb
=
(
pItem
->
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb1
:
pSma
->
pRSmaTsdb2
);
SSubmitReq
*
pReq
=
NULL
;
// TODO: the schema update should be handled
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
p
RSmaInfo
->
pTSchema
,
SMA_VID
(
pSma
),
pRSmaInfo
->
suid
)
<
0
)
{
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
p
TSchema
,
SMA_VID
(
pSma
),
suid
)
<
0
)
{
smaError
(
"vgId:%d, build submit req for rsma table %"
PRIi64
"l evel %"
PRIi8
" failed since %s"
,
SMA_VID
(
pSma
),
pRSmaInfo
->
suid
,
pItem
->
level
,
terrstr
());
suid
,
pItem
->
level
,
terrstr
());
goto
_err
;
}
if
(
pReq
&&
tdProcessSubmitReq
(
sinkTsdb
,
atomic_add_fetch_64
(
&
p
RSmaInfo
->
p
Stat
->
submitVer
,
1
),
pReq
)
<
0
)
{
if
(
pReq
&&
tdProcessSubmitReq
(
sinkTsdb
,
atomic_add_fetch_64
(
&
pStat
->
submitVer
,
1
),
pReq
)
<
0
)
{
taosMemoryFreeClear
(
pReq
);
smaError
(
"vgId:%d, process submit req for rsma table %"
PRIi64
" level %"
PRIi8
" failed since %s"
,
SMA_VID
(
pSma
),
pRSmaInfo
->
suid
,
pItem
->
level
,
terrstr
());
suid
,
pItem
->
level
,
terrstr
());
goto
_err
;
}
...
...
@@ -600,84 +600,16 @@ _err:
return
TSDB_CODE_FAILED
;
}
/**
* @brief trigger to get rsma result
*
* @param param
* @param tmrId
*/
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaInfoItem
*
pItem
=
param
;
SSma
*
pSma
=
NULL
;
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
tdAcquireSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
if
(
!
pStat
)
{
smaDebug
(
"rsma fetch task not start since already destroyed, rsetId rsetId:%"
PRIi64
" refId:%d)"
,
smaMgmt
.
rsetId
,
pItem
->
refId
);
return
;
}
pSma
=
RSMA_INFO_SMA
(
pItem
->
pRsmaInfo
);
// if rsma trigger stat in paused, cancelled or finished, not start fetch task
int8_t
rsmaTriggerStat
=
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
));
switch
(
rsmaTriggerStat
)
{
case
TASK_TRIGGER_STAT_PAUSED
:
case
TASK_TRIGGER_STAT_CANCELLED
:
case
TASK_TRIGGER_STAT_FINISHED
:
{
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is %"
PRIi8
", rsetId rsetId:%"
PRIi64
" refId:%d"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
,
rsmaTriggerStat
,
smaMgmt
.
rsetId
,
pItem
->
refId
);
return
;
}
default:
break
;
}
int8_t
fetchTriggerStat
=
atomic_val_compare_exchange_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
switch
(
fetchTriggerStat
)
{
case
TASK_TRIGGER_STAT_ACTIVE
:
{
smaDebug
(
"vgId:%d, fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is active"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
tdRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
qSetStreamInput
(
pItem
->
taskInfo
,
&
dataBlock
,
STREAM_INPUT__DATA_BLOCK
,
false
);
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_INPUT__DATA_BLOCK
);
tdUnRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
}
break
;
case
TASK_TRIGGER_STAT_PAUSED
:
{
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is paused"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is inactive"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is init"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
}
break
;
default:
{
smaWarn
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is unknown"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
}
break
;
}
_end:
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
}
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
pItem
,
tb_uid_t
suid
,
int8_t
level
)
{
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
pItem
,
STSchema
*
pTSchema
,
tb_uid_t
suid
,
int8_t
level
)
{
if
(
!
pItem
||
!
pItem
->
taskInfo
)
{
smaDebug
(
"vgId:%d, no qTaskInfo to execute rsma %"
PRIi8
" task for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
suid
);
return
TSDB_CODE_SUCCESS
;
}
if
(
!
pTSchema
)
{
smaWarn
(
"vgId:%d, no schema to execute rsma %"
PRIi8
" task for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
suid
);
return
TSDB_CODE_FAILED
;
}
smaDebug
(
"vgId:%d, execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
pItem
->
taskInfo
,
suid
);
...
...
@@ -687,14 +619,14 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
return
TSDB_CODE_FAILED
;
}
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_INPUT__DATA_SUBMIT
);
atomic_store_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
);
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
SMA_RSMA_STAT
(
pEnv
->
pStat
);
if
(
pStat
->
tmrHandle
)
{
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
,
pItem
,
pStat
->
tmrHandle
,
&
pItem
->
tmrId
);
tdRSmaFetchAndSubmitResult
(
pItem
,
pTSchema
,
suid
,
pStat
,
STREAM_INPUT__DATA_SUBMIT
);
atomic_store_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
);
if
(
smaMgmt
.
tmrHandle
)
{
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
}
else
{
ASSERT
(
0
);
}
...
...
@@ -702,19 +634,29 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tdExecuteRSma
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
suid
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
static
SRSmaInfo
*
tdGetRSmaInfoBySuid
(
SSma
*
pSma
,
int64_t
suid
)
{
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
NULL
;
if
(
!
pEnv
)
{
// only applicable when rsma env exists
return
TSDB_CODE_SUCCESS
;
return
NULL
;
}
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
if
(
!
pStat
||
!
RSMA_INFO_HASH
(
pStat
))
{
return
NULL
;
}
SRSmaInfo
*
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
return
NULL
;
}
return
pRSmaInfo
;
}
static
int32_t
tdExecuteRSma
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
tb_uid_t
suid
)
{
SRSmaInfo
*
pRSmaInfo
=
tdGetRSmaInfoBySuid
(
pSma
,
suid
);
if
(
!
pRSmaInfo
)
{
smaDebug
(
"vgId:%d, return as no rsma info for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -725,8 +667,8 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
}
if
(
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
0
],
suid
,
TSDB_RETENTION_L1
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
1
],
suid
,
TSDB_RETENTION_L2
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
0
],
pRSmaInfo
->
pTSchema
,
suid
,
TSDB_RETENTION_L1
);
tdExecuteRSmaImpl
(
pSma
,
pMsg
,
inputType
,
&
pRSmaInfo
->
items
[
1
],
pRSmaInfo
->
pTSchema
,
suid
,
TSDB_RETENTION_L2
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -939,13 +881,11 @@ _err:
}
static
int32_t
tdRSmaQTaskInfoItemRestore
(
SSma
*
pSma
,
const
SRSmaQTaskInfoItem
*
pItem
)
{
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
((
SSmaEnv
*
)
pSma
->
pRSmaEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
void
*
qTaskInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
RSMA_INFO_HASH
(
pStat
),
&
pItem
->
suid
,
sizeof
(
pItem
->
suid
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
pRSmaInfo
=
tdGetRSmaInfoBySuid
(
pSma
,
pItem
->
suid
);
if
(
!
pRSmaInfo
)
{
smaDebug
(
"vgId:%d, no restore as no rsma info for table:%"
PRIu64
,
SMA_VID
(
pSma
),
pItem
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1350,3 +1290,79 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
}
taosReleaseRef
(
smaMgmt
.
rsetId
,
rsmaStat
->
refId
);
}
/**
* @brief trigger to get rsma result
*
* @param param
* @param tmrId
*/
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaInfoItem
*
pItem
=
param
;
SSma
*
pSma
=
NULL
;
SRSmaStat
*
pStat
=
(
SRSmaStat
*
)
tdAcquireSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
if
(
!
pStat
)
{
smaDebug
(
"rsma fetch task not start since already destroyed, rsetId rsetId:%"
PRIi64
" refId:%d)"
,
smaMgmt
.
rsetId
,
pItem
->
refId
);
return
;
}
pSma
=
pStat
->
pSma
;
// if rsma trigger stat in paused, cancelled or finished, not start fetch task
int8_t
rsmaTriggerStat
=
atomic_load_8
(
RSMA_TRIGGER_STAT
(
pStat
));
switch
(
rsmaTriggerStat
)
{
case
TASK_TRIGGER_STAT_PAUSED
:
case
TASK_TRIGGER_STAT_CANCELLED
:
case
TASK_TRIGGER_STAT_FINISHED
:
{
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data since stat is %"
PRIi8
", rsetId rsetId:%"
PRIi64
" refId:%d"
,
SMA_VID
(
pSma
),
pItem
->
level
,
rsmaTriggerStat
,
smaMgmt
.
rsetId
,
pItem
->
refId
);
return
;
}
default:
break
;
}
SRSmaInfo
*
pRSmaInfo
=
tdGetRSmaInfoByItem
(
pItem
);
ASSERT
(
pRSmaInfo
->
suid
>
0
);
int8_t
fetchTriggerStat
=
atomic_val_compare_exchange_8
(
&
pItem
->
triggerStat
,
TASK_TRIGGER_STAT_ACTIVE
,
TASK_TRIGGER_STAT_INACTIVE
);
switch
(
fetchTriggerStat
)
{
case
TASK_TRIGGER_STAT_ACTIVE
:
{
smaDebug
(
"vgId:%d, fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is active"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
tdRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
qSetStreamInput
(
pItem
->
taskInfo
,
&
dataBlock
,
STREAM_INPUT__DATA_BLOCK
,
false
);
tdRSmaFetchAndSubmitResult
(
pItem
,
pRSmaInfo
->
pTSchema
,
pRSmaInfo
->
suid
,
pStat
,
STREAM_INPUT__DATA_BLOCK
);
tdUnRefSmaStat
(
pSma
,
(
SSmaStat
*
)
pStat
);
}
break
;
case
TASK_TRIGGER_STAT_PAUSED
:
{
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is paused"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INACTIVE
:
{
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is inactive"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
case
TASK_TRIGGER_STAT_INIT
:
{
smaDebug
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is init"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
default:
{
smaWarn
(
"vgId:%d, not fetch rsma level %"
PRIi8
" data for table:%"
PRIi64
" since stat is unknown"
,
SMA_VID
(
pSma
),
pItem
->
level
,
pRSmaInfo
->
suid
);
}
break
;
}
_end:
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
}
source/dnode/vnode/src/tq/tq.c
浏览文件 @
53a13db8
...
...
@@ -271,6 +271,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqDebug
(
"tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s"
,
consumerId
,
pReq
->
epoch
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
buf
);
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
// 2.reset offset if needed
if
(
reqOffset
.
type
>
0
)
{
fetchOffsetNew
=
reqOffset
;
...
...
@@ -294,41 +297,25 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqOffsetResetToLog
(
&
fetchOffsetNew
,
walGetFirstVer
(
pTq
->
pVnode
->
pWal
));
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
tqOffsetResetToLog
(
&
fetchOffsetNew
,
walGetLastVer
(
pTq
->
pVnode
->
pWal
));
tqOffsetResetToLog
(
&
dataRsp
.
rspOffset
,
walGetLastVer
(
pTq
->
pVnode
->
pWal
));
tqDebug
(
"tmq poll: consumer %ld, subkey %s, offset reset to %ld"
,
consumerId
,
pHandle
->
subKey
,
fetchOffsetNew
.
version
);
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
dataRsp
.
rspOffset
=
fetchOffsetNew
;
code
=
0
;
dataRsp
.
rspOffset
.
version
);
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
taosArrayDestroy
(
dataRsp
.
blockDataLen
);
taosArrayDestroyP
(
dataRsp
.
blockData
,
(
FDelete
)
taosMemoryFree
);
if
(
dataRsp
.
withSchema
)
{
taosArrayDestroyP
(
dataRsp
.
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
}
if
(
dataRsp
.
withTbName
)
{
taosArrayDestroyP
(
dataRsp
.
blockTbName
,
(
FDelete
)
taosMemoryFree
);
}
return
code
;
goto
OVER
;
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_NONE
)
{
tqError
(
"tmq poll: subkey %s, no offset committed for consumer %"
PRId64
" in vg %d, subkey %s, reset none failed"
,
pHandle
->
subKey
,
consumerId
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
subKey
);
terrno
=
TSDB_CODE_TQ_NO_COMMITTED_OFFSET
;
return
-
1
;
code
=
-
1
;
goto
OVER
;
}
}
}
// 3.query
SMqDataRsp
dataRsp
=
{
0
};
tqInitDataRsp
(
&
dataRsp
,
pReq
,
pHandle
->
execHandle
.
subType
);
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
&&
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
fetchOffsetNew
.
version
++
;
if
(
tqScanLog
(
pTq
,
&
pHandle
->
execHandle
,
&
dataRsp
,
&
fetchOffsetNew
)
<
0
)
{
...
...
@@ -337,7 +324,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
goto
OVER
;
}
if
(
dataRsp
.
blockNum
==
0
)
{
// TODO add to async task
// TODO add to async task
pool
/*dataRsp.rspOffset.version--;*/
}
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
...
...
@@ -350,7 +337,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int64_t
fetchVer
=
fetchOffsetNew
.
version
+
1
;
SWalCkHead
*
pCkHead
=
taosMemoryMalloc
(
sizeof
(
SWalCkHead
)
+
2048
);
if
(
pCkHead
==
NULL
)
{
return
-
1
;
code
=
-
1
;
goto
OVER
;
}
walSetReaderCapacity
(
pHandle
->
pWalReader
,
2048
);
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
53a13db8
...
...
@@ -62,7 +62,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
int64_t
tqScanLog
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
0
];
if
(
qStreamPrepareScan
1
(
task
,
pOffset
)
<
0
)
{
if
(
qStreamPrepareScan
(
task
,
pOffset
)
<
0
)
{
pRsp
->
rspOffset
=
*
pOffset
;
pRsp
->
rspOffset
.
version
--
;
return
0
;
...
...
@@ -110,10 +110,6 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
ASSERT
(
pExec
->
subType
==
TOPIC_SUB_TYPE__COLUMN
);
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
workerId
];
/*if (qStreamScanSnapshot(task) < 0) {*/
/*ASSERT(0);*/
/*}*/
if
(
qStreamPrepareTsdbScan
(
task
,
offset
.
uid
,
offset
.
ts
)
<
0
)
{
ASSERT
(
0
);
}
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
53a13db8
...
...
@@ -22,8 +22,8 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
while
(
1
)
{
if
(
walFetchHead
(
pHandle
->
pWalReader
,
offset
,
*
ppCkHead
)
<
0
)
{
tqDebug
(
"tmq poll: consumer:%"
PRId64
", (epoch %d) vgId:%d offset %"
PRId64
", no more log to return"
,
pHandle
->
consumerId
,
pHandle
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
offset
);
tqDebug
(
"tmq poll: consumer:%"
PRId64
", (epoch %d) vgId:%d offset %"
PRId64
", no more log to return"
,
pHandle
->
consumerId
,
pHandle
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
offset
);
*
fetchOffset
=
offset
-
1
;
code
=
-
1
;
goto
END
;
...
...
@@ -104,8 +104,13 @@ void tqCloseReader(STqReader* pReader) {
}
int32_t
tqSeekVer
(
STqReader
*
pReader
,
int64_t
ver
)
{
//
return
walReadSeekVer
(
pReader
->
pWalReader
,
ver
);
if
(
walReadSeekVer
(
pReader
->
pWalReader
,
ver
)
<
0
)
{
ASSERT
(
pReader
->
pWalReader
->
curInvalid
);
ASSERT
(
pReader
->
pWalReader
->
curVersion
==
ver
);
return
-
1
;
}
ASSERT
(
pReader
->
pWalReader
->
curVersion
==
ver
);
return
0
;
}
int32_t
tqNextBlock
(
STqReader
*
pReader
,
SFetchRet
*
ret
)
{
...
...
@@ -114,9 +119,11 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while
(
1
)
{
if
(
!
fromProcessedMsg
)
{
if
(
walNextValidMsg
(
pReader
->
pWalReader
)
<
0
)
{
pReader
->
ver
=
pReader
->
pWalReader
->
curVersion
-
pReader
->
pWalReader
->
curInvalid
;
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ret
->
fetchType
=
FETCH_TYPE__NONE
;
ASSERT
(
ret
->
offset
.
version
>=
0
);
return
-
1
;
}
void
*
body
=
pReader
->
pWalReader
->
pHead
->
head
.
body
;
...
...
@@ -131,19 +138,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
}
while
(
tqNextDataBlock
(
pReader
))
{
// TODO mem free
memset
(
&
ret
->
data
,
0
,
sizeof
(
SSDataBlock
));
int32_t
code
=
tqRetrieveDataBlock
(
&
ret
->
data
,
pReader
);
if
(
code
!=
0
||
ret
->
data
.
info
.
rows
==
0
)
{
ASSERT
(
0
);
continue
;
#if 0
if (fromProcessedMsg) {
ret->fetchType = FETCH_TYPE__NONE;
return 0;
} else {
break;
}
#endif
}
ret
->
fetchType
=
FETCH_TYPE__DATA
;
return
0
;
...
...
@@ -152,7 +152,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
if
(
fromProcessedMsg
)
{
ret
->
offset
.
type
=
TMQ_OFFSET__LOG
;
ret
->
offset
.
version
=
pReader
->
ver
;
ASSERT
(
pReader
->
ver
!=
-
1
);
ASSERT
(
pReader
->
ver
>=
0
);
ret
->
fetchType
=
FETCH_TYPE__NONE
;
return
0
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
53a13db8
...
...
@@ -469,11 +469,24 @@ static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **
return
code
;
}
static
int32_t
vnodeSnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppWriter
)
{
return
0
;
}
static
int32_t
vnodeSnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppWriter
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
SSnapshotParam
*
pSnapshotParam
=
pParam
;
int32_t
code
=
vnodeSnapWriterOpen
(
pVnode
,
pSnapshotParam
->
start
,
pSnapshotParam
->
end
,
(
SVSnapWriter
**
)
ppWriter
);
return
code
;
}
static
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
static
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
int32_t
code
=
vnodeSnapWriterClose
(
pWriter
,
isApply
);
return
code
;
}
static
int32_t
vnodeSnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
static
int32_t
vnodeSnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
int32_t
code
=
vnodeSnapWrite
(
pWriter
,
pBuf
,
len
);
return
code
;
}
static
SSyncFSM
*
vnodeSyncMakeFsm
(
SVnode
*
pVnode
)
{
SSyncFSM
*
pFsm
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncFSM
));
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
53a13db8
...
...
@@ -280,7 +280,7 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
return
0
;
}
int32_t
qStreamPrepareScan
1
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
)
{
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
);
...
...
@@ -293,8 +293,55 @@ int32_t qStreamPrepareScan1(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
)
<
0
)
{
return
-
1
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
)
<
0
)
{
return
-
1
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pOffset
->
version
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
pInfo
->
blockType
=
STREAM_INPUT__TABLE_SCAN
;
int64_t
uid
=
pOffset
->
uid
;
int64_t
ts
=
pOffset
->
ts
;
if
(
uid
==
0
)
{
if
(
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
}
}
if
(
pTaskInfo
->
streamInfo
.
lastStatus
.
type
!=
TMQ_OFFSET__SNAPSHOT_DATA
||
pTaskInfo
->
streamInfo
.
lastStatus
.
uid
!=
uid
||
pTaskInfo
->
streamInfo
.
lastStatus
.
ts
!=
ts
)
{
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
int32_t
tableSz
=
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
);
bool
found
=
false
;
for
(
int32_t
i
=
0
;
i
<
tableSz
;
i
++
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
i
);
if
(
pTableInfo
->
uid
==
uid
)
{
found
=
true
;
pTableScanInfo
->
currentTable
=
i
;
}
}
// TODO after dropping table, table may be not found
ASSERT
(
found
);
tsdbSetTableId
(
pTableScanInfo
->
dataReader
,
uid
);
int64_t
oldSkey
=
pTableScanInfo
->
cond
.
twindows
[
0
].
skey
;
pTableScanInfo
->
cond
.
twindows
[
0
].
skey
=
ts
+
1
;
tsdbReaderReset
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
,
0
);
pTableScanInfo
->
cond
.
twindows
[
0
].
skey
=
oldSkey
;
pTableScanInfo
->
scanTimes
=
0
;
pTableScanInfo
->
curTWinIdx
=
0
;
qDebug
(
"tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d"
,
uid
,
ts
,
pTableScanInfo
->
currentTable
,
tableSz
);
}
else
{
// switch to log
}
}
else
{
ASSERT
(
0
);
}
return
0
;
}
else
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
53a13db8
...
...
@@ -40,8 +40,8 @@ static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta*
const
char
*
dbName
);
static
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
static
bool
processBlockWithProbability
(
const
SSampleExecInfo
*
pInfo
);
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
static
bool
processBlockWithProbability
(
const
SSampleExecInfo
*
pInfo
);
bool
processBlockWithProbability
(
const
SSampleExecInfo
*
pInfo
)
{
#if 0
...
...
@@ -265,7 +265,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -303,7 +304,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
}
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
)
{
SSDataBlock
*
pBlock
,
const
char
*
idStr
)
{
// currently only the tbname pseudo column
if
(
numOfPseudoExpr
==
0
)
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -313,7 +314,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
pBlock
->
info
.
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
pBlock
->
info
.
uid
,
tstrerror
(
terrno
),
idStr
);
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
pBlock
->
info
.
uid
,
tstrerror
(
terrno
),
idStr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
...
...
@@ -697,7 +698,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
metaReaderInit
(
&
mr
,
pMeta
,
0
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
uid
,
tstrerror
(
terrno
),
idstr
);
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
uid
,
tstrerror
(
terrno
),
idstr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
...
...
@@ -711,7 +712,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
uint64_t
suid
=
mr
.
me
.
ctbEntry
.
suid
;
code
=
metaGetTableEntryByUid
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
idstr
);
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
idstr
);
metaReaderClear
(
&
mr
);
return
terrno
;
}
...
...
@@ -738,12 +739,13 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
}
SBlockDistInfo
*
pBlockScanInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableBlockDistInfo
blockDistInfo
=
{.
minRows
=
INT_MAX
,
.
maxRows
=
INT_MIN
};
int32_t
code
=
doGetTableRowSize
(
pBlockScanInfo
->
readHandle
.
meta
,
pBlockScanInfo
->
uid
,
&
blockDistInfo
.
rowSize
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
doGetTableRowSize
(
pBlockScanInfo
->
readHandle
.
meta
,
pBlockScanInfo
->
uid
,
&
blockDistInfo
.
rowSize
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
longjmp
(
pTaskInfo
->
env
,
code
);
}
tsdbGetFileBlocksDistInfo
(
pBlockScanInfo
->
pHandle
,
&
blockDistInfo
);
...
...
@@ -938,7 +940,8 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
setGroupId
(
pInfo
,
pSDB
,
GROUPID_COLUMN_INDEX
,
*
pRowIndex
);
(
*
pRowIndex
)
+=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
NULL
,
pSDB
->
info
.
rows
,
*
pRowIndex
,
gap
,
NULL
);
}
else
{
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsCols
[
*
pRowIndex
],
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
TSDB_ORDER_ASC
);
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
tsCols
[
*
pRowIndex
],
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
TSDB_ORDER_ASC
);
setGroupId
(
pInfo
,
pSDB
,
GROUPID_COLUMN_INDEX
,
*
pRowIndex
);
(
*
pRowIndex
)
+=
getNumOfRowsInTimeWindow
(
&
pSDB
->
info
,
tsCols
,
*
pRowIndex
,
win
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
...
...
@@ -1219,7 +1222,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -1264,17 +1268,21 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo
->
streamInfo
.
metaBlk
=
ret
.
meta
;
return
NULL
;
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__NONE
)
{
if
(
ret
.
offset
.
version
==
-
1
)
{
pTaskInfo
->
streamInfo
.
lastStatus
.
type
=
TMQ_OFFSET__LOG
;
pTaskInfo
->
streamInfo
.
lastStatus
.
version
=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
-
1
;
}
else
{
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
}
/*if (ret.offset.version == -1) {*/
/*pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;*/
/*pTaskInfo->streamInfo.lastStatus.version = pTaskInfo->streamInfo.prepareStatus.version - 1;*/
/*} else {*/
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
+
1
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
/*}*/
return
NULL
;
}
else
{
ASSERT
(
0
);
}
}
}
else
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
return
pResult
&&
pResult
->
info
.
rows
>
0
?
pResult
:
NULL
;
}
size_t
total
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
...
...
@@ -1443,7 +1451,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -1481,6 +1490,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
if
(
pInfo
->
blockType
==
STREAM_INPUT__TABLE_SCAN
)
{
/*ASSERT(0);*/
// check reader last status
// if not match, reset status
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
...
...
@@ -1873,9 +1883,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
metaReaderInit
(
&
mr
,
pInfo
->
readHandle
.
meta
,
0
);
uint64_t
suid
=
pInfo
->
pCur
->
mr
.
me
.
ctbEntry
.
suid
;
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
suid
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get super table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
qError
(
"failed to get super table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
suid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
metaReaderClear
(
&
mr
);
metaCloseTbCursor
(
pInfo
->
pCur
);
pInfo
->
pCur
=
NULL
;
...
...
@@ -2275,9 +2286,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
while
(
pInfo
->
curPos
<
size
&&
count
<
pOperator
->
resultInfo
.
capacity
)
{
STableKeyInfo
*
item
=
taosArrayGet
(
pInfo
->
pTableList
->
pTableList
,
pInfo
->
curPos
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
item
->
uid
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
item
->
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
item
->
uid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
qError
(
"failed to get table meta, uid:0x%"
PRIx64
", code:%s, %s"
,
item
->
uid
,
tstrerror
(
terrno
),
GET_TASKID
(
pTaskInfo
));
metaReaderClear
(
&
mr
);
longjmp
(
pTaskInfo
->
env
,
terrno
);
}
...
...
@@ -2565,8 +2577,8 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pPseudoExpr
,
pTableScanInfo
->
numOfPseudoExpr
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pPseudoExpr
,
pTableScanInfo
->
numOfPseudoExpr
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
source/libs/qworker/src/qwDbg.c
浏览文件 @
53a13db8
...
...
@@ -9,7 +9,7 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
false
,
.
tmp
=
fals
e
};
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
false
,
.
tmp
=
tru
e
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
if
(
!
gQWDebug
.
statusEnable
)
{
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
53a13db8
...
...
@@ -35,7 +35,6 @@ extern "C" {
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_TASK_MAX_EXEC_TIMES 8
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum
{
...
...
@@ -179,10 +178,10 @@ typedef struct SSchLevel {
}
SSchLevel
;
typedef
struct
SSchTaskProfile
{
int64_t
startTs
;
int64_t
execUseTime
[
SCH_TASK_MAX_EXEC_TIMES
]
;
int64_t
waitTime
;
int64_t
endTs
;
int64_t
startTs
;
int64_t
*
execTime
;
int64_t
waitTime
;
int64_t
endTs
;
}
SSchTaskProfile
;
typedef
struct
SSchTask
{
...
...
@@ -260,33 +259,7 @@ typedef struct SSchJob {
extern
SSchedulerMgmt
schMgmt
;
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us; \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execId % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execTime[(_task)->execId % (_task)->maxExecTimes]) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
...
...
@@ -320,6 +293,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_TASK_MAX_EXEC_TIMES(_levelIdx, _levelNum) (SCH_MAX_CANDIDATE_EP_NUM * ((_levelNum) - (_levelIdx)))
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
...
...
@@ -328,16 +302,43 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_
SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0
))
#define SCH_
NEED_
REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_
NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen
)))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_
NEED_
REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_
MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))
))
#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_
TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen)
)))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
#define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps)
#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps)
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.execTime[idx] = us; \
if (0 == (_task)->execId) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.waitTime += us - (_task)->profile.execTime[idx]; \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
(_task)->profile.execTime[idx] = us - (_task)->profile.execTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
...
...
@@ -431,7 +432,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void
schDropTaskInHashList
(
SSchJob
*
pJob
,
SHashObj
*
list
);
int32_t
schLaunchLevelTasks
(
SSchJob
*
pJob
,
SSchLevel
*
level
);
int32_t
schGetTaskFromList
(
SHashObj
*
pTaskList
,
uint64_t
taskId
,
SSchTask
**
pTask
);
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
);
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
,
int32_t
levelNum
);
int32_t
schSwitchTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
#ifdef __cplusplus
...
...
source/libs/scheduler/src/schJob.c
浏览文件 @
53a13db8
...
...
@@ -337,7 +337,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE
(
pJob
,
plan
->
subplanType
);
SSchTask
task
=
{
0
};
SCH_ERR_JRET
(
schInitTask
(
pJob
,
&
task
,
plan
,
pLevel
));
SCH_ERR_JRET
(
schInitTask
(
pJob
,
&
task
,
plan
,
pLevel
,
levelNum
));
SSchTask
*
pTask
=
taosArrayPush
(
pLevel
->
subTasks
,
&
task
);
if
(
NULL
==
pTask
)
{
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
53a13db8
...
...
@@ -85,7 +85,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET
(
schValidateRspMsgType
(
pJob
,
pTask
,
msgType
));
int32_t
reqType
=
IsReq
(
pMsg
)
?
pMsg
->
msgType
:
(
pMsg
->
msgType
-
1
);
if
(
SCH_
NEED_REDIRECT
(
reqType
,
rspCode
,
pMsg
->
len
))
{
if
(
SCH_
TASK_NEED_REDIRECT
(
pTask
,
reqType
,
rspCode
,
pMsg
->
len
))
{
SCH_RET
(
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
53a13db8
...
...
@@ -46,21 +46,31 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
}
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
)
{
int32_t
schInitTask
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
SSubplan
*
pPlan
,
SSchLevel
*
pLevel
,
int32_t
levelNum
)
{
int32_t
code
=
0
;
pTask
->
plan
=
pPlan
;
pTask
->
level
=
pLevel
;
pTask
->
execId
=
-
1
;
pTask
->
maxExecTimes
=
SCH_TASK_MAX_EXEC_TIMES
;
pTask
->
maxExecTimes
=
SCH_TASK_MAX_EXEC_TIMES
(
pLevel
->
level
,
levelNum
)
;
pTask
->
timeoutUsec
=
SCH_DEFAULT_TASK_TIMEOUT_USEC
;
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_INIT
);
pTask
->
taskId
=
schGenTaskId
();
pTask
->
execNodes
=
taosHashInit
(
SCH_MAX_CANDIDATE_EP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
if
(
NULL
==
pTask
->
execNodes
)
{
SCH_TASK_ELOG
(
"taosHashInit %d execNodes failed"
,
SCH_MAX_CANDIDATE_EP_NUM
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
pTask
->
profile
.
execTime
=
taosMemoryCalloc
(
pTask
->
maxExecTimes
,
sizeof
(
int64_t
));
if
(
NULL
==
pTask
->
execNodes
||
NULL
==
pTask
->
profile
.
execTime
)
{
SCH_ERR_
J
RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_INIT
);
return
TSDB_CODE_SUCCESS
;
_return:
taosMemoryFreeClear
(
pTask
->
profile
.
execTime
);
taosHashCleanup
(
pTask
->
execNodes
);
SCH_RET
(
code
);
}
int32_t
schRecordTaskSucceedNode
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
...
...
@@ -338,6 +348,11 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
qClearSubplanExecutionNode
(
pTask
->
plan
);
// Note: current error task and upper level merge task
if
((
pData
&&
0
==
pData
->
len
)
||
NULL
==
pData
)
{
SCH_ERR_JRET
(
schSwitchTaskCandidateAddr
(
pJob
,
pTask
));
}
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_INIT
);
int32_t
childrenNum
=
taosArrayGetSize
(
pTask
->
children
);
...
...
@@ -531,10 +546,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
))
{
SCH_SWITCH_EPSET
(
&
pTask
->
plan
->
execNode
);
}
else
{
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
if
(
++
pTask
->
candidateIdx
>=
candidateNum
)
{
pTask
->
candidateIdx
=
0
;
}
SCH_ERR_RET
(
schSwitchTaskCandidateAddr
(
pJob
,
pTask
));
}
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
pTask
));
...
...
@@ -633,6 +645,16 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSe
return
TSDB_CODE_SUCCESS
;
}
int32_t
schSwitchTaskCandidateAddr
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
candidateNum
=
taosArrayGetSize
(
pTask
->
candidateAddrs
);
if
(
++
pTask
->
candidateIdx
>=
candidateNum
)
{
pTask
->
candidateIdx
=
0
;
}
SCH_TASK_DLOG
(
"switch task candiateIdx to %d"
,
pTask
->
candidateIdx
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schRemoveTaskFromExecList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
code
=
taosHashRemove
(
pJob
->
execTasks
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
));
...
...
source/libs/wal/src/walRead.c
浏览文件 @
53a13db8
...
...
@@ -30,8 +30,9 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
pRead
->
pWal
=
pWal
;
pRead
->
pIdxFile
=
NULL
;
pRead
->
pLogFile
=
NULL
;
pRead
->
curVersion
=
-
1
;
pRead
->
curVersion
=
-
5
;
pRead
->
curFileFirstVer
=
-
1
;
pRead
->
curInvalid
=
1
;
pRead
->
capacity
=
0
;
if
(
cond
)
pRead
->
cond
=
*
cond
;
...
...
@@ -152,13 +153,17 @@ static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
int32_t
walReadSeekVer
(
SWalReader
*
pRead
,
int64_t
ver
)
{
SWal
*
pWal
=
pRead
->
pWal
;
if
(
ver
==
pRead
->
curVersion
)
{
if
(
!
pRead
->
curInvalid
&&
ver
==
pRead
->
curVersion
)
{
wDebug
(
"wal version %ld match, no need to reset"
,
ver
);
return
0
;
}
pRead
->
curInvalid
=
1
;
pRead
->
curVersion
=
ver
;
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
firstVer
)
{
w
Error
(
"vgId:$d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
w
Debug
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
...
...
@@ -171,13 +176,13 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
if
(
pRead
->
curFileFirstVer
!=
pRet
->
firstVer
)
{
// error code set inner
// error code
was
set inner
if
(
walReadChangeFile
(
pRead
,
pRet
->
firstVer
)
<
0
)
{
return
-
1
;
}
}
// error code set inner
// error code
was
set inner
if
(
walReadSeekFilePos
(
pRead
,
pRet
->
firstVer
,
ver
)
<
0
)
{
return
-
1
;
}
...
...
@@ -193,9 +198,11 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity
static
int32_t
walFetchHeadNew
(
SWalReader
*
pRead
,
int64_t
fetchVer
)
{
int64_t
contLen
;
if
(
pRead
->
curVersion
!=
fetchVer
)
{
if
(
pRead
->
cur
Invalid
||
pRead
->
cur
Version
!=
fetchVer
)
{
if
(
walReadSeekVer
(
pRead
,
fetchVer
)
<
0
)
{
ASSERT
(
0
);
pRead
->
curVersion
=
fetchVer
;
pRead
->
curInvalid
=
1
;
return
-
1
;
}
}
...
...
@@ -207,7 +214,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
ASSERT
(
0
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
return
-
1
;
}
return
0
;
...
...
@@ -238,7 +245,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
pRead
->
pWal
->
cfg
.
vgId
,
pRead
->
pHead
->
head
.
version
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -246,7 +253,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
...
...
@@ -254,7 +261,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
if
(
walValidBodyCksum
(
pRead
->
pHead
)
!=
0
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
ASSERT
(
0
);
return
-
1
;
...
...
@@ -273,7 +280,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -292,7 +299,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
return
-
1
;
}
if
(
pRead
->
curVersion
!=
ver
)
{
if
(
pRead
->
cur
Invalid
||
pRead
->
cur
Version
!=
ver
)
{
code
=
walReadSeekVer
(
pRead
,
ver
);
if
(
code
<
0
)
return
-
1
;
}
...
...
@@ -307,8 +314,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
code
=
walValidHeadCksum
(
pHead
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
@@ -324,7 +330,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
return
-
1
;
}
...
...
@@ -356,14 +362,14 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
walValidBodyCksum
(
*
ppHead
)
!=
0
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
@@ -380,8 +386,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return
-
1
;
}
// TODO: check wal life
if
(
pRead
->
curVersion
!=
ver
)
{
if
(
pRead
->
curInvalid
||
pRead
->
curVersion
!=
ver
)
{
if
(
walReadSeekVer
(
pRead
,
ver
)
<
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since %s"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
terrstr
());
return
-
1
;
...
...
@@ -389,8 +394,8 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
}
if
(
ver
>
pRead
->
pWal
->
vers
.
lastVer
||
ver
<
pRead
->
pWal
->
vers
.
firstVer
)
{
wError
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
lastVer
);
wError
(
"vgId:%d, invalid index:%"
PRId64
", first index:%"
PRId64
", last index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
,
pRead
->
pWal
->
vers
.
firstVer
,
pRead
->
pWal
->
vers
.
lastVer
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
...
...
@@ -410,8 +415,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
code
=
walValidHeadCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since head checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
@@ -440,16 +444,15 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
if
(
pRead
->
pHead
->
head
.
version
!=
ver
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", read request index:%"
PRId64
,
pRead
->
pWal
->
cfg
.
vgId
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
cur
Version
=
-
1
;
pRead
->
cur
Invalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
code
=
walValidBodyCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
curVersion
=
-
1
;
wError
(
"vgId:%d, unexpected wal log index:%"
PRId64
", since body checksum not passed"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
pRead
->
curInvalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
source/util/src/terror.c
浏览文件 @
53a13db8
...
...
@@ -251,6 +251,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_COLUMN_NOT_EXIST
,
"Column does not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC
,
"Field used by topic"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SINGLE_STB_MODE_DB
,
"Database is single stable mode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_SCHEMA_VER
,
"Invalid schema version while alter stb"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_STABLE_UID_NOT_MATCH
,
"Invalid stable uid while alter stb"
)
// mnode-infoSchema
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_SYS_TABLENAME
,
"Invalid system table name"
)
...
...
tests/test/c/sdbDump.c
浏览文件 @
53a13db8
...
...
@@ -24,6 +24,7 @@
#define TMP_MNODE_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode"
#define TMP_SDB_DATA_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "data"
#define TMP_SDB_SYNC_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "sync"
#define TMP_SDB_MNODE_JSON TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "mnode.json"
#define TMP_SDB_DATA_FILE TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "data" TD_DIRSEP "sdb.data"
#define TMP_SDB_RAFT_CFG_FILE TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "sync" TD_DIRSEP "raft_config.json"
#define TMP_SDB_RAFT_STORE_FILE TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "sync" TD_DIRSEP "raft_store.json"
...
...
@@ -412,9 +413,11 @@ int32_t parseArgs(int32_t argc, char *argv[]) {
return
-
1
;
}
char
mnodeJson
[
PATH_MAX
]
=
{
0
};
char
dataFile
[
PATH_MAX
]
=
{
0
};
char
raftCfgFile
[
PATH_MAX
]
=
{
0
};
char
raftStoreFile
[
PATH_MAX
]
=
{
0
};
snprintf
(
mnodeJson
,
PATH_MAX
,
"%s"
TD_DIRSEP
"mnode"
TD_DIRSEP
"mnode.json"
,
tsDataDir
);
snprintf
(
dataFile
,
PATH_MAX
,
"%s"
TD_DIRSEP
"mnode"
TD_DIRSEP
"data"
TD_DIRSEP
"sdb.data"
,
tsDataDir
);
snprintf
(
raftCfgFile
,
PATH_MAX
,
"%s"
TD_DIRSEP
"mnode"
TD_DIRSEP
"sync"
TD_DIRSEP
"raft_config.json"
,
tsDataDir
);
snprintf
(
raftStoreFile
,
PATH_MAX
,
"%s"
TD_DIRSEP
"mnode"
TD_DIRSEP
"sync"
TD_DIRSEP
"raft_store.json"
,
tsDataDir
);
...
...
@@ -425,6 +428,8 @@ int32_t parseArgs(int32_t argc, char *argv[]) {
#ifdef WINDOWS
taosMulMkDir
(
TMP_SDB_DATA_DIR
);
taosMulMkDir
(
TMP_SDB_SYNC_DIR
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>nul"
,
mnodeJson
,
TMP_SDB_MNODE_JSON
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>nul"
,
dataFile
,
TMP_SDB_DATA_FILE
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>nul"
,
raftCfgFile
,
TMP_SDB_RAFT_CFG_FILE
);
...
...
@@ -436,6 +441,8 @@ int32_t parseArgs(int32_t argc, char *argv[]) {
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"mkdir -p %s"
,
TMP_SDB_SYNC_DIR
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>/dev/null"
,
mnodeJson
,
TMP_SDB_MNODE_JSON
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>/dev/null"
,
dataFile
,
TMP_SDB_DATA_FILE
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>/dev/null"
,
raftCfgFile
,
TMP_SDB_RAFT_CFG_FILE
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录