Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
de4571fb
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
de4571fb
编写于
12月 07, 2022
作者:
G
Ganlin Zhao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into fix/TD-19481
上级
387983f6
88c76fe0
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
393 addition
and
116 deletion
+393
-116
include/libs/sync/sync.h
include/libs/sync/sync.h
+1
-1
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+6
-0
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+3
-4
source/common/src/tglobal.c
source/common/src/tglobal.c
+1
-1
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+9
-0
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-2
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+51
-24
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+66
-12
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+43
-19
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+1
-1
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+1
-1
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+2
-0
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+1
-6
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+12
-0
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+1
-2
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+4
-0
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+4
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+59
-3
source/os/src/osTime.c
source/os/src/osTime.c
+1
-1
source/util/src/tlog.c
source/util/src/tlog.c
+1
-1
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+115
-33
tests/script/tsim/vnode/replica3_repeat.sim
tests/script/tsim/vnode/replica3_repeat.sim
+4
-3
未找到文件。
include/libs/sync/sync.h
浏览文件 @
de4571fb
...
@@ -25,7 +25,7 @@ extern "C" {
...
@@ -25,7 +25,7 @@ extern "C" {
#include "tlrucache.h"
#include "tlrucache.h"
#include "tmsgcb.h"
#include "tmsgcb.h"
#define SYNC_RESP_TTL_MS
1000
0000
#define SYNC_RESP_TTL_MS
3
0000
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_SLOW_DOWN_RANGE 100
...
...
include/libs/transport/trpc.h
浏览文件 @
de4571fb
...
@@ -72,6 +72,7 @@ typedef struct SRpcMsg {
...
@@ -72,6 +72,7 @@ typedef struct SRpcMsg {
typedef
void
(
*
RpcCfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
epset
);
typedef
void
(
*
RpcCfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
epset
);
typedef
bool
(
*
RpcRfp
)(
int32_t
code
,
tmsg_t
msgType
);
typedef
bool
(
*
RpcRfp
)(
int32_t
code
,
tmsg_t
msgType
);
typedef
bool
(
*
RpcTfp
)(
int32_t
code
,
tmsg_t
msgType
);
typedef
bool
(
*
RpcTfp
)(
int32_t
code
,
tmsg_t
msgType
);
typedef
bool
(
*
RpcFFfp
)(
tmsg_t
msgType
);
typedef
void
(
*
RpcDfp
)(
void
*
ahandle
);
typedef
void
(
*
RpcDfp
)(
void
*
ahandle
);
typedef
struct
SRpcInit
{
typedef
struct
SRpcInit
{
...
@@ -90,6 +91,9 @@ typedef struct SRpcInit {
...
@@ -90,6 +91,9 @@ typedef struct SRpcInit {
int32_t
retryMaxInterval
;
// retry max interval
int32_t
retryMaxInterval
;
// retry max interval
int64_t
retryMaxTimouet
;
int64_t
retryMaxTimouet
;
int32_t
failFastThreshold
;
int32_t
failFastInterval
;
int32_t
compressSize
;
// -1: no compress, 0 : all data compressed, size: compress data if larger than size
int32_t
compressSize
;
// -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t
encryption
;
// encrypt or not
int8_t
encryption
;
// encrypt or not
...
@@ -107,6 +111,8 @@ typedef struct SRpcInit {
...
@@ -107,6 +111,8 @@ typedef struct SRpcInit {
// destroy client ahandle;
// destroy client ahandle;
RpcDfp
dfp
;
RpcDfp
dfp
;
// fail fast fp
RpcFFfp
ffp
;
void
*
parent
;
void
*
parent
;
}
SRpcInit
;
}
SRpcInit
;
...
...
source/client/src/clientEnv.c
浏览文件 @
de4571fb
...
@@ -231,10 +231,9 @@ void destroyTscObj(void *pObj) {
...
@@ -231,10 +231,9 @@ void destroyTscObj(void *pObj) {
tscDebug
(
"connObj 0x%"
PRIx64
" p:%p destroyed, remain inst totalConn:%"
PRId64
,
pTscObj
->
id
,
pTscObj
,
tscDebug
(
"connObj 0x%"
PRIx64
" p:%p destroyed, remain inst totalConn:%"
PRId64
,
pTscObj
->
id
,
pTscObj
,
pTscObj
->
pAppInfo
->
numOfConns
);
pTscObj
->
pAppInfo
->
numOfConns
);
int64_t
connNum
=
atomic_sub_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
// In any cases, we should not free app inst here. Or an race condition rises.
if
(
0
==
connNum
)
{
/*int64_t connNum = */
atomic_sub_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
destroyAppInst
(
pTscObj
->
pAppInfo
);
}
taosThreadMutexDestroy
(
&
pTscObj
->
mutex
);
taosThreadMutexDestroy
(
&
pTscObj
->
mutex
);
taosMemoryFree
(
pTscObj
);
taosMemoryFree
(
pTscObj
);
...
...
source/common/src/tglobal.c
浏览文件 @
de4571fb
...
@@ -407,7 +407,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -407,7 +407,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads
=
tsNumOfCores
*
2
;
tsNumOfQnodeQueryThreads
=
tsNumOfCores
*
2
;
tsNumOfQnodeQueryThreads
=
TMAX
(
tsNumOfQnodeQueryThreads
,
4
);
tsNumOfQnodeQueryThreads
=
TMAX
(
tsNumOfQnodeQueryThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeQueryThreads"
,
tsNumOfQnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeQueryThreads"
,
tsNumOfQnodeQueryThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
...
...
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
de4571fb
...
@@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
...
@@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
return
(
*
msgFp
)(
pWrapper
->
pMgmt
,
pMsg
);
return
(
*
msgFp
)(
pWrapper
->
pMgmt
,
pMsg
);
}
}
static
bool
dmFailFastFp
(
tmsg_t
msgType
)
{
// add more msg type later
return
msgType
==
TDMT_SYNC_HEARTBEAT
||
msgType
==
TDMT_SYNC_APPEND_ENTRIES
;
}
static
void
dmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
,
SEpSet
*
pEpSet
)
{
static
void
dmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
...
@@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) {
...
@@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit
.
retryMaxInterval
=
tsRedirectMaxPeriod
;
rpcInit
.
retryMaxInterval
=
tsRedirectMaxPeriod
;
rpcInit
.
retryMaxTimouet
=
tsMaxRetryWaitTime
;
rpcInit
.
retryMaxTimouet
=
tsMaxRetryWaitTime
;
rpcInit
.
failFastInterval
=
1000
;
// interval threshold(ms)
rpcInit
.
failFastThreshold
=
3
;
// failed threshold
rpcInit
.
ffp
=
dmFailFastFp
;
pTrans
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
pTrans
->
clientRpc
=
rpcOpen
(
&
rpcInit
);
if
(
pTrans
->
clientRpc
==
NULL
)
{
if
(
pTrans
->
clientRpc
==
NULL
)
{
dError
(
"failed to init dnode rpc client"
);
dError
(
"failed to init dnode rpc client"
);
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
de4571fb
...
@@ -383,9 +383,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
...
@@ -383,9 +383,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pGid
->
syncCanRead
!=
pVload
->
syncCanRead
)
{
pGid
->
syncCanRead
!=
pVload
->
syncCanRead
)
{
mInfo
(
mInfo
(
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
"canRead:%d"
,
"canRead:%d
, dnode:%d
"
,
pVgroup
->
vgId
,
syncStr
(
pGid
->
syncState
),
pGid
->
syncRestore
,
pGid
->
syncCanRead
,
pVgroup
->
vgId
,
syncStr
(
pGid
->
syncState
),
pGid
->
syncRestore
,
pGid
->
syncCanRead
,
syncStr
(
pVload
->
syncState
),
pVload
->
syncRestore
,
pVload
->
syncCanRead
);
syncStr
(
pVload
->
syncState
),
pVload
->
syncRestore
,
pVload
->
syncCanRead
,
pDnode
->
id
);
pGid
->
syncState
=
pVload
->
syncState
;
pGid
->
syncState
=
pVload
->
syncState
;
pGid
->
syncRestore
=
pVload
->
syncRestore
;
pGid
->
syncRestore
=
pVload
->
syncRestore
;
pGid
->
syncCanRead
=
pVload
->
syncCanRead
;
pGid
->
syncCanRead
=
pVload
->
syncCanRead
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
de4571fb
...
@@ -1126,34 +1126,61 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
...
@@ -1126,34 +1126,61 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
}
}
if
(
!
force
)
{
if
(
!
force
)
{
mInfo
(
"vgId:%d, will add 1 vnode"
,
pVgroup
->
vgId
);
if
(
newVg
.
replica
==
1
)
{
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
mInfo
(
"vgId:%d, will add 1 vnode, replca:1"
,
pVgroup
->
vgId
);
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
-
1
;
++
i
)
{
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
newVg
.
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
-
1
;
++
i
)
{
}
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
newVg
.
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
&
newVg
.
vnodeGid
[
newVg
.
replica
-
1
])
!=
0
)
return
-
1
;
}
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
&
newVg
.
vnodeGid
[
newVg
.
replica
-
1
])
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
)
!=
0
)
return
-
1
;
mInfo
(
"vgId:%d, will remove 1 vnode, replca:2"
,
pVgroup
->
vgId
);
newVg
.
replica
--
;
SVnodeGid
del
=
newVg
.
vnodeGid
[
vnIndex
];
newVg
.
vnodeGid
[
vnIndex
]
=
newVg
.
vnodeGid
[
newVg
.
replica
];
memset
(
&
newVg
.
vnodeGid
[
newVg
.
replica
],
0
,
sizeof
(
SVnodeGid
));
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
}
mInfo
(
"vgId:%d, will remove 1 vnode"
,
pVgroup
->
vgId
);
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
&
del
,
true
)
!=
0
)
return
-
1
;
newVg
.
replica
--
;
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
;
++
i
)
{
SVnodeGid
del
=
newVg
.
vnodeGid
[
vnIndex
];
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
newVg
.
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
newVg
.
vnodeGid
[
vnIndex
]
=
newVg
.
vnodeGid
[
newVg
.
replica
];
}
memset
(
&
newVg
.
vnodeGid
[
newVg
.
replica
],
0
,
sizeof
(
SVnodeGid
));
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
)
!=
0
)
return
-
1
;
{
}
else
{
// new replica == 3
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
mInfo
(
"vgId:%d, will add 1 vnode, replca:3"
,
pVgroup
->
vgId
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
{
mInfo
(
"vgId:%d, will remove 1 vnode, replca:4"
,
pVgroup
->
vgId
);
sdbFreeRaw
(
pRaw
);
newVg
.
replica
--
;
return
-
1
;
SVnodeGid
del
=
newVg
.
vnodeGid
[
vnIndex
];
newVg
.
vnodeGid
[
vnIndex
]
=
newVg
.
vnodeGid
[
newVg
.
replica
];
memset
(
&
newVg
.
vnodeGid
[
newVg
.
replica
],
0
,
sizeof
(
SVnodeGid
));
{
SSdbRaw
*
pRaw
=
mndVgroupActionEncode
(
&
newVg
);
if
(
pRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRaw
)
!=
0
)
{
sdbFreeRaw
(
pRaw
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
}
}
(
void
)
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
}
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
&
del
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
&
del
,
true
)
!=
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
newVg
.
replica
;
++
i
)
{
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
newVg
.
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
if
(
i
==
vnIndex
)
continue
;
if
(
mndAddAlterVnodeReplicaAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
newVg
.
vnodeGid
[
i
].
dnodeId
)
!=
0
)
return
-
1
;
}
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
,
&
newVg
.
vnodeGid
[
vnIndex
])
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
)
!=
0
)
return
-
1
;
}
}
if
(
mndAddAlterVnodeConfirmAction
(
pMnode
,
pTrans
,
pDb
,
&
newVg
)
!=
0
)
return
-
1
;
}
else
{
}
else
{
mInfo
(
"vgId:%d, will add 1 vnode and force remove 1 vnode"
,
pVgroup
->
vgId
);
mInfo
(
"vgId:%d, will add 1 vnode and force remove 1 vnode"
,
pVgroup
->
vgId
);
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
if
(
mndAddVnodeToVgroup
(
pMnode
,
pTrans
,
&
newVg
,
pArray
)
!=
0
)
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
de4571fb
...
@@ -214,6 +214,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
...
@@ -214,6 +214,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static
int32_t
doBuildDataBlock
(
STsdbReader
*
pReader
);
static
int32_t
doBuildDataBlock
(
STsdbReader
*
pReader
);
static
TSDBKEY
getCurrentKeyInBuf
(
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
);
static
TSDBKEY
getCurrentKeyInBuf
(
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
);
static
bool
hasDataInFileBlock
(
const
SBlockData
*
pBlockData
,
const
SFileBlockDumpInfo
*
pDumpInfo
);
static
bool
hasDataInFileBlock
(
const
SBlockData
*
pBlockData
,
const
SFileBlockDumpInfo
*
pDumpInfo
);
static
void
initBlockDumpInfo
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
);
static
bool
outOfTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
)
{
return
(
ts
>
pWindow
->
ekey
)
||
(
ts
<
pWindow
->
skey
);
}
static
bool
outOfTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
)
{
return
(
ts
>
pWindow
->
ekey
)
||
(
ts
<
pWindow
->
skey
);
}
...
@@ -2477,8 +2478,39 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
...
@@ -2477,8 +2478,39 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
SDataBlk
*
pBlock
=
getCurrentBlock
(
&
pReader
->
status
.
blockIter
);
if
(
pDumpInfo
->
rowIndex
>=
pBlock
->
nRow
||
pDumpInfo
->
rowIndex
<
0
)
{
if
(
pDumpInfo
->
rowIndex
>=
pBlock
->
nRow
||
pDumpInfo
->
rowIndex
<
0
)
{
setBlockAllDumped
(
pDumpInfo
,
pBlock
->
maxKey
.
ts
,
pReader
->
order
);
break
;
int32_t
nextIndex
=
-
1
;
SBlockIndex
bIndex
=
{
0
};
bool
hasNeighbor
=
getNeighborBlockOfSameTable
(
pBlockInfo
,
pBlockScanInfo
,
&
nextIndex
,
pReader
->
order
,
&
bIndex
);
if
(
!
hasNeighbor
)
{
// do nothing
setBlockAllDumped
(
pDumpInfo
,
pBlock
->
maxKey
.
ts
,
pReader
->
order
);
break
;
}
if
(
overlapWithNeighborBlock
(
pBlock
,
&
bIndex
,
pReader
->
order
))
{
// load next block
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
SDataBlockIter
*
pBlockIter
=
&
pStatus
->
blockIter
;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo
fb
=
{.
uid
=
pBlockInfo
->
uid
,
.
tbBlockIdx
=
nextIndex
};
int32_t
neighborIndex
=
findFileBlockInfoIndex
(
pBlockIter
,
&
fb
);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter
(
pBlockIter
,
neighborIndex
,
step
);
// 3. load the neighbor block, and set it to be the currently accessed file data block
code
=
doLoadFileBlockData
(
pReader
,
pBlockIter
,
&
pStatus
->
fileBlockData
,
pBlockInfo
->
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setBlockAllDumped
(
pDumpInfo
,
pBlock
->
maxKey
.
ts
,
pReader
->
order
);
break
;
}
// 4. check the data values
initBlockDumpInfo
(
pReader
,
pBlockIter
);
}
else
{
setBlockAllDumped
(
pDumpInfo
,
pBlock
->
maxKey
.
ts
,
pReader
->
order
);
break
;
}
}
}
}
}
}
}
...
@@ -2888,7 +2920,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
...
@@ -2888,7 +2920,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
}
}
// set the correct start position in case of the first/last file block, according to the query time window
// set the correct start position in case of the first/last file block, according to the query time window
static
void
initBlockDumpInfo
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
)
{
void
initBlockDumpInfo
(
STsdbReader
*
pReader
,
SDataBlockIter
*
pBlockIter
)
{
SDataBlk
*
pBlock
=
getCurrentBlock
(
pBlockIter
);
SDataBlk
*
pBlock
=
getCurrentBlock
(
pBlockIter
);
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
SReaderStatus
*
pStatus
=
&
pReader
->
status
;
...
@@ -4055,6 +4087,31 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64
...
@@ -4055,6 +4087,31 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64
}
}
}
}
static
void
doFillNullColSMA
(
SBlockLoadSuppInfo
*
pSup
,
int32_t
numOfRows
,
int32_t
numOfCols
,
SColumnDataAgg
*
pTsAgg
)
{
// do fill all null column value SMA info
int32_t
i
=
0
,
j
=
0
;
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pSup
->
pColAgg
);
taosArrayInsert
(
pSup
->
pColAgg
,
0
,
pTsAgg
);
while
(
j
<
numOfCols
&&
i
<
size
)
{
SColumnDataAgg
*
pAgg
=
taosArrayGet
(
pSup
->
pColAgg
,
i
);
if
(
pAgg
->
colId
==
pSup
->
colId
[
j
])
{
i
+=
1
;
j
+=
1
;
}
else
if
(
pAgg
->
colId
<
pSup
->
colId
[
j
])
{
i
+=
1
;
}
else
if
(
pSup
->
colId
[
j
]
<
pAgg
->
colId
)
{
if
(
pSup
->
colId
[
j
]
!=
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
SColumnDataAgg
nullColAgg
=
{.
colId
=
pSup
->
colId
[
j
],
.
numOfNull
=
numOfRows
};
taosArrayInsert
(
pSup
->
pColAgg
,
i
,
&
nullColAgg
);
}
j
+=
1
;
}
}
}
int32_t
tsdbRetrieveDatablockSMA
(
STsdbReader
*
pReader
,
SColumnDataAgg
***
pBlockSMA
,
bool
*
allHave
)
{
int32_t
tsdbRetrieveDatablockSMA
(
STsdbReader
*
pReader
,
SColumnDataAgg
***
pBlockSMA
,
bool
*
allHave
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
*
allHave
=
false
;
*
allHave
=
false
;
...
@@ -4110,6 +4167,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS
...
@@ -4110,6 +4167,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS
pResBlock
->
pBlockAgg
=
taosMemoryCalloc
(
num
,
sizeof
(
SColumnDataAgg
));
pResBlock
->
pBlockAgg
=
taosMemoryCalloc
(
num
,
sizeof
(
SColumnDataAgg
));
}
}
// do fill all null column value SMA info
doFillNullColSMA
(
pSup
,
pBlock
->
nRow
,
numOfCols
,
pTsAgg
);
i
=
0
,
j
=
0
;
while
(
j
<
numOfCols
&&
i
<
size
)
{
while
(
j
<
numOfCols
&&
i
<
size
)
{
SColumnDataAgg
*
pAgg
=
taosArrayGet
(
pSup
->
pColAgg
,
i
);
SColumnDataAgg
*
pAgg
=
taosArrayGet
(
pSup
->
pColAgg
,
i
);
if
(
pAgg
->
colId
==
pSup
->
colId
[
j
])
{
if
(
pAgg
->
colId
==
pSup
->
colId
[
j
])
{
...
@@ -4119,15 +4180,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS
...
@@ -4119,15 +4180,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg ***pBlockS
}
else
if
(
pAgg
->
colId
<
pSup
->
colId
[
j
])
{
}
else
if
(
pAgg
->
colId
<
pSup
->
colId
[
j
])
{
i
+=
1
;
i
+=
1
;
}
else
if
(
pSup
->
colId
[
j
]
<
pAgg
->
colId
)
{
}
else
if
(
pSup
->
colId
[
j
]
<
pAgg
->
colId
)
{
if
(
pSup
->
colId
[
j
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
ASSERT
(
pSup
->
colId
[
j
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
);
pResBlock
->
pBlockAgg
[
pSup
->
slotId
[
j
]]
=
&
pSup
->
tsColAgg
;
pResBlock
->
pBlockAgg
[
pSup
->
slotId
[
j
]]
=
&
pSup
->
tsColAgg
;
}
else
{
// all date in this block are null
SColumnDataAgg
nullColAgg
=
{.
colId
=
pSup
->
colId
[
j
],
.
numOfNull
=
pBlock
->
nRow
};
taosArrayPush
(
pSup
->
pColAgg
,
&
nullColAgg
);
pResBlock
->
pBlockAgg
[
pSup
->
slotId
[
j
]]
=
taosArrayGetLast
(
pSup
->
pColAgg
);
}
j
+=
1
;
j
+=
1
;
}
}
}
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
de4571fb
...
@@ -439,7 +439,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
...
@@ -439,7 +439,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
goto
end
;
goto
end
;
}
}
}
}
removeInvalidTable
(
uidList
,
tags
);
if
(
suid
!=
0
)
{
removeInvalidTable
(
uidList
,
tags
);
}
int32_t
rows
=
taosArrayGetSize
(
uidList
);
int32_t
rows
=
taosArrayGetSize
(
uidList
);
if
(
rows
==
0
)
{
if
(
rows
==
0
)
{
...
@@ -1604,7 +1606,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
...
@@ -1604,7 +1606,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond
->
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
pCond
->
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
pCond
->
colList
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfo
));
pCond
->
colList
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfo
));
pCond
->
pSlotList
=
taosMemoryMalloc
(
sizeof
(
int32_t
)
*
pCond
->
numOfCols
);
pCond
->
pSlotList
=
taosMemoryMalloc
(
sizeof
(
int32_t
)
*
pCond
->
numOfCols
);
if
(
pCond
->
colList
==
NULL
||
pCond
->
pSlotList
==
NULL
)
{
if
(
pCond
->
colList
==
NULL
||
pCond
->
pSlotList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFreeClear
(
pCond
->
colList
);
taosMemoryFreeClear
(
pCond
->
colList
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
de4571fb
...
@@ -1580,6 +1580,7 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
...
@@ -1580,6 +1580,7 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
)
{
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
blocking
)
{
if
(
pOperator
->
blocking
)
{
ASSERT
(
0
);
ASSERT
(
0
);
return
0
;
}
else
{
}
else
{
return
0
;
return
0
;
}
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
de4571fb
...
@@ -1218,27 +1218,56 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
...
@@ -1218,27 +1218,56 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if
(
rows
==
0
)
{
if
(
rows
==
0
)
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
SColumnInfoData
*
pSrcStartTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcStartTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcEndTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcEndTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pSrcUidCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
uint64_t
*
srcUidData
=
(
uint64_t
*
)
pSrcUidCol
->
pData
;
SColumnInfoData
*
pSrcGpCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pSrcGpCol
=
taosArrayGet
(
pSrcBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
uint64_t
*
srcGp
=
(
uint64_t
*
)
pSrcGpCol
->
pData
;
uint64_t
*
srcUidData
=
(
uint64_t
*
)
pSrcUidCol
->
pData
;
ASSERT
(
pSrcStartTsCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
ASSERT
(
pSrcStartTsCol
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TSKEY
*
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
TSKEY
*
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
TSKEY
*
srcEndTsCol
=
(
TSKEY
*
)
pSrcEndTsCol
->
pData
;
TSKEY
*
srcEndTsCol
=
(
TSKEY
*
)
pSrcEndTsCol
->
pData
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
if
(
pInfo
->
partitionSup
.
needCalc
&&
srcStartTsCol
[
0
]
!=
srcEndTsCol
[
0
])
{
uint64_t
srcUid
=
srcUidData
[
0
];
TSKEY
startTs
=
srcStartTsCol
[
0
];
TSKEY
endTs
=
srcEndTsCol
[
0
];
SSDataBlock
*
pPreRes
=
readPreVersionData
(
pInfo
->
pTableScanOp
,
srcUid
,
startTs
,
endTs
,
version
);
printDataBlock
(
pPreRes
,
"pre res"
);
blockDataCleanup
(
pSrcBlock
);
int32_t
code
=
blockDataEnsureCapacity
(
pSrcBlock
,
pPreRes
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
SColumnInfoData
*
pTsCol
=
(
SColumnInfoData
*
)
taosArrayGet
(
pPreRes
->
pDataBlock
,
pInfo
->
primaryTsIndex
);
rows
=
pPreRes
->
info
.
rows
;
for
(
int32_t
i
=
0
;
i
<
rows
;
i
++
)
{
uint64_t
groupId
=
calGroupIdByData
(
&
pInfo
->
partitionSup
,
pInfo
->
pPartScalarSup
,
pPreRes
,
i
);
appendOneRowToStreamSpecialBlock
(
pSrcBlock
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
((
TSKEY
*
)
pTsCol
->
pData
)
+
i
,
&
srcUid
,
&
groupId
,
NULL
);
}
printDataBlock
(
pSrcBlock
,
"new delete"
);
}
uint64_t
*
srcGp
=
(
uint64_t
*
)
pSrcGpCol
->
pData
;
srcStartTsCol
=
(
TSKEY
*
)
pSrcStartTsCol
->
pData
;
srcEndTsCol
=
(
TSKEY
*
)
pSrcEndTsCol
->
pData
;
srcUidData
=
(
uint64_t
*
)
pSrcUidCol
->
pData
;
int32_t
code
=
blockDataEnsureCapacity
(
pDestBlock
,
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDeUidCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pDeUidCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
SColumnInfoData
*
pCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
for
(
int32_t
i
=
0
;
i
<
rows
;)
{
for
(
int32_t
i
=
0
;
i
<
rows
;)
{
uint64_t
srcUid
=
srcUidData
[
i
];
uint64_t
srcUid
=
srcUidData
[
i
];
uint64_t
groupId
=
srcGp
[
i
];
uint64_t
groupId
=
srcGp
[
i
];
...
@@ -1653,13 +1682,6 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
...
@@ -1653,13 +1682,6 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
uint64_t
groupId
=
getGroupIdByUid
(
pInfo
,
uidCol
[
i
]);
uint64_t
groupId
=
getGroupIdByUid
(
pInfo
,
uidCol
[
i
]);
colDataAppend
(
pGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
colDataAppend
(
pGpCol
,
i
,
(
const
char
*
)
&
groupId
,
false
);
}
}
}
else
{
// SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uidCol[i], startTsCol, ts, maxVersion);
// if (!pPreRes || pPreRes->info.rows == 0) {
// return 0;
// }
// ASSERT(pPreRes->info.rows == 1);
// return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
}
}
}
}
...
@@ -3032,8 +3054,10 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
...
@@ -3032,8 +3054,10 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
if
(
pSupp
->
dbNameSlotId
!=
-
1
)
{
if
(
pSupp
->
dbNameSlotId
!=
-
1
)
{
ASSERT
(
strlen
(
dbName
));
ASSERT
(
strlen
(
dbName
));
SColumnInfoData
*
colInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
pSupp
->
dbNameSlotId
);
SColumnInfoData
*
colInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
pSupp
->
dbNameSlotId
);
char
varDbName
[
TSDB_DB_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
strncpy
(
varDataVal
(
varDbName
),
dbName
,
strlen
(
dbName
));
char
varDbName
[
TSDB_DB_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
varDataVal
(
varDbName
),
dbName
,
TSDB_DB_NAME_LEN
);
varDataSetLen
(
varDbName
,
strlen
(
dbName
));
varDataSetLen
(
varDbName
,
strlen
(
dbName
));
colDataAppend
(
colInfoData
,
0
,
varDbName
,
false
);
colDataAppend
(
colInfoData
,
0
,
varDbName
,
false
);
}
}
...
@@ -3042,7 +3066,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
...
@@ -3042,7 +3066,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
SColumnInfoData
*
colInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
pSupp
->
stbNameSlotId
);
SColumnInfoData
*
colInfoData
=
taosArrayGet
(
pRes
->
pDataBlock
,
pSupp
->
stbNameSlotId
);
if
(
strlen
(
stbName
)
!=
0
)
{
if
(
strlen
(
stbName
)
!=
0
)
{
char
varStbName
[
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
varStbName
[
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
strncpy
(
varDataVal
(
varStbName
),
stbName
,
strlen
(
stbName
)
);
strncpy
(
varDataVal
(
varStbName
),
stbName
,
TSDB_TABLE_NAME_LEN
);
varDataSetLen
(
varStbName
,
strlen
(
stbName
));
varDataSetLen
(
varStbName
,
strlen
(
stbName
));
colDataAppend
(
colInfoData
,
0
,
varStbName
,
false
);
colDataAppend
(
colInfoData
,
0
,
varStbName
,
false
);
}
else
{
}
else
{
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
de4571fb
...
@@ -504,7 +504,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
...
@@ -504,7 +504,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
*/
*/
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
SColumnInfoData
*
pInputCol
=
pInput
->
pData
[
0
];
if
(
pInput
->
colDataSMAIsSet
&&
pInput
->
totalRows
==
pInput
->
numOfRows
)
{
if
(
pInput
->
colDataSMAIsSet
&&
pInput
->
totalRows
==
pInput
->
numOfRows
&&
!
IS_VAR_DATA_TYPE
(
pInputCol
->
info
.
type
)
)
{
numOfElem
=
pInput
->
numOfRows
-
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
;
numOfElem
=
pInput
->
numOfRows
-
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
;
ASSERT
(
numOfElem
>=
0
);
ASSERT
(
numOfElem
>=
0
);
}
else
{
}
else
{
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
de4571fb
...
@@ -377,7 +377,7 @@ extern SSchedulerMgmt schMgmt;
...
@@ -377,7 +377,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))))
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task))
|| (_task)->redirectCtx.inRedirect
))
#define SCH_REDIRECT_MSGTYPE(_msgType) \
#define SCH_REDIRECT_MSGTYPE(_msgType) \
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
de4571fb
...
@@ -156,6 +156,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
...
@@ -156,6 +156,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_RET
(
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
SCH_RET
(
schHandleRedirect
(
pJob
,
pTask
,
(
SDataBuf
*
)
pMsg
,
rspCode
));
}
}
pTask
->
redirectCtx
.
inRedirect
=
false
;
switch
(
msgType
)
{
switch
(
msgType
)
{
case
TDMT_VND_COMMIT_RSP
:
{
case
TDMT_VND_COMMIT_RSP
:
{
SCH_ERR_JRET
(
rspCode
);
SCH_ERR_JRET
(
rspCode
);
...
...
source/libs/scheduler/src/schTask.c
浏览文件 @
de4571fb
...
@@ -362,17 +362,12 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
...
@@ -362,17 +362,12 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
}
}
pCtx
->
totalTimes
++
;
pCtx
->
totalTimes
++
;
pCtx
->
roundTimes
++
;
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
)
&&
pEpSet
)
{
if
(
SCH_IS_DATA_BIND_TASK
(
pTask
)
&&
pEpSet
)
{
pCtx
->
roundTotal
=
pEpSet
->
numOfEps
;
pCtx
->
roundTotal
=
pEpSet
->
numOfEps
;
pCtx
->
roundTimes
=
0
;
pTask
->
delayExecMs
=
0
;
goto
_return
;
}
}
pCtx
->
roundTimes
++
;
if
(
pCtx
->
roundTimes
>=
pCtx
->
roundTotal
)
{
if
(
pCtx
->
roundTimes
>=
pCtx
->
roundTotal
)
{
int64_t
nowTs
=
taosGetTimestampMs
();
int64_t
nowTs
=
taosGetTimestampMs
();
...
...
source/libs/sync/inc/syncMessage.h
浏览文件 @
de4571fb
...
@@ -46,6 +46,7 @@ typedef struct SyncClientRequest {
...
@@ -46,6 +46,7 @@ typedef struct SyncClientRequest {
uint32_t
originalRpcType
;
// origin RpcMsg msgType
uint32_t
originalRpcType
;
// origin RpcMsg msgType
uint64_t
seqNum
;
uint64_t
seqNum
;
bool
isWeak
;
bool
isWeak
;
int16_t
reserved
;
uint32_t
dataLen
;
// origin RpcMsg.contLen
uint32_t
dataLen
;
// origin RpcMsg.contLen
char
data
[];
// origin RpcMsg.pCont
char
data
[];
// origin RpcMsg.pCont
}
SyncClientRequest
;
}
SyncClientRequest
;
...
@@ -56,6 +57,7 @@ typedef struct SyncClientRequestReply {
...
@@ -56,6 +57,7 @@ typedef struct SyncClientRequestReply {
uint32_t
msgType
;
uint32_t
msgType
;
int32_t
errCode
;
int32_t
errCode
;
SRaftId
leaderHint
;
SRaftId
leaderHint
;
int16_t
reserved
;
}
SyncClientRequestReply
;
}
SyncClientRequestReply
;
typedef
struct
SyncRequestVote
{
typedef
struct
SyncRequestVote
{
...
@@ -68,6 +70,7 @@ typedef struct SyncRequestVote {
...
@@ -68,6 +70,7 @@ typedef struct SyncRequestVote {
SyncTerm
term
;
SyncTerm
term
;
SyncIndex
lastLogIndex
;
SyncIndex
lastLogIndex
;
SyncTerm
lastLogTerm
;
SyncTerm
lastLogTerm
;
int16_t
reserved
;
}
SyncRequestVote
;
}
SyncRequestVote
;
typedef
struct
SyncRequestVoteReply
{
typedef
struct
SyncRequestVoteReply
{
...
@@ -79,6 +82,7 @@ typedef struct SyncRequestVoteReply {
...
@@ -79,6 +82,7 @@ typedef struct SyncRequestVoteReply {
// private data
// private data
SyncTerm
term
;
SyncTerm
term
;
bool
voteGranted
;
bool
voteGranted
;
int16_t
reserved
;
}
SyncRequestVoteReply
;
}
SyncRequestVoteReply
;
typedef
struct
SyncAppendEntries
{
typedef
struct
SyncAppendEntries
{
...
@@ -94,6 +98,7 @@ typedef struct SyncAppendEntries {
...
@@ -94,6 +98,7 @@ typedef struct SyncAppendEntries {
SyncTerm
prevLogTerm
;
SyncTerm
prevLogTerm
;
SyncIndex
commitIndex
;
SyncIndex
commitIndex
;
SyncTerm
privateTerm
;
SyncTerm
privateTerm
;
int16_t
reserved
;
uint32_t
dataLen
;
uint32_t
dataLen
;
char
data
[];
char
data
[];
}
SyncAppendEntries
;
}
SyncAppendEntries
;
...
@@ -111,6 +116,7 @@ typedef struct SyncAppendEntriesReply {
...
@@ -111,6 +116,7 @@ typedef struct SyncAppendEntriesReply {
SyncIndex
matchIndex
;
SyncIndex
matchIndex
;
SyncIndex
lastSendIndex
;
SyncIndex
lastSendIndex
;
int64_t
startTime
;
int64_t
startTime
;
int16_t
reserved
;
}
SyncAppendEntriesReply
;
}
SyncAppendEntriesReply
;
typedef
struct
SyncHeartbeat
{
typedef
struct
SyncHeartbeat
{
...
@@ -126,6 +132,7 @@ typedef struct SyncHeartbeat {
...
@@ -126,6 +132,7 @@ typedef struct SyncHeartbeat {
SyncTerm
privateTerm
;
SyncTerm
privateTerm
;
SyncTerm
minMatchIndex
;
SyncTerm
minMatchIndex
;
int64_t
timeStamp
;
int64_t
timeStamp
;
int16_t
reserved
;
}
SyncHeartbeat
;
}
SyncHeartbeat
;
typedef
struct
SyncHeartbeatReply
{
typedef
struct
SyncHeartbeatReply
{
...
@@ -140,6 +147,7 @@ typedef struct SyncHeartbeatReply {
...
@@ -140,6 +147,7 @@ typedef struct SyncHeartbeatReply {
SyncTerm
privateTerm
;
SyncTerm
privateTerm
;
int64_t
startTime
;
int64_t
startTime
;
int64_t
timeStamp
;
int64_t
timeStamp
;
int16_t
reserved
;
}
SyncHeartbeatReply
;
}
SyncHeartbeatReply
;
typedef
struct
SyncPreSnapshot
{
typedef
struct
SyncPreSnapshot
{
...
@@ -151,6 +159,7 @@ typedef struct SyncPreSnapshot {
...
@@ -151,6 +159,7 @@ typedef struct SyncPreSnapshot {
// private data
// private data
SyncTerm
term
;
SyncTerm
term
;
int16_t
reserved
;
}
SyncPreSnapshot
;
}
SyncPreSnapshot
;
typedef
struct
SyncPreSnapshotReply
{
typedef
struct
SyncPreSnapshotReply
{
...
@@ -163,6 +172,7 @@ typedef struct SyncPreSnapshotReply {
...
@@ -163,6 +172,7 @@ typedef struct SyncPreSnapshotReply {
// private data
// private data
SyncTerm
term
;
SyncTerm
term
;
SyncIndex
snapStart
;
SyncIndex
snapStart
;
int16_t
reserved
;
}
SyncPreSnapshotReply
;
}
SyncPreSnapshotReply
;
typedef
struct
SyncApplyMsg
{
typedef
struct
SyncApplyMsg
{
...
@@ -190,6 +200,7 @@ typedef struct SyncSnapshotSend {
...
@@ -190,6 +200,7 @@ typedef struct SyncSnapshotSend {
SSyncCfg
lastConfig
;
SSyncCfg
lastConfig
;
int64_t
startTime
;
int64_t
startTime
;
int32_t
seq
;
int32_t
seq
;
int16_t
reserved
;
uint32_t
dataLen
;
uint32_t
dataLen
;
char
data
[];
char
data
[];
}
SyncSnapshotSend
;
}
SyncSnapshotSend
;
...
@@ -208,6 +219,7 @@ typedef struct SyncSnapshotRsp {
...
@@ -208,6 +219,7 @@ typedef struct SyncSnapshotRsp {
int32_t
ack
;
int32_t
ack
;
int32_t
code
;
int32_t
code
;
SyncIndex
snapBeginIndex
;
// when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
SyncIndex
snapBeginIndex
;
// when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
int16_t
reserved
;
}
SyncSnapshotRsp
;
}
SyncSnapshotRsp
;
typedef
struct
SyncLeaderTransfer
{
typedef
struct
SyncLeaderTransfer
{
...
...
source/libs/sync/src/syncTimeout.c
浏览文件 @
de4571fb
...
@@ -19,6 +19,7 @@
...
@@ -19,6 +19,7 @@
#include "syncRaftCfg.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftLog.h"
#include "syncReplication.h"
#include "syncReplication.h"
#include "syncRespMgr.h"
#include "syncUtil.h"
#include "syncUtil.h"
static
void
syncNodeCleanConfigIndex
(
SSyncNode
*
ths
)
{
static
void
syncNodeCleanConfigIndex
(
SSyncNode
*
ths
)
{
...
@@ -85,11 +86,9 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
...
@@ -85,11 +86,9 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
}
}
}
}
#if 0
if
(
!
syncNodeIsMnode
(
ths
))
{
if
(
!
syncNodeIsMnode
(
ths
))
{
syncRespClean
(
ths
->
pSyncRespMgr
);
syncRespClean
(
ths
->
pSyncRespMgr
);
}
}
#endif
return
0
;
return
0
;
}
}
...
...
source/libs/transport/inc/transportInt.h
浏览文件 @
de4571fb
...
@@ -57,10 +57,14 @@ typedef struct {
...
@@ -57,10 +57,14 @@ typedef struct {
int32_t
retryMaxInterval
;
// retry max interval
int32_t
retryMaxInterval
;
// retry max interval
int32_t
retryMaxTimouet
;
int32_t
retryMaxTimouet
;
int32_t
failFastThreshold
;
int32_t
failFastInterval
;
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
bool
(
*
retry
)(
int32_t
code
,
tmsg_t
msgType
);
bool
(
*
retry
)(
int32_t
code
,
tmsg_t
msgType
);
bool
(
*
startTimer
)(
int32_t
code
,
tmsg_t
msgType
);
bool
(
*
startTimer
)(
int32_t
code
,
tmsg_t
msgType
);
void
(
*
destroyFp
)(
void
*
ahandle
);
void
(
*
destroyFp
)(
void
*
ahandle
);
bool
(
*
failFastFp
)(
tmsg_t
msgType
);
int
index
;
int
index
;
void
*
parent
;
void
*
parent
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
de4571fb
...
@@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) {
...
@@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc
->
retryMaxInterval
=
pInit
->
retryMaxInterval
;
pRpc
->
retryMaxInterval
=
pInit
->
retryMaxInterval
;
pRpc
->
retryMaxTimouet
=
pInit
->
retryMaxTimouet
;
pRpc
->
retryMaxTimouet
=
pInit
->
retryMaxTimouet
;
pRpc
->
failFastThreshold
=
pInit
->
failFastThreshold
;
pRpc
->
failFastInterval
=
pInit
->
failFastInterval
;
// register callback handle
// register callback handle
pRpc
->
cfp
=
pInit
->
cfp
;
pRpc
->
cfp
=
pInit
->
cfp
;
pRpc
->
retry
=
pInit
->
rfp
;
pRpc
->
retry
=
pInit
->
rfp
;
pRpc
->
startTimer
=
pInit
->
tfp
;
pRpc
->
startTimer
=
pInit
->
tfp
;
pRpc
->
destroyFp
=
pInit
->
dfp
;
pRpc
->
destroyFp
=
pInit
->
dfp
;
pRpc
->
failFastFp
=
pInit
->
ffp
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
if
(
pRpc
->
numOfThreads
<=
0
)
{
if
(
pRpc
->
numOfThreads
<=
0
)
{
...
...
source/libs/transport/src/transCli.c
浏览文件 @
de4571fb
...
@@ -84,6 +84,8 @@ typedef struct SCliThrd {
...
@@ -84,6 +84,8 @@ typedef struct SCliThrd {
SHashObj
*
fqdn2ipCache
;
SHashObj
*
fqdn2ipCache
;
SCvtAddr
cvtAddr
;
SCvtAddr
cvtAddr
;
SHashObj
*
failFastCache
;
SCliMsg
*
stopMsg
;
SCliMsg
*
stopMsg
;
bool
quit
;
bool
quit
;
...
@@ -96,6 +98,13 @@ typedef struct SCliObj {
...
@@ -96,6 +98,13 @@ typedef struct SCliObj {
SCliThrd
**
pThreadObj
;
SCliThrd
**
pThreadObj
;
}
SCliObj
;
}
SCliObj
;
typedef
struct
{
int32_t
reinit
;
int64_t
timestamp
;
int32_t
count
;
int32_t
threshold
;
int64_t
interval
;
}
SFailFastItem
;
// conn pool
// conn pool
// add expire timeout and capacity limit
// add expire timeout and capacity limit
static
void
*
createConnPool
(
int
size
);
static
void
*
createConnPool
(
int
size
);
...
@@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) {
...
@@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) {
int
status
=
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
int
status
=
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
cliSendCb
);
if
(
status
!=
0
)
{
if
(
status
!=
0
)
{
tGError
(
"%s conn %p failed to sen
t
msg:%s, errmsg:%s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
TMSG_INFO
(
pMsg
->
msgType
),
tGError
(
"%s conn %p failed to sen
d
msg:%s, errmsg:%s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
TMSG_INFO
(
pMsg
->
msgType
),
uv_err_name
(
status
));
uv_err_name
(
status
));
cliHandleExcept
(
pConn
);
cliHandleExcept
(
pConn
);
}
}
...
@@ -863,7 +872,6 @@ _RETURN:
...
@@ -863,7 +872,6 @@ _RETURN:
}
}
void
cliConnCb
(
uv_connect_t
*
req
,
int
status
)
{
void
cliConnCb
(
uv_connect_t
*
req
,
int
status
)
{
// impl later
SCliConn
*
pConn
=
req
->
data
;
SCliConn
*
pConn
=
req
->
data
;
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
SCliThrd
*
pThrd
=
pConn
->
hostThrd
;
...
@@ -875,7 +883,33 @@ void cliConnCb(uv_connect_t* req, int status) {
...
@@ -875,7 +883,33 @@ void cliConnCb(uv_connect_t* req, int status) {
}
}
if
(
status
!=
0
)
{
if
(
status
!=
0
)
{
tError
(
"%s conn %p failed to connect server:%s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
uv_strerror
(
status
));
SCliMsg
*
pMsg
=
transQueueGet
(
&
pConn
->
cliMsgs
,
0
);
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
tError
(
"%s msg %s failed to send, conn %p failed to connect to %s:%d, reason: %s"
,
CONN_GET_INST_LABEL
(
pConn
),
pMsg
?
TMSG_INFO
(
pMsg
->
msg
.
msgType
)
:
0
,
pConn
,
pConn
->
ip
,
pConn
->
port
,
uv_strerror
(
status
));
if
(
pMsg
!=
NULL
&&
REQUEST_NO_RESP
(
&
pMsg
->
msg
)
&&
(
pTransInst
->
failFastFp
!=
NULL
&&
pTransInst
->
failFastFp
(
pMsg
->
msg
.
msgType
)))
{
char
*
ip
=
pConn
->
ip
;
uint32_t
port
=
pConn
->
port
;
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
SFailFastItem
*
item
=
taosHashGet
(
pThrd
->
failFastCache
,
key
,
strlen
(
key
));
int64_t
cTimestamp
=
taosGetTimestampMs
();
if
(
item
!=
NULL
)
{
int32_t
elapse
=
cTimestamp
-
item
->
timestamp
;
if
(
elapse
>=
0
&&
elapse
<=
pTransInst
->
failFastInterval
)
{
item
->
count
++
;
}
else
{
item
->
count
=
1
;
item
->
timestamp
=
cTimestamp
;
}
}
else
{
SFailFastItem
item
=
{.
count
=
1
,
.
timestamp
=
cTimestamp
};
taosHashPut
(
pThrd
->
failFastCache
,
key
,
strlen
(
key
),
&
item
,
sizeof
(
SFailFastItem
));
}
}
cliHandleExcept
(
pConn
);
cliHandleExcept
(
pConn
);
return
;
return
;
}
}
...
@@ -1027,6 +1061,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
...
@@ -1027,6 +1061,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return
;
return
;
}
}
if
(
REQUEST_NO_RESP
(
&
pMsg
->
msg
)
&&
(
pTransInst
->
failFastFp
!=
NULL
&&
pTransInst
->
failFastFp
(
pMsg
->
msg
.
msgType
)))
{
char
*
ip
=
EPSET_GET_INUSE_IP
(
&
pCtx
->
epSet
);
uint32_t
port
=
EPSET_GET_INUSE_PORT
(
&
pCtx
->
epSet
);
char
key
[
TSDB_FQDN_LEN
+
64
]
=
{
0
};
CONN_CONSTRUCT_HASH_KEY
(
key
,
ip
,
port
);
SFailFastItem
*
item
=
taosHashGet
(
pThrd
->
failFastCache
,
key
,
strlen
(
key
));
if
(
item
!=
NULL
)
{
int32_t
elapse
=
(
int32_t
)(
taosGetTimestampMs
()
-
item
->
timestamp
);
if
(
item
->
count
>=
pTransInst
->
failFastThreshold
&&
(
elapse
>=
0
&&
elapse
<=
pTransInst
->
failFastInterval
))
{
STraceId
*
trace
=
&
(
pMsg
->
msg
.
info
.
traceId
);
tGTrace
(
"%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d"
,
pTransInst
->
label
,
TMSG_INFO
(
pMsg
->
msg
.
msgType
),
ip
,
port
,
item
->
count
,
elapse
);
destroyCmsg
(
pMsg
);
return
;
}
}
}
bool
ignore
=
false
;
bool
ignore
=
false
;
SCliConn
*
conn
=
cliGetConn
(
pMsg
,
pThrd
,
&
ignore
);
SCliConn
*
conn
=
cliGetConn
(
pMsg
,
pThrd
,
&
ignore
);
if
(
ignore
==
true
)
{
if
(
ignore
==
true
)
{
...
@@ -1299,6 +1352,8 @@ static SCliThrd* createThrdObj(void* trans) {
...
@@ -1299,6 +1352,8 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd
->
destroyAhandleFp
=
pTransInst
->
destroyFp
;
pThrd
->
destroyAhandleFp
=
pTransInst
->
destroyFp
;
pThrd
->
fqdn2ipCache
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pThrd
->
fqdn2ipCache
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pThrd
->
failFastCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pThrd
->
quit
=
false
;
pThrd
->
quit
=
false
;
return
pThrd
;
return
pThrd
;
}
}
...
@@ -1325,6 +1380,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
...
@@ -1325,6 +1380,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree
(
pThrd
->
prepare
);
taosMemoryFree
(
pThrd
->
prepare
);
taosMemoryFree
(
pThrd
->
loop
);
taosMemoryFree
(
pThrd
->
loop
);
taosHashCleanup
(
pThrd
->
fqdn2ipCache
);
taosHashCleanup
(
pThrd
->
fqdn2ipCache
);
taosHashCleanup
(
pThrd
->
failFastCache
);
taosMemoryFree
(
pThrd
);
taosMemoryFree
(
pThrd
);
}
}
...
...
source/os/src/osTime.c
浏览文件 @
de4571fb
...
@@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
...
@@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
offsetInitFinished
=
true
;
offsetInitFinished
=
true
;
}
else
{
}
else
{
while
(
!
offsetInitFinished
)
while
(
!
offsetInitFinished
)
;
// Ensure initialization is completed.
;
// Ensure initialization is completed.
}
}
GetSystemTimeAsFileTime
(
&
f
);
GetSystemTimeAsFileTime
(
&
f
);
...
...
source/util/src/tlog.c
浏览文件 @
de4571fb
...
@@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
...
@@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
if
(
!
osLogSpaceAvailable
())
return
;
if
(
!
osLogSpaceAvailable
())
return
;
if
(
!
(
dflag
&
DEBUG_FILE
)
&&
!
(
dflag
&
DEBUG_SCREEN
))
return
;
if
(
!
(
dflag
&
DEBUG_FILE
)
&&
!
(
dflag
&
DEBUG_SCREEN
))
return
;
char
*
buffer
=
taosMemoryMalloc
(
LOG_MAX_LINE_DUMP_BUFFER_SIZE
);
char
*
buffer
=
taosMemoryMalloc
(
LOG_MAX_LINE_DUMP_BUFFER_SIZE
);
int32_t
len
=
taosBuildLogHead
(
buffer
,
flags
);
int32_t
len
=
taosBuildLogHead
(
buffer
,
flags
);
va_list
argpointer
;
va_list
argpointer
;
...
...
tests/script/api/batchprepare.c
浏览文件 @
de4571fb
...
@@ -79,6 +79,9 @@ int64_t bpTs;
...
@@ -79,6 +79,9 @@ int64_t bpTs;
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
typedef
struct
{
typedef
struct
{
bool
singleTbInsert
;
int32_t
singleTbIdx
;
int64_t
*
tsData
;
int64_t
*
tsData
;
bool
*
boolData
;
bool
*
boolData
;
int8_t
*
tinyData
;
int8_t
*
tinyData
;
...
@@ -116,6 +119,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos);
...
@@ -116,6 +119,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos);
int
insertMPMETest1
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
insertMPMETest1
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
insertAUTOTest1
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
insertAUTOTest1
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
insertAUTOTest2
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
insertAUTOTest2
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
insertAUTOTest3
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
queryColumnTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
queryColumnTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
queryMiscTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
int
queryMiscTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
);
...
@@ -130,6 +134,7 @@ typedef struct {
...
@@ -130,6 +134,7 @@ typedef struct {
int32_t
*
colList
;
// full table column list
int32_t
*
colList
;
// full table column list
int32_t
testType
;
int32_t
testType
;
int32_t
autoCreateTbl
;
int32_t
autoCreateTbl
;
bool
duplicateValue
;
bool
fullCol
;
bool
fullCol
;
int32_t
(
*
runFn
)(
TAOS_STMT
*
,
TAOS
*
);
int32_t
(
*
runFn
)(
TAOS_STMT
*
,
TAOS
*
);
int32_t
tblNum
;
int32_t
tblNum
;
...
@@ -143,46 +148,47 @@ typedef struct {
...
@@ -143,46 +148,47 @@ typedef struct {
}
CaseCfg
;
}
CaseCfg
;
CaseCfg
gCase
[]
=
{
CaseCfg
gCase
[]
=
{
{
"insert:MBSE0-FULL"
,
tListLen
(
shortColList
),
shortColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBSETest1
,
1
,
10
,
10
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE0-FULL"
,
tListLen
(
shortColList
),
shortColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBSETest1
,
1
,
10
,
10
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE0-FULL"
,
tListLen
(
shortColList
),
shortColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBSETest1
,
10
,
100
,
10
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE0-FULL"
,
tListLen
(
shortColList
),
shortColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBSETest1
,
10
,
100
,
10
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBSETest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBSETest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE1-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBSETest1
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE1-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBSETest1
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE1-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBSETest1
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE1-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBSETest1
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE2-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBSETest2
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE2-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBSETest2
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE2-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBSETest2
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE2-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBSETest2
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE2-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBSETest2
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBSE2-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBSETest2
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBMETest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBMETest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME1-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest1
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME1-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest1
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME1-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest1
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME1-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest1
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
// 11
// 11
{
"insert:MBME2-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBMETest2
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME2-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBMETest2
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME2-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest2
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME2-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest2
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME2-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest2
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME2-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest2
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME3-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBMETest3
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME3-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBMETest3
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME3-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest3
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME3-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest3
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME3-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest3
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME3-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest3
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME4-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
true
,
insertMBMETest4
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME4-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMBMETest4
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME4-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest4
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME4-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest4
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME4-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMBMETest4
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MBME4-C002"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMBMETest4
,
10
,
10
,
2
,
2
,
0
,
0
,
1
,
-
1
},
{
"insert:MPME1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
true
,
insertMPMETest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MPME1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
true
,
insertMPMETest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:MPME1-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
insertMPMETest1
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
{
"insert:MPME1-C012"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
0
,
false
,
false
,
insertMPMETest1
,
10
,
10
,
2
,
12
,
0
,
0
,
1
,
-
1
},
// 22
// 22
{
"insert:AUTO1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
1
,
true
,
insertAUTOTest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:AUTO1-FULL"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
1
,
false
,
true
,
insertAUTOTest1
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:AUTO1-TBEXISTS"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
3
,
true
,
insertAUTOTest2
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
{
"insert:AUTO2-TBEXISTS"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_INSERT
,
3
,
false
,
true
,
insertAUTOTest2
,
10
,
10
,
2
,
0
,
0
,
0
,
1
,
-
1
},
// {"insert:AUTO3-NTB", tListLen(fullColList), fullColList, TTYPE_INSERT, 0, true, true, insertAUTOTest3, 10, 10, 2, 0, 0, 0, 1, -1},
{
"query:SUBT-COLUMN"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
0
,
false
,
queryColumnTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
{
"query:SUBT-COLUMN"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
0
,
false
,
false
,
queryColumnTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
{
"query:SUBT-MISC"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
0
,
false
,
queryMiscTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
{
"query:SUBT-MISC"
,
tListLen
(
fullColList
),
fullColList
,
TTYPE_QUERY
,
0
,
false
,
false
,
queryMiscTest
,
10
,
10
,
1
,
3
,
0
,
0
,
1
,
2
},
// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2},
// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false,
false,
queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2},
// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2},
// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, 0, false,
false,
queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2},
};
};
...
@@ -232,7 +238,7 @@ CaseCtrl gCaseCtrl = {
...
@@ -232,7 +238,7 @@ CaseCtrl gCaseCtrl = {
.numericParam = false,
.numericParam = false,
.rowNum = 0,
.rowNum = 0,
.bindColNum = 0,
.bindColNum = 0,
.bindTagNum =
14
,
.bindTagNum =
0
,
.bindRowNum = 0,
.bindRowNum = 0,
.bindColTypeNum = 0,
.bindColTypeNum = 0,
.bindColTypeList = NULL,
.bindColTypeList = NULL,
...
@@ -244,7 +250,7 @@ CaseCtrl gCaseCtrl = {
...
@@ -244,7 +250,7 @@ CaseCtrl gCaseCtrl = {
.funcIdxList = NULL,
.funcIdxList = NULL,
.checkParamNum = false,
.checkParamNum = false,
.runTimes = 0,
.runTimes = 0,
.caseIdx = 2
3
,
.caseIdx = 2
4
,
.caseNum = 1,
.caseNum = 1,
.caseRunIdx = -1,
.caseRunIdx = -1,
.caseRunNum = -1,
.caseRunNum = -1,
...
@@ -382,7 +388,11 @@ bool colExists(TAOS_MULTI_BIND* pBind, int32_t dataType) {
...
@@ -382,7 +388,11 @@ bool colExists(TAOS_MULTI_BIND* pBind, int32_t dataType) {
void
generateInsertSQL
(
BindData
*
data
)
{
void
generateInsertSQL
(
BindData
*
data
)
{
int32_t
len
=
0
;
int32_t
len
=
0
;
if
(
gCurCase
->
tblNum
>
1
)
{
if
(
gCurCase
->
tblNum
>
1
)
{
len
=
sprintf
(
data
->
sql
,
"insert into ? "
);
if
(
data
->
singleTbInsert
)
{
len
=
sprintf
(
data
->
sql
,
"insert into %s%d "
,
bpTbPrefix
,
data
->
singleTbIdx
);
}
else
{
len
=
sprintf
(
data
->
sql
,
"insert into ? "
);
}
}
else
{
}
else
{
len
=
sprintf
(
data
->
sql
,
"insert into %s0 "
,
bpTbPrefix
);
len
=
sprintf
(
data
->
sql
,
"insert into %s0 "
,
bpTbPrefix
);
}
}
...
@@ -938,7 +948,14 @@ int32_t prepareInsertData(BindData *data) {
...
@@ -938,7 +948,14 @@ int32_t prepareInsertData(BindData *data) {
}
}
for
(
int32_t
i
=
0
;
i
<
allRowNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
allRowNum
;
++
i
)
{
data
->
tsData
[
i
]
=
bpTs
++
;
if
(
gCurCase
->
duplicateValue
)
{
data
->
tsData
[
i
]
=
bpTs
;
if
(
i
%
2
==
1
)
{
bpTs
++
;
}
}
else
{
data
->
tsData
[
i
]
=
bpTs
++
;
}
data
->
boolData
[
i
]
=
(
bool
)(
i
%
2
);
data
->
boolData
[
i
]
=
(
bool
)(
i
%
2
);
data
->
tinyData
[
i
]
=
(
int8_t
)
i
;
data
->
tinyData
[
i
]
=
(
int8_t
)
i
;
data
->
utinyData
[
i
]
=
(
uint8_t
)(
i
+
1
);
data
->
utinyData
[
i
]
=
(
uint8_t
)(
i
+
1
);
...
@@ -1251,6 +1268,9 @@ void bpCheckParamNum(TAOS_STMT *stmt) {
...
@@ -1251,6 +1268,9 @@ void bpCheckParamNum(TAOS_STMT *stmt) {
void
bpCheckAffectedRows
(
TAOS_STMT
*
stmt
,
int32_t
times
)
{
void
bpCheckAffectedRows
(
TAOS_STMT
*
stmt
,
int32_t
times
)
{
int32_t
rows
=
taos_stmt_affected_rows
(
stmt
);
int32_t
rows
=
taos_stmt_affected_rows
(
stmt
);
int32_t
insertNum
=
gCurCase
->
rowNum
*
gCurCase
->
tblNum
*
times
;
int32_t
insertNum
=
gCurCase
->
rowNum
*
gCurCase
->
tblNum
*
times
;
if
(
gCurCase
->
duplicateValue
)
{
insertNum
/=
2
;
}
if
(
insertNum
!=
rows
)
{
if
(
insertNum
!=
rows
)
{
printf
(
"!!!affected rows %d mis-match with insert num %d
\n
"
,
rows
,
insertNum
);
printf
(
"!!!affected rows %d mis-match with insert num %d
\n
"
,
rows
,
insertNum
);
exit
(
1
);
exit
(
1
);
...
@@ -2014,6 +2034,65 @@ int insertAUTOTest2(TAOS_STMT *stmt, TAOS *taos) {
...
@@ -2014,6 +2034,65 @@ int insertAUTOTest2(TAOS_STMT *stmt, TAOS *taos) {
return
0
;
return
0
;
}
}
/* normal table [prepare [bind add exec]] */
int
insertAUTOTest3
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
)
{
int32_t
loop
=
0
;
while
(
gCurCase
->
bindColNum
>
0
)
{
BindData
data
=
{
0
};
data
.
singleTbInsert
=
true
;
prepareInsertData
(
&
data
);
int32_t
bindTimes
=
gCurCase
->
rowNum
/
gCurCase
->
bindRowNum
;
for
(
int32_t
t
=
0
;
t
<
gCurCase
->
tblNum
;
++
t
)
{
data
.
singleTbIdx
=
t
;
generateInsertSQL
(
&
data
);
int
code
=
taos_stmt_prepare
(
stmt
,
data
.
sql
,
0
);
if
(
code
!=
0
){
printf
(
"!!!failed to execute taos_stmt_prepare. error:%s
\n
"
,
taos_stmt_errstr
(
stmt
));
exit
(
1
);
}
for
(
int32_t
b
=
0
;
b
<
bindTimes
;
++
b
)
{
bpCheckIsInsert
(
stmt
,
1
);
if
(
gCaseCtrl
.
checkParamNum
)
{
bpCheckParamNum
(
stmt
);
}
if
(
bpBindParam
(
stmt
,
data
.
pBind
+
t
*
bindTimes
*
gCurCase
->
bindColNum
+
b
*
gCurCase
->
bindColNum
))
{
exit
(
1
);
}
if
(
taos_stmt_add_batch
(
stmt
))
{
printf
(
"!!!taos_stmt_add_batch error:%s
\n
"
,
taos_stmt_errstr
(
stmt
));
exit
(
1
);
}
if
(
taos_stmt_execute
(
stmt
)
!=
0
)
{
printf
(
"!!!taos_stmt_execute error:%s
\n
"
,
taos_stmt_errstr
(
stmt
));
exit
(
1
);
}
}
}
bpCheckIsInsert
(
stmt
,
1
);
destroyData
(
&
data
);
gCurCase
->
bindColNum
-=
2
;
gCurCase
->
fullCol
=
false
;
loop
++
;
}
bpCheckAffectedRows
(
stmt
,
loop
);
gExecLoopTimes
=
loop
;
return
0
;
}
/* select * from table */
/* select * from table */
int
queryColumnTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
)
{
int
queryColumnTest
(
TAOS_STMT
*
stmt
,
TAOS
*
taos
)
{
...
@@ -2160,7 +2239,7 @@ void prepareCheckResult(TAOS *taos, bool silent) {
...
@@ -2160,7 +2239,7 @@ void prepareCheckResult(TAOS *taos, bool silent) {
sprintf
(
buf
,
"%s%d"
,
bpTbPrefix
,
0
);
sprintf
(
buf
,
"%s%d"
,
bpTbPrefix
,
0
);
}
}
prepareCheckResultImpl
(
taos
,
buf
,
gCaseCtrl
.
printRes
,
gCurCase
->
rowNum
*
gExecLoopTimes
,
silent
);
prepareCheckResultImpl
(
taos
,
buf
,
gCaseCtrl
.
printRes
,
gCurCase
->
duplicateValue
?
(
gCurCase
->
rowNum
*
gExecLoopTimes
/
2
)
:
(
gCurCase
->
rowNum
*
gExecLoopTimes
)
,
silent
);
}
}
gExecLoopTimes
=
1
;
gExecLoopTimes
=
1
;
...
@@ -2749,6 +2828,7 @@ void runAll(TAOS *taos) {
...
@@ -2749,6 +2828,7 @@ void runAll(TAOS *taos) {
printf
(
"%s Begin
\n
"
,
gCaseCtrl
.
caseCatalog
);
printf
(
"%s Begin
\n
"
,
gCaseCtrl
.
caseCatalog
);
runCaseList
(
taos
);
runCaseList
(
taos
);
#if 0
strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test");
strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.precision = TIME_PRECISION_MICRO;
gCaseCtrl.precision = TIME_PRECISION_MICRO;
...
@@ -2805,6 +2885,8 @@ void runAll(TAOS *taos) {
...
@@ -2805,6 +2885,8 @@ void runAll(TAOS *taos) {
runCaseList(taos);
runCaseList(taos);
gCaseCtrl.bindColNum = 0;
gCaseCtrl.bindColNum = 0;
#endif
/*
/*
strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test");
strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
printf("%s Begin\n", gCaseCtrl.caseCatalog);
...
...
tests/script/tsim/vnode/replica3_repeat.sim
浏览文件 @
de4571fb
...
@@ -85,6 +85,7 @@ print ======== step3
...
@@ -85,6 +85,7 @@ print ======== step3
system sh/exec.sh -n dnode2 -s stop
system sh/exec.sh -n dnode2 -s stop
sleep 3000
sleep 3000
$t = 0
$x = 0
$x = 0
loop:
loop:
...
@@ -126,8 +127,8 @@ print ======== step8
...
@@ -126,8 +127,8 @@ print ======== step8
$lastRows = $data00
$lastRows = $data00
print ======== loop Times $x
print ======== loop Times $x
if $
x
< 2 then
if $
t
< 2 then
$
x = $x
+ 1
$
t = $t
+ 1
goto loop
goto loop
endi
endi
...
@@ -138,4 +139,4 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
...
@@ -138,4 +139,4 @@ system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录