Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8e263537
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8e263537
编写于
9月 06, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
9月 06, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16700 from taosdata/3.0
release: build 3.0.1.0
上级
3e8368e9
49326aa9
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
340 addition
and
201 deletion
+340
-201
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+2
-0
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+1
-0
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+14
-1
source/dnode/vnode/src/sma/smaCommit.c
source/dnode/vnode/src/sma/smaCommit.c
+16
-5
source/dnode/vnode/src/sma/smaFS.c
source/dnode/vnode/src/sma/smaFS.c
+13
-1
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+25
-15
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+133
-64
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+30
-23
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+12
-2
source/libs/function/src/tudf.c
source/libs/function/src/tudf.c
+9
-2
source/libs/index/src/indexComm.c
source/libs/index/src/indexComm.c
+6
-6
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+33
-34
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+1
-1
source/libs/transport/src/thttp.c
source/libs/transport/src/thttp.c
+5
-5
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+26
-29
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+1
-1
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+11
-11
tests/script/tsim/parser/commit.sim
tests/script/tsim/parser/commit.sim
+1
-0
未找到文件。
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
8e263537
...
...
@@ -1592,6 +1592,8 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb,
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
if
(
i
==
0
)
{
colDataAppend
(
pColInfo
,
rows
,
buf
,
false
);
}
else
if
(
i
==
1
)
{
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
&
pDb
->
createdTime
,
false
);
}
else
if
(
i
==
3
)
{
colDataAppend
(
pColInfo
,
rows
,
(
const
char
*
)
&
numOfTables
,
false
);
}
else
if
(
i
==
14
)
{
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
8e263537
...
...
@@ -211,6 +211,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
void
tdRSmaFSClose
(
SRSmaFS
*
fs
);
int32_t
tdRSmaFSRef
(
SSma
*
pSma
,
SRSmaStat
*
pStat
,
int64_t
version
);
void
tdRSmaFSUnRef
(
SSma
*
pSma
,
SRSmaStat
*
pStat
,
int64_t
version
);
int64_t
tdRSmaFSMaxVer
(
SSma
*
pSma
,
SRSmaStat
*
pStat
);
int32_t
tdRSmaFSUpsertQTaskFile
(
SRSmaFS
*
pFS
,
SQTaskFile
*
qTaskFile
);
int32_t
tdRSmaRestore
(
SSma
*
pSma
,
int8_t
type
,
int64_t
committedVer
);
int32_t
tdRSmaProcessCreateImpl
(
SSma
*
pSma
,
SRSmaParam
*
param
,
int64_t
suid
,
const
char
*
tbName
);
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
8e263537
...
...
@@ -643,20 +643,33 @@ typedef struct {
TSDBROW
row
;
}
SRowInfo
;
typedef
struct
SSttBlockLoadInfo
{
SBlockData
blockData
[
2
];
SArray
*
aSttBlk
;
int32_t
blockIndex
[
2
];
// to denote the loaded block in the corresponding position.
int32_t
currentLoadBlockIndex
;
}
SSttBlockLoadInfo
;
typedef
struct
SMergeTree
{
int8_t
backward
;
SRBTree
rbt
;
SArray
*
pIterList
;
SLDataIter
*
pIter
;
bool
destroyLoadInfo
;
SSttBlockLoadInfo
*
pLoadInfo
;
}
SMergeTree
;
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
suid
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
);
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
,
void
*
pLoadInfo
);
void
tMergeTreeAddIter
(
SMergeTree
*
pMTree
,
SLDataIter
*
pIter
);
bool
tMergeTreeNext
(
SMergeTree
*
pMTree
);
TSDBROW
tMergeTreeGetRow
(
SMergeTree
*
pMTree
);
void
tMergeTreeClose
(
SMergeTree
*
pMTree
);
SSttBlockLoadInfo
*
tCreateLastBlockLoadInfo
();
void
resetLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
);
void
*
destroyLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
);
// ========== inline functions ==========
static
FORCE_INLINE
int32_t
tsdbKeyCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
TSDBKEY
*
pKey1
=
(
TSDBKEY
*
)
p1
;
...
...
source/dnode/vnode/src/sma/smaCommit.c
浏览文件 @
8e263537
...
...
@@ -182,6 +182,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
SVnode
*
pVnode
=
pSma
->
pVnode
;
SRSmaFS
*
pFS
=
RSMA_FS
(
pStat
);
int64_t
committed
=
pStat
->
commitAppliedVer
;
int64_t
fsMaxVer
=
-
1
;
char
qTaskInfoFullName
[
TSDB_FILENAME_LEN
];
taosWLockLatch
(
RSMA_FS_LOCK
(
pStat
));
...
...
@@ -204,10 +205,20 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
++
i
;
}
SQTaskFile
qFile
=
{.
nRef
=
1
,
.
padding
=
0
,
.
version
=
committed
,
.
size
=
0
};
if
(
tdRSmaFSUpsertQTaskFile
(
pFS
,
&
qFile
)
<
0
)
{
taosWUnLockLatch
(
RSMA_FS_LOCK
(
pStat
));
return
TSDB_CODE_FAILED
;
if
(
taosArrayGetSize
(
pFS
->
aQTaskInf
)
>
0
)
{
fsMaxVer
=
((
SQTaskFile
*
)
taosArrayGetLast
(
pFS
->
aQTaskInf
))
->
version
;
}
if
(
fsMaxVer
<
committed
)
{
SQTaskFile
qFile
=
{.
nRef
=
1
,
.
padding
=
0
,
.
version
=
committed
,
.
size
=
0
};
if
(
taosArrayPush
(
pFS
->
aQTaskInf
,
&
qFile
)
<
0
)
{
taosWUnLockLatch
(
RSMA_FS_LOCK
(
pStat
));
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
}
else
{
smaDebug
(
"vgId:%d, update qinf, no need as committed %"
PRIi64
" not larger than fsMaxVer %"
PRIi64
,
TD_VID
(
pVnode
),
committed
,
fsMaxVer
);
}
taosWUnLockLatch
(
RSMA_FS_LOCK
(
pStat
));
...
...
@@ -365,7 +376,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
return
TSDB_CODE_SUCCESS
;
}
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
SRSmaStat
*
pRSmaStat
=
(
SRSmaStat
*
)
SMA_ENV_STAT
(
pEnv
);
// step 1: merge qTaskInfo and iQTaskInfo
// lock
...
...
source/dnode/vnode/src/sma/smaFS.c
浏览文件 @
8e263537
...
...
@@ -49,7 +49,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) {
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
output
);
++
i
)
{
int32_t
vid
=
0
;
int64_t
version
=
-
1
;
sscanf
((
const
char
*
)
taosArrayGetP
(
output
,
i
),
"v%dqinf
o
.v%"
PRIi64
,
&
vid
,
&
version
);
sscanf
((
const
char
*
)
taosArrayGetP
(
output
,
i
),
"v%dqinf.v%"
PRIi64
,
&
vid
,
&
version
);
SQTaskFile
qTaskFile
=
{.
version
=
version
,
.
nRef
=
1
};
if
((
terrno
=
tdRSmaFSUpsertQTaskFile
(
RSMA_FS
(
pStat
),
&
qTaskFile
))
<
0
)
{
goto
_end
;
...
...
@@ -96,6 +96,18 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
return
oldVal
;
}
int64_t
tdRSmaFSMaxVer
(
SSma
*
pSma
,
SRSmaStat
*
pStat
)
{
SArray
*
aQTaskInf
=
RSMA_FS
(
pStat
)
->
aQTaskInf
;
int64_t
version
=
-
1
;
taosRLockLatch
(
RSMA_FS_LOCK
(
pStat
));
if
(
taosArrayGetSize
(
aQTaskInf
)
>
0
)
{
version
=
((
SQTaskFile
*
)
taosArrayGetLast
(
aQTaskInf
))
->
version
;
}
taosRUnLockLatch
(
RSMA_FS_LOCK
(
pStat
));
return
version
;
}
void
tdRSmaFSUnRef
(
SSma
*
pSma
,
SRSmaStat
*
pStat
,
int64_t
version
)
{
SVnode
*
pVnode
=
pSma
->
pVnode
;
SArray
*
aQTaskInf
=
RSMA_FS
(
pStat
)
->
aQTaskInf
;
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
8e263537
...
...
@@ -1342,29 +1342,31 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
return
TSDB_CODE_FAILED
;
}
if
(
tdReadTFile
(
pTFile
,
pIter
->
q
Buf
,
nBytes
)
!=
nBytes
)
{
if
(
tdReadTFile
(
pTFile
,
pIter
->
p
Buf
,
nBytes
)
!=
nBytes
)
{
return
TSDB_CODE_FAILED
;
}
int32_t
infoLen
=
0
;
taosDecodeFixedI32
(
pIter
->
q
Buf
,
&
infoLen
);
taosDecodeFixedI32
(
pIter
->
p
Buf
,
&
infoLen
);
if
(
infoLen
>
nBytes
)
{
if
(
infoLen
<=
RSMA_QTASKINFO_BUFSIZE
)
{
terrno
=
TSDB_CODE_RSMA_FILE_CORRUPTED
;
smaError
(
"iterate rsma qtaskinfo file %s failed since %s"
,
TD_TFILE_FULL_NAME
(
pIter
->
pTFile
),
terrstr
());
return
TSDB_CODE_FAILED
;
}
pIter
->
nAlloc
=
infoLen
;
void
*
pBuf
=
taosMemoryRealloc
(
pIter
->
pBuf
,
infoLen
);
if
(
!
pBuf
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
if
(
pIter
->
nAlloc
<
infoLen
)
{
pIter
->
nAlloc
=
infoLen
;
void
*
pBuf
=
taosMemoryRealloc
(
pIter
->
pBuf
,
infoLen
);
if
(
!
pBuf
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
pIter
->
pBuf
=
pBuf
;
}
pIter
->
pBuf
=
pBuf
;
pIter
->
qBuf
=
pIter
->
pBuf
;
nBytes
=
infoLen
;
if
(
tdSeekTFile
(
pTFile
,
pIter
->
offset
,
SEEK_SET
))
{
if
(
tdSeekTFile
(
pTFile
,
pIter
->
offset
,
SEEK_SET
)
<
0
)
{
return
TSDB_CODE_FAILED
;
}
...
...
@@ -1373,6 +1375,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
}
}
pIter
->
qBuf
=
pIter
->
pBuf
;
pIter
->
offset
+=
nBytes
;
pIter
->
nBytes
=
nBytes
;
pIter
->
nBufPos
=
0
;
...
...
@@ -1450,17 +1453,24 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
return
TSDB_CODE_SUCCESS
;
}
int64_t
fsMaxVer
=
tdRSmaFSMaxVer
(
pSma
,
pRSmaStat
);
if
(
pRSmaStat
->
commitAppliedVer
<=
fsMaxVer
)
{
smaDebug
(
"vgId:%d, rsma persist, no need as applied %"
PRIi64
" not larger than fsMaxVer %"
PRIi64
,
vid
,
pRSmaStat
->
commitAppliedVer
,
fsMaxVer
);
return
TSDB_CODE_SUCCESS
;
}
STFile
tFile
=
{
0
};
#if 0
if (pRSmaStat->commitAppliedVer > 0) {
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
smaError("vgId:%d, rsma persi
s
t, init %s failed since %s", vid, qTaskInfoFName, terrstr());
goto _err;
}
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
smaError("vgId:%d, rsma persi
s
t, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
goto _err;
}
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
...
...
@@ -1510,11 +1520,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskInfoGetFileName
(
vid
,
pRSmaStat
->
commitAppliedVer
,
qTaskInfoFName
);
if
(
tdInitTFile
(
&
tFile
,
tfsGetPrimaryPath
(
pVnode
->
pTfs
),
qTaskInfoFName
)
<
0
)
{
smaError
(
"vgId:%d, rsma persit, init %s failed since %s"
,
vid
,
qTaskInfoFName
,
terrstr
());
smaError
(
"vgId:%d, rsma persi
s
t, init %s failed since %s"
,
vid
,
qTaskInfoFName
,
terrstr
());
goto
_err
;
}
if
(
tdCreateTFile
(
&
tFile
,
true
,
TD_FTYPE_RSMA_QTASKINFO
)
<
0
)
{
smaError
(
"vgId:%d, rsma persit, create %s failed since %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
terrstr
());
smaError
(
"vgId:%d, rsma persi
s
t, create %s failed since %s"
,
vid
,
TD_TFILE_FULL_NAME
(
&
tFile
),
terrstr
());
goto
_err
;
}
smaDebug
(
"vgId:%d, rsma, table %"
PRIi64
" serialize qTaskInfo, file %s created"
,
vid
,
pRSmaInfo
->
suid
,
...
...
@@ -1558,7 +1568,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
}
return
TSDB_CODE_SUCCESS
;
_err:
smaError
(
"vgId:%d, rsma persit failed since %s"
,
vid
,
terrstr
());
smaError
(
"vgId:%d, rsma persi
s
t failed since %s"
,
vid
,
terrstr
());
if
(
isFileCreated
)
{
tdRemoveTFile
(
&
tFile
);
tdDestroyTFile
(
&
tFile
);
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
8e263537
...
...
@@ -457,7 +457,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
tMergeTreeOpen
(
&
state
->
mergeTree
,
1
,
state
->
pDataFReader
,
state
->
suid
,
state
->
uid
,
&
(
STimeWindow
){.
skey
=
TSKEY_MIN
,
.
ekey
=
TSKEY_MAX
},
&
(
SVersionRange
){.
minVer
=
0
,
.
maxVer
=
UINT64_MAX
});
&
(
SVersionRange
){.
minVer
=
0
,
.
maxVer
=
UINT64_MAX
}
,
NULL
);
bool
hasVal
=
tMergeTreeNext
(
&
state
->
mergeTree
);
if
(
!
hasVal
)
{
state
->
state
=
SFSLASTNEXTROW_FILESET
;
...
...
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
8e263537
...
...
@@ -22,26 +22,106 @@ struct SLDataIter {
SDataFReader
*
pReader
;
int32_t
iStt
;
int8_t
backward
;
SArray
*
aSttBlk
;
int32_t
iSttBlk
;
SBlockData
bData
[
2
];
int32_t
loadIndex
;
int32_t
iRow
;
SRowInfo
rInfo
;
uint64_t
uid
;
STimeWindow
timeWindow
;
SVersionRange
verRange
;
SSttBlockLoadInfo
*
pBlockLoadInfo
;
};
static
SBlockData
*
getCurrentBlock
(
SLDataIter
*
pIter
)
{
return
&
pIter
->
bData
[
pIter
->
loadIndex
];
}
SSttBlockLoadInfo
*
tCreateLastBlockLoadInfo
()
{
SSttBlockLoadInfo
*
pLoadInfo
=
taosMemoryCalloc
(
TSDB_DEFAULT_STT_FILE
,
sizeof
(
SSttBlockLoadInfo
));
if
(
pLoadInfo
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
TSDB_DEFAULT_STT_FILE
;
++
i
)
{
pLoadInfo
[
i
].
blockIndex
[
0
]
=
-
1
;
pLoadInfo
[
i
].
blockIndex
[
1
]
=
-
1
;
pLoadInfo
[
i
].
currentLoadBlockIndex
=
1
;
int32_t
code
=
tBlockDataCreate
(
&
pLoadInfo
[
i
].
blockData
[
0
]);
if
(
code
)
{
terrno
=
code
;
}
code
=
tBlockDataCreate
(
&
pLoadInfo
[
i
].
blockData
[
1
]);
if
(
code
)
{
terrno
=
code
;
}
pLoadInfo
[
i
].
aSttBlk
=
taosArrayInit
(
4
,
sizeof
(
SSttBlk
));
}
return
pLoadInfo
;
}
void
resetLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_DEFAULT_STT_FILE
;
++
i
)
{
pLoadInfo
[
i
].
currentLoadBlockIndex
=
1
;
pLoadInfo
[
i
].
blockIndex
[
0
]
=
-
1
;
pLoadInfo
[
i
].
blockIndex
[
1
]
=
-
1
;
static
SBlockData
*
getNextBlock
(
SLDataIter
*
pIter
)
{
pIter
->
loadIndex
^=
1
;
return
getCurrentBlock
(
pIter
);
taosArrayClear
(
pLoadInfo
[
i
].
aSttBlk
);
}
}
void
*
destroyLastBlockLoadInfo
(
SSttBlockLoadInfo
*
pLoadInfo
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_DEFAULT_STT_FILE
;
++
i
)
{
pLoadInfo
[
i
].
currentLoadBlockIndex
=
1
;
pLoadInfo
[
i
].
blockIndex
[
0
]
=
-
1
;
pLoadInfo
[
i
].
blockIndex
[
1
]
=
-
1
;
tBlockDataDestroy
(
&
pLoadInfo
[
i
].
blockData
[
0
],
true
);
tBlockDataDestroy
(
&
pLoadInfo
[
i
].
blockData
[
1
],
true
);
taosArrayDestroy
(
pLoadInfo
[
i
].
aSttBlk
);
}
taosMemoryFree
(
pLoadInfo
);
return
NULL
;
}
static
SBlockData
*
loadBlockIfMissing
(
SLDataIter
*
pIter
)
{
int32_t
code
=
0
;
SSttBlockLoadInfo
*
pInfo
=
pIter
->
pBlockLoadInfo
;
if
(
pInfo
->
blockIndex
[
0
]
==
pIter
->
iSttBlk
)
{
return
&
pInfo
->
blockData
[
0
];
}
if
(
pInfo
->
blockIndex
[
1
]
==
pIter
->
iSttBlk
)
{
return
&
pInfo
->
blockData
[
1
];
}
pInfo
->
currentLoadBlockIndex
^=
1
;
if
(
pIter
->
pSttBlk
!=
NULL
)
{
// current block not loaded yet
code
=
tsdbReadSttBlock
(
pIter
->
pReader
,
pIter
->
iStt
,
pIter
->
pSttBlk
,
&
pInfo
->
blockData
[
pInfo
->
currentLoadBlockIndex
]);
tsdbDebug
(
"read last block, index:%d, last file index:%d"
,
pIter
->
iSttBlk
,
pIter
->
iStt
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_exit
;
}
pInfo
->
blockIndex
[
pInfo
->
currentLoadBlockIndex
]
=
pIter
->
iSttBlk
;
pIter
->
iRow
=
(
pIter
->
backward
)
?
pInfo
->
blockData
[
pInfo
->
currentLoadBlockIndex
].
nRow
:
-
1
;
}
return
&
pInfo
->
blockData
[
pInfo
->
currentLoadBlockIndex
];
_exit:
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
}
return
NULL
;
}
int32_t
tLDataIterOpen
(
struct
SLDataIter
**
pIter
,
SDataFReader
*
pReader
,
int32_t
iStt
,
int8_t
backward
,
uint64_t
suid
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pRange
)
{
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pRange
,
SSttBlockLoadInfo
*
pBlockLoadInfo
)
{
int32_t
code
=
0
;
*
pIter
=
taosMemoryCalloc
(
1
,
sizeof
(
SLDataIter
));
if
(
*
pIter
==
NULL
)
{
...
...
@@ -55,34 +135,22 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(
*
pIter
)
->
backward
=
backward
;
(
*
pIter
)
->
verRange
=
*
pRange
;
(
*
pIter
)
->
timeWindow
=
*
pTimeWindow
;
(
*
pIter
)
->
aSttBlk
=
taosArrayInit
(
0
,
sizeof
(
SSttBlk
));
if
((
*
pIter
)
->
aSttBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
code
=
tBlockDataCreate
(
&
(
*
pIter
)
->
bData
[
0
]);
if
(
code
)
{
goto
_exit
;
}
code
=
tBlockDataCreate
(
&
(
*
pIter
)
->
bData
[
1
]);
if
(
code
)
{
goto
_exit
;
}
code
=
tsdbReadSttBlk
(
pReader
,
iStt
,
(
*
pIter
)
->
aSttBlk
);
if
(
code
)
{
goto
_exit
;
(
*
pIter
)
->
pBlockLoadInfo
=
pBlockLoadInfo
;
if
(
taosArrayGetSize
(
pBlockLoadInfo
->
aSttBlk
)
==
0
)
{
code
=
tsdbReadSttBlk
(
pReader
,
iStt
,
pBlockLoadInfo
->
aSttBlk
);
if
(
code
)
{
goto
_exit
;
}
}
size_t
size
=
taosArrayGetSize
(
(
*
pIter
)
->
aSttBlk
);
size_t
size
=
taosArrayGetSize
(
pBlockLoadInfo
->
aSttBlk
);
// find the start block
int32_t
index
=
-
1
;
if
(
!
backward
)
{
// asc
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SSttBlk
*
p
=
taosArrayGet
(
(
*
pIter
)
->
aSttBlk
,
i
);
SSttBlk
*
p
=
taosArrayGet
(
pBlockLoadInfo
->
aSttBlk
,
i
);
if
(
p
->
suid
!=
suid
)
{
continue
;
}
...
...
@@ -94,7 +162,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
}
}
else
{
// desc
for
(
int32_t
i
=
size
-
1
;
i
>=
0
;
--
i
)
{
SSttBlk
*
p
=
taosArrayGet
(
(
*
pIter
)
->
aSttBlk
,
i
);
SSttBlk
*
p
=
taosArrayGet
(
pBlockLoadInfo
->
aSttBlk
,
i
);
if
(
p
->
suid
!=
suid
)
{
continue
;
}
...
...
@@ -108,7 +176,8 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(
*
pIter
)
->
iSttBlk
=
index
;
if
(
index
!=
-
1
)
{
(
*
pIter
)
->
pSttBlk
=
taosArrayGet
((
*
pIter
)
->
aSttBlk
,
(
*
pIter
)
->
iSttBlk
);
(
*
pIter
)
->
pSttBlk
=
taosArrayGet
(
pBlockLoadInfo
->
aSttBlk
,
(
*
pIter
)
->
iSttBlk
);
(
*
pIter
)
->
iRow
=
((
*
pIter
)
->
backward
)
?
(
*
pIter
)
->
pSttBlk
->
nRow
:
-
1
;
}
_exit:
...
...
@@ -116,9 +185,6 @@ _exit:
}
void
tLDataIterClose
(
SLDataIter
*
pIter
)
{
tBlockDataDestroy
(
&
pIter
->
bData
[
0
],
1
);
tBlockDataDestroy
(
&
pIter
->
bData
[
1
],
1
);
taosArrayDestroy
(
pIter
->
aSttBlk
);
taosMemoryFree
(
pIter
);
}
...
...
@@ -127,9 +193,9 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter
->
iSttBlk
+=
step
;
int32_t
index
=
-
1
;
size_t
size
=
taosArrayGetSize
(
pIter
->
aSttBlk
);
size_t
size
=
taosArrayGetSize
(
pIter
->
pBlockLoadInfo
->
aSttBlk
);
for
(
int32_t
i
=
pIter
->
iSttBlk
;
i
<
size
&&
i
>=
0
;
i
+=
step
)
{
SSttBlk
*
p
=
taosArrayGet
(
pIter
->
aSttBlk
,
i
);
SSttBlk
*
p
=
taosArrayGet
(
pIter
->
pBlockLoadInfo
->
aSttBlk
,
i
);
if
((
!
pIter
->
backward
)
&&
p
->
minUid
>
pIter
->
uid
)
{
break
;
}
...
...
@@ -169,7 +235,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
if
(
index
==
-
1
)
{
pIter
->
pSttBlk
=
NULL
;
}
else
{
pIter
->
pSttBlk
=
(
SSttBlk
*
)
taosArrayGet
(
pIter
->
aSttBlk
,
pIter
->
iSttBlk
);
pIter
->
pSttBlk
=
(
SSttBlk
*
)
taosArrayGet
(
pIter
->
pBlockLoadInfo
->
aSttBlk
,
pIter
->
iSttBlk
);
}
}
...
...
@@ -178,7 +244,8 @@ static void findNextValidRow(SLDataIter *pIter) {
bool
hasVal
=
false
;
int32_t
i
=
pIter
->
iRow
;
SBlockData
*
pBlockData
=
getCurrentBlock
(
pIter
);
SBlockData
*
pBlockData
=
loadBlockIfMissing
(
pIter
);
for
(;
i
<
pBlockData
->
nRow
&&
i
>=
0
;
i
+=
step
)
{
if
(
pBlockData
->
aUid
!=
NULL
)
{
...
...
@@ -238,19 +305,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
return
false
;
}
int32_t
iBlockL
=
pIter
->
iSttBlk
;
SBlockData
*
pBlockData
=
getCurrentBlock
(
pIter
);
if
(
pBlockData
->
nRow
==
0
&&
pIter
->
pSttBlk
!=
NULL
)
{
// current block not loaded yet
pBlockData
=
getNextBlock
(
pIter
);
code
=
tsdbReadSttBlock
(
pIter
->
pReader
,
pIter
->
iStt
,
pIter
->
pSttBlk
,
pBlockData
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_exit
;
}
pIter
->
iRow
=
(
pIter
->
backward
)
?
pBlockData
->
nRow
:
-
1
;
}
int32_t
iBlockL
=
pIter
->
iSttBlk
;
SBlockData
*
pBlockData
=
loadBlockIfMissing
(
pIter
);
pIter
->
iRow
+=
step
;
while
(
1
)
{
...
...
@@ -266,12 +322,8 @@ bool tLDataIterNextRow(SLDataIter *pIter) {
}
if
(
iBlockL
!=
pIter
->
iSttBlk
)
{
pBlockData
=
getNextBlock
(
pIter
);
code
=
tsdbReadSttBlock
(
pIter
->
pReader
,
pIter
->
iStt
,
pIter
->
pSttBlk
,
pBlockData
);
if
(
code
)
{
goto
_exit
;
}
pIter
->
iRow
=
pIter
->
backward
?
(
pBlockData
->
nRow
-
1
)
:
0
;
pBlockData
=
loadBlockIfMissing
(
pIter
);
pIter
->
iRow
+=
step
;
}
}
...
...
@@ -313,7 +365,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
}
int32_t
tMergeTreeOpen
(
SMergeTree
*
pMTree
,
int8_t
backward
,
SDataFReader
*
pFReader
,
uint64_t
suid
,
uint64_t
uid
,
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
)
{
STimeWindow
*
pTimeWindow
,
SVersionRange
*
pVerRange
,
void
*
pBlockLoadInfo
)
{
pMTree
->
backward
=
backward
;
pMTree
->
pIter
=
NULL
;
pMTree
->
pIterList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
...
...
@@ -322,21 +374,33 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
}
tRBTreeCreate
(
&
pMTree
->
rbt
,
tLDataIterCmprFn
);
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSttBlockLoadInfo
*
pLoadInfo
=
NULL
;
if
(
pBlockLoadInfo
==
NULL
)
{
if
(
pMTree
->
pLoadInfo
==
NULL
)
{
pMTree
->
destroyLoadInfo
=
true
;
pMTree
->
pLoadInfo
=
tCreateLastBlockLoadInfo
();
}
pLoadInfo
=
pMTree
->
pLoadInfo
;
}
else
{
pLoadInfo
=
pBlockLoadInfo
;
}
struct
SLDataIter
*
pIterList
[
TSDB_DEFAULT_STT_FILE
]
=
{
0
};
for
(
int32_t
i
=
0
;
i
<
pFReader
->
pSet
->
nSttF
;
++
i
)
{
// open all last file
code
=
tLDataIterOpen
(
&
pIterList
[
i
],
pFReader
,
i
,
pMTree
->
backward
,
suid
,
uid
,
pTimeWindow
,
pVerRange
);
struct
SLDataIter
*
pIter
=
NULL
;
code
=
tLDataIterOpen
(
&
pIter
,
pFReader
,
i
,
pMTree
->
backward
,
suid
,
uid
,
pTimeWindow
,
pVerRange
,
&
pLoadInfo
[
i
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_end
;
}
bool
hasVal
=
tLDataIterNextRow
(
pIter
List
[
i
]
);
bool
hasVal
=
tLDataIterNextRow
(
pIter
);
if
(
hasVal
)
{
taosArrayPush
(
pMTree
->
pIterList
,
&
pIter
List
[
i
]
);
tMergeTreeAddIter
(
pMTree
,
pIter
List
[
i
]
);
taosArrayPush
(
pMTree
->
pIterList
,
&
pIter
);
tMergeTreeAddIter
(
pMTree
,
pIter
);
}
else
{
tLDataIterClose
(
pIter
List
[
i
]
);
tLDataIterClose
(
pIter
);
}
}
...
...
@@ -393,4 +457,9 @@ void tMergeTreeClose(SMergeTree *pMTree) {
pMTree
->
pIterList
=
taosArrayDestroy
(
pMTree
->
pIterList
);
pMTree
->
pIter
=
NULL
;
if
(
pMTree
->
destroyLoadInfo
)
{
pMTree
->
pLoadInfo
=
destroyLastBlockLoadInfo
(
pMTree
->
pLoadInfo
);
pMTree
->
destroyLoadInfo
=
false
;
}
}
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
8e263537
...
...
@@ -17,8 +17,6 @@
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define INITIAL_ROW_INDEX_VAL (-1)
typedef
enum
{
EXTERNAL_ROWS_PREV
=
0x1
,
...
...
@@ -88,6 +86,7 @@ typedef struct SLastBlockReader {
int32_t
order
;
uint64_t
uid
;
SMergeTree
mergeTree
;
SSttBlockLoadInfo
*
pInfo
;
}
SLastBlockReader
;
typedef
struct
SFilesetIter
{
...
...
@@ -226,13 +225,14 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return
NULL
;
}
int32_t
step
=
ASCENDING_TRAVERSE
(
pTsdbReader
->
order
)
?
1
:-
1
;
for
(
int32_t
j
=
0
;
j
<
numOfTables
;
++
j
)
{
STableBlockScanInfo
info
=
{.
lastKey
=
0
,
.
uid
=
idList
[
j
].
uid
};
if
(
ASCENDING_TRAVERSE
(
pTsdbReader
->
order
))
{
info
.
lastKey
=
pTsdbReader
->
window
.
skey
-
step
;
int64_t
skey
=
pTsdbReader
->
window
.
skey
;
info
.
lastKey
=
(
skey
>
INT64_MIN
)
?
(
skey
-
1
)
:
skey
;
}
else
{
info
.
lastKey
=
pTsdbReader
->
window
.
ekey
-
step
;
int64_t
ekey
=
pTsdbReader
->
window
.
ekey
;
info
.
lastKey
=
(
ekey
<
INT64_MAX
)
?
(
ekey
+
1
)
:
ekey
;
}
taosHashPut
(
pTableMap
,
&
info
.
uid
,
sizeof
(
uint64_t
),
&
info
,
sizeof
(
info
));
...
...
@@ -319,8 +319,7 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
}
// init file iterator
static
int32_t
initFilesetIterator
(
SFilesetIter
*
pIter
,
SArray
*
aDFileSet
,
STsdbReader
*
pReader
/*int32_t order, const char* idstr*/
)
{
static
int32_t
initFilesetIterator
(
SFilesetIter
*
pIter
,
SArray
*
aDFileSet
,
STsdbReader
*
pReader
)
{
size_t
numOfFileset
=
taosArrayGetSize
(
aDFileSet
);
pIter
->
index
=
ASCENDING_TRAVERSE
(
pReader
->
order
)
?
-
1
:
numOfFileset
;
...
...
@@ -345,6 +344,14 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
pLReader
->
uid
=
0
;
tMergeTreeClose
(
&
pLReader
->
mergeTree
);
if
(
pLReader
->
pInfo
==
NULL
)
{
pLReader
->
pInfo
=
tCreateLastBlockLoadInfo
();
if
(
pLReader
->
pInfo
==
NULL
)
{
tsdbDebug
(
"init fileset iterator failed, code:%s %s"
,
tstrerror
(
terrno
),
pReader
->
idStr
);
return
terrno
;
}
}
tsdbDebug
(
"init fileset iterator, total files:%d %s"
,
pIter
->
numOfFiles
,
pReader
->
idStr
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -360,6 +367,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
pIter
->
pLastBlockReader
->
uid
=
0
;
tMergeTreeClose
(
&
pIter
->
pLastBlockReader
->
mergeTree
);
resetLastBlockLoadInfo
(
pIter
->
pLastBlockReader
->
pInfo
);
// check file the time range of coverage
STimeWindow
win
=
{
0
};
...
...
@@ -1377,7 +1385,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
STableBlockScanInfo
*
pBlockScanInfo
,
SBlockData
*
pBlockData
,
bool
mergeBlockData
)
{
SFileBlockDumpInfo
*
pDumpInfo
=
&
pReader
->
status
.
fBlockDumpInfo
;
// SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t
tsLastBlock
=
getCurrentKeyInLastBlock
(
pLastBlockReader
);
STSRow
*
pTSRow
=
NULL
;
...
...
@@ -1866,36 +1873,35 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
}
}
static
bool
initLastBlockReader
(
SLastBlockReader
*
pLastBlockReader
,
STableBlockScanInfo
*
pBlockScanInfo
,
STsdbReader
*
pReader
)
{
static
bool
initLastBlockReader
(
SLastBlockReader
*
pLBlockReader
,
STableBlockScanInfo
*
pScanInfo
,
STsdbReader
*
pReader
)
{
// the last block reader has been initialized for this table.
if
(
pL
astBlockReader
->
uid
==
pBlock
ScanInfo
->
uid
)
{
if
(
pL
BlockReader
->
uid
==
p
ScanInfo
->
uid
)
{
return
true
;
}
if
(
pL
ast
BlockReader
->
uid
!=
0
)
{
tMergeTreeClose
(
&
pL
ast
BlockReader
->
mergeTree
);
if
(
pLBlockReader
->
uid
!=
0
)
{
tMergeTreeClose
(
&
pLBlockReader
->
mergeTree
);
}
initMemDataIterator
(
p
Block
ScanInfo
,
pReader
);
pL
astBlockReader
->
uid
=
pBlock
ScanInfo
->
uid
;
initMemDataIterator
(
pScanInfo
,
pReader
);
pL
BlockReader
->
uid
=
p
ScanInfo
->
uid
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pL
ast
BlockReader
->
order
)
?
1
:-
1
;
STimeWindow
w
=
pL
ast
BlockReader
->
window
;
if
(
ASCENDING_TRAVERSE
(
pL
ast
BlockReader
->
order
))
{
w
.
skey
=
p
Block
ScanInfo
->
lastKey
+
step
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pLBlockReader
->
order
)
?
1
:-
1
;
STimeWindow
w
=
pLBlockReader
->
window
;
if
(
ASCENDING_TRAVERSE
(
pLBlockReader
->
order
))
{
w
.
skey
=
pScanInfo
->
lastKey
+
step
;
}
else
{
w
.
ekey
=
p
Block
ScanInfo
->
lastKey
+
step
;
w
.
ekey
=
pScanInfo
->
lastKey
+
step
;
}
int32_t
code
=
tMergeTreeOpen
(
&
pL
astBlockReader
->
mergeTree
,
(
pLast
BlockReader
->
order
==
TSDB_ORDER_DESC
),
pReader
->
pFileReader
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
w
,
&
pLastBlockReader
->
verRange
);
tMergeTreeOpen
(
&
pL
BlockReader
->
mergeTree
,
(
pL
BlockReader
->
order
==
TSDB_ORDER_DESC
),
pReader
->
pFileReader
,
pReader
->
suid
,
pScanInfo
->
uid
,
&
w
,
&
pLBlockReader
->
verRange
,
pLBlockReader
->
pInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
false
;
}
return
nextRowFromLastBlocks
(
pL
astBlockReader
,
pBlock
ScanInfo
);
return
nextRowFromLastBlocks
(
pL
BlockReader
,
p
ScanInfo
);
}
static
int64_t
getCurrentKeyInLastBlock
(
SLastBlockReader
*
pLastBlockReader
)
{
...
...
@@ -3305,6 +3311,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
SFilesetIter
*
pFilesetIter
=
&
pReader
->
status
.
fileIter
;
if
(
pFilesetIter
->
pLastBlockReader
!=
NULL
)
{
tMergeTreeClose
(
&
pFilesetIter
->
pLastBlockReader
->
mergeTree
);
pFilesetIter
->
pLastBlockReader
->
pInfo
=
destroyLastBlockLoadInfo
(
pFilesetIter
->
pLastBlockReader
->
pInfo
);
taosMemoryFree
(
pFilesetIter
->
pLastBlockReader
);
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
8e263537
...
...
@@ -2795,6 +2795,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
// All null data column, return directly.
if
(
pInput
->
colDataAggIsSet
&&
(
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
==
pInput
->
totalRows
))
{
ASSERT
(
pInputCol
->
hasNull
==
true
);
// save selectivity value for column consisted of all null values
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
pInput
->
startRowIndex
,
pCtx
,
pInfo
);
return
0
;
}
...
...
@@ -2871,7 +2873,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
}
}
#endif
if
(
numOfElems
==
0
)
{
// save selectivity value for column consisted of all null values
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
pInput
->
startRowIndex
,
pCtx
,
pInfo
);
}
SET_VAL
(
pResInfo
,
numOfElems
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2892,6 +2897,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
// All null data column, return directly.
if
(
pInput
->
colDataAggIsSet
&&
(
pInput
->
pColumnDataAgg
[
0
]
->
numOfNull
==
pInput
->
totalRows
))
{
ASSERT
(
pInputCol
->
hasNull
==
true
);
// save selectivity value for column consisted of all null values
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
pInput
->
startRowIndex
,
pCtx
,
pInfo
);
return
0
;
}
...
...
@@ -2952,7 +2959,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
}
#endif
if
(
numOfElems
==
0
)
{
// save selectivity value for column consisted of all null values
firstlastSaveTupleData
(
pCtx
->
pSrcBlock
,
pInput
->
startRowIndex
,
pCtx
,
pInfo
);
}
SET_VAL
(
pResInfo
,
numOfElems
,
1
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/function/src/tudf.c
浏览文件 @
8e263537
...
...
@@ -1183,7 +1183,9 @@ void onUdfcPipeClose(uv_handle_t *handle) {
QUEUE_REMOVE
(
&
task
->
procTaskQueue
);
uv_sem_post
(
&
task
->
taskSem
);
}
conn
->
session
->
udfUvPipe
=
NULL
;
if
(
conn
->
session
!=
NULL
)
{
conn
->
session
->
udfUvPipe
=
NULL
;
}
taosMemoryFree
(
conn
->
readBuf
.
buf
);
taosMemoryFree
(
conn
);
taosMemoryFree
((
uv_pipe_t
*
)
handle
);
...
...
@@ -1803,6 +1805,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
if
(
session
->
udfUvPipe
==
NULL
)
{
fnError
(
"tear down udf. pipe to udfd does not exist. udf name: %s"
,
session
->
udfName
);
taosMemoryFree
(
session
);
return
TSDB_CODE_UDF_PIPE_NO_PIPE
;
}
...
...
@@ -1821,7 +1824,11 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
udfcRunUdfUvTask
(
task
,
UV_TASK_DISCONNECT
);
fnInfo
(
"tear down udf. udf name: %s, udf func handle: %p"
,
session
->
udfName
,
handle
);
//TODO: synchronization refactor between libuv event loop and request thread
if
(
session
->
udfUvPipe
!=
NULL
&&
session
->
udfUvPipe
->
data
!=
NULL
)
{
SClientUvConn
*
conn
=
session
->
udfUvPipe
->
data
;
conn
->
session
=
NULL
;
}
taosMemoryFree
(
session
);
taosMemoryFree
(
task
);
...
...
source/libs/index/src/indexComm.c
浏览文件 @
8e263537
...
...
@@ -81,28 +81,28 @@ __compar_fn_t idxGetCompar(int8_t type) {
}
return
getComparFunc
(
type
,
0
);
}
static
TExeCond
tCompareLessThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareLessThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_LESS_THAN
,
a
,
b
,
type
);
}
static
TExeCond
tCompareLessEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareLessEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_LESS_EQUAL
,
a
,
b
,
type
);
}
static
TExeCond
tCompareGreaterThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareGreaterThan
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_GREATER_THAN
,
a
,
b
,
type
);
}
static
TExeCond
tCompareGreaterEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareGreaterEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_GREATER_EQUAL
,
a
,
b
,
type
);
}
static
TExeCond
tCompareContains
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareContains
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_TERM
,
a
,
b
,
type
);
}
static
TExeCond
tCompareEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
static
FORCE_INLINE
TExeCond
tCompareEqual
(
void
*
a
,
void
*
b
,
int8_t
type
)
{
__compar_fn_t
func
=
idxGetCompar
(
type
);
return
tCompare
(
func
,
QUERY_TERM
,
a
,
b
,
type
);
}
...
...
source/libs/index/src/indexFilter.c
浏览文件 @
8e263537
...
...
@@ -88,7 +88,7 @@ typedef struct SIFCtx {
SIndexMetaArg
arg
;
}
SIFCtx
;
static
int32_t
sifGetFuncFromSql
(
EOperatorType
src
,
EIndexQueryType
*
dst
)
{
static
FORCE_INLINE
int32_t
sifGetFuncFromSql
(
EOperatorType
src
,
EIndexQueryType
*
dst
)
{
if
(
src
==
OP_TYPE_GREATER_THAN
)
{
*
dst
=
QUERY_GREATER_THAN
;
}
else
if
(
src
==
OP_TYPE_GREATER_EQUAL
)
{
...
...
@@ -110,10 +110,9 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
}
typedef
int32_t
(
*
sif_func_t
)(
SIFParam
*
left
,
SIFParam
*
rigth
,
SIFParam
*
output
);
static
sif_func_t
sifNullFunc
=
NULL
;
static
void
sifFreeParam
(
SIFParam
*
param
)
{
static
FORCE_INLINE
void
sifFreeParam
(
SIFParam
*
param
)
{
if
(
param
==
NULL
)
return
;
taosArrayDestroy
(
param
->
result
);
...
...
@@ -123,7 +122,7 @@ static void sifFreeParam(SIFParam *param) {
param
->
pFilter
=
NULL
;
}
static
int32_t
sifGetOperParamNum
(
EOperatorType
ty
)
{
static
FORCE_INLINE
int32_t
sifGetOperParamNum
(
EOperatorType
ty
)
{
if
(
OP_TYPE_IS_NULL
==
ty
||
OP_TYPE_IS_NOT_NULL
==
ty
||
OP_TYPE_IS_TRUE
==
ty
||
OP_TYPE_IS_NOT_TRUE
==
ty
||
OP_TYPE_IS_FALSE
==
ty
||
OP_TYPE_IS_NOT_FALSE
==
ty
||
OP_TYPE_IS_UNKNOWN
==
ty
||
OP_TYPE_IS_NOT_UNKNOWN
==
ty
||
OP_TYPE_MINUS
==
ty
)
{
...
...
@@ -131,14 +130,14 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
}
return
2
;
}
static
int32_t
sifValidOp
(
EOperatorType
ty
)
{
static
FORCE_INLINE
int32_t
sifValidOp
(
EOperatorType
ty
)
{
if
((
ty
>=
OP_TYPE_ADD
&&
ty
<=
OP_TYPE_BIT_OR
)
||
(
ty
==
OP_TYPE_IN
||
ty
==
OP_TYPE_NOT_IN
)
||
(
ty
==
OP_TYPE_LIKE
||
ty
==
OP_TYPE_NOT_LIKE
||
ty
==
OP_TYPE_MATCH
||
ty
==
OP_TYPE_NMATCH
))
{
return
-
1
;
}
return
0
;
}
static
int32_t
sifValidColumn
(
SColumnNode
*
cn
)
{
static
FORCE_INLINE
int32_t
sifValidColumn
(
SColumnNode
*
cn
)
{
// add more check
if
(
cn
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -149,7 +148,7 @@ static int32_t sifValidColumn(SColumnNode *cn) {
return
TSDB_CODE_SUCCESS
;
}
static
SIdxFltStatus
sifMergeCond
(
ELogicConditionType
type
,
SIdxFltStatus
ls
,
SIdxFltStatus
rs
)
{
static
FORCE_INLINE
SIdxFltStatus
sifMergeCond
(
ELogicConditionType
type
,
SIdxFltStatus
ls
,
SIdxFltStatus
rs
)
{
// enh rule later
if
(
type
==
LOGIC_COND_TYPE_AND
)
{
if
(
ls
==
SFLT_NOT_INDEX
||
rs
==
SFLT_NOT_INDEX
)
{
...
...
@@ -167,7 +166,7 @@ static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SI
return
SFLT_NOT_INDEX
;
}
static
int32_t
sifGetValueFromNode
(
SNode
*
node
,
char
**
value
)
{
static
FORCE_INLINE
int32_t
sifGetValueFromNode
(
SNode
*
node
,
char
**
value
)
{
// covert data From snode;
SValueNode
*
vn
=
(
SValueNode
*
)
node
;
...
...
@@ -205,7 +204,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sifInitJsonParam
(
SNode
*
node
,
SIFParam
*
param
,
SIFCtx
*
ctx
)
{
static
FORCE_INLINE
int32_t
sifInitJsonParam
(
SNode
*
node
,
SIFParam
*
param
,
SIFCtx
*
ctx
)
{
SOperatorNode
*
nd
=
(
SOperatorNode
*
)
node
;
assert
(
nodeType
(
node
)
==
QUERY_NODE_OPERATOR
);
SColumnNode
*
l
=
(
SColumnNode
*
)
nd
->
pLeft
;
...
...
@@ -355,30 +354,30 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
typedef
int
(
*
Filter
)(
void
*
a
,
void
*
b
,
int16_t
dtype
);
typedef
int
(
*
Filter
Func
)(
void
*
a
,
void
*
b
,
int16_t
dtype
);
int
sifGreaterThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifGreaterThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
tDoCompare
(
func
,
QUERY_GREATER_THAN
,
a
,
b
);
}
int
sifGreaterEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifGreaterEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
tDoCompare
(
func
,
QUERY_GREATER_EQUAL
,
a
,
b
);
}
int
sifLessEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifLessEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
tDoCompare
(
func
,
QUERY_LESS_EQUAL
,
a
,
b
);
}
int
sifLessThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifLessThan
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
return
(
int
)
tDoCompare
(
func
,
QUERY_LESS_THAN
,
a
,
b
);
}
int
sifEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
static
FORCE_INLINE
int
sifEqual
(
void
*
a
,
void
*
b
,
int16_t
dtype
)
{
__compar_fn_t
func
=
getComparFunc
(
dtype
,
0
);
//__compar_fn_t func = idxGetCompar(dtype);
return
(
int
)
tDoCompare
(
func
,
QUERY_TERM
,
a
,
b
);
}
static
F
ilter
sifGetFilterFunc
(
EIndexQueryType
type
,
bool
*
reverse
)
{
static
F
ORCE_INLINE
FilterFunc
sifGetFilterFunc
(
EIndexQueryType
type
,
bool
*
reverse
)
{
if
(
type
==
QUERY_LESS_EQUAL
||
type
==
QUERY_LESS_THAN
)
{
*
reverse
=
true
;
}
else
{
...
...
@@ -470,8 +469,8 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
indexMultiTermQueryAdd
(
mtm
,
tm
,
qtype
);
ret
=
indexJsonSearch
(
arg
->
ivtIdx
,
mtm
,
output
->
result
);
}
else
{
bool
reverse
;
Filter
filterFunc
=
sifGetFilterFunc
(
qtype
,
&
reverse
);
bool
reverse
;
Filter
Func
filterFunc
=
sifGetFilterFunc
(
qtype
,
&
reverse
);
SMetaFltParam
param
=
{.
suid
=
arg
->
suid
,
.
cid
=
left
->
colId
,
...
...
@@ -498,72 +497,72 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
return
ret
;
}
static
int32_t
sifLessThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifLessThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_LOWER_THAN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifLessEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifLessEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_LOWER_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifGreaterThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifGreaterThanFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_GREATER_THAN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifGreaterEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifGreaterEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_GREATER_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotEqualFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NOT_EQUAL
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_IN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotInFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NOT_IN
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_LIKE
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotLikeFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NOT_LIKE
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_MATCH
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifNotMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifNotMatchFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_NMATCH
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifJsonContains
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifJsonContains
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
int
id
=
OP_TYPE_JSON_CONTAINS
;
return
sifDoIndex
(
left
,
right
,
id
,
output
);
}
static
int32_t
sifJsonGetValue
(
SIFParam
*
left
,
SIFParam
*
rigth
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifJsonGetValue
(
SIFParam
*
left
,
SIFParam
*
rigth
,
SIFParam
*
output
)
{
// return 0
return
0
;
}
static
int32_t
sifDefaultFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
static
FORCE_INLINE
int32_t
sifDefaultFunc
(
SIFParam
*
left
,
SIFParam
*
right
,
SIFParam
*
output
)
{
// add more except
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
static
int32_t
sifGetOperFn
(
int32_t
funcId
,
sif_func_t
*
func
,
SIdxFltStatus
*
status
)
{
static
FORCE_INLINE
int32_t
sifGetOperFn
(
int32_t
funcId
,
sif_func_t
*
func
,
SIdxFltStatus
*
status
)
{
// impl later
*
status
=
SFLT_ACCURATE_INDEX
;
switch
(
funcId
)
{
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
8e263537
...
...
@@ -502,7 +502,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
i64
offset
;
int
ret
;
offset
=
pPage
->
pageSize
*
(
TDB_PAGE_PGNO
(
pPage
)
-
1
);
offset
=
(
i64
)
pPage
->
pageSize
*
(
TDB_PAGE_PGNO
(
pPage
)
-
1
);
if
(
tdbOsLSeek
(
pPager
->
fd
,
offset
,
SEEK_SET
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
source/libs/transport/src/thttp.c
浏览文件 @
8e263537
...
...
@@ -126,22 +126,22 @@ _OVER:
return
code
;
}
static
void
destroyHttpClient
(
SHttpClient
*
cli
)
{
static
FORCE_INLINE
void
destroyHttpClient
(
SHttpClient
*
cli
)
{
taosMemoryFree
(
cli
->
wbuf
);
taosMemoryFree
(
cli
->
rbuf
);
taosMemoryFree
(
cli
->
addr
);
taosMemoryFree
(
cli
);
}
static
void
clientCloseCb
(
uv_handle_t
*
handle
)
{
static
FORCE_INLINE
void
clientCloseCb
(
uv_handle_t
*
handle
)
{
SHttpClient
*
cli
=
handle
->
data
;
destroyHttpClient
(
cli
);
}
static
void
clientAllocBuffCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
static
FORCE_INLINE
void
clientAllocBuffCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SHttpClient
*
cli
=
handle
->
data
;
buf
->
base
=
cli
->
rbuf
;
buf
->
len
=
HTTP_RECV_BUF_SIZE
;
}
static
void
clientRecvCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
static
FORCE_INLINE
void
clientRecvCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
SHttpClient
*
cli
=
handle
->
data
;
if
(
nread
<
0
)
{
uError
(
"http-report recv error:%s"
,
uv_err_name
(
nread
));
...
...
@@ -173,7 +173,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
uv_write
(
&
cli
->
req
,
(
uv_stream_t
*
)
&
cli
->
tcp
,
cli
->
wbuf
,
2
,
clientSentCb
);
}
static
int32_t
taosBuildDstAddr
(
const
char
*
server
,
uint16_t
port
,
struct
sockaddr_in
*
dest
)
{
static
FORCE_INLINE
int32_t
taosBuildDstAddr
(
const
char
*
server
,
uint16_t
port
,
struct
sockaddr_in
*
dest
)
{
uint32_t
ip
=
taosGetIpv4FromFqdn
(
server
);
if
(
ip
==
0xffffffff
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
8e263537
...
...
@@ -69,11 +69,9 @@ typedef struct SCliThrd {
SAsyncPool
*
asyncPool
;
uv_prepare_t
*
prepare
;
void
*
pool
;
// conn pool
// timer handles
SArray
*
timerList
;
// msg queue
queue
msg
;
TdThreadMutex
msgMtx
;
SDelayQueue
*
delayQueue
;
...
...
@@ -108,7 +106,7 @@ static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle);
// alloc buffer for recv
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
FORCE_INLINE
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
// callback after recv nbytes from socket
static
void
cliRecvCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
// callback after send data to socket
...
...
@@ -132,10 +130,10 @@ static void cliSend(SCliConn* pConn);
static
void
cliDestroyConnMsgs
(
SCliConn
*
conn
,
bool
destroy
);
// cli util func
static
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
);
static
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
);
static
FORCE_INLINE
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
);
static
FORCE_INLINE
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
);
static
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
resp
);
static
FORCE_INLINE
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
resp
);
// process data read from server, add decompress etc later
static
void
cliHandleResp
(
SCliConn
*
conn
);
...
...
@@ -150,12 +148,10 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd);
static
void
(
*
cliAsyncHandle
[])(
SCliMsg
*
pMsg
,
SCliThrd
*
pThrd
)
=
{
cliHandleReq
,
cliHandleQuit
,
cliHandleRelease
,
NULL
,
cliHandleUpdate
};
static
void
cliSendQuit
(
SCliThrd
*
thrd
);
static
void
destroyUserdata
(
STransMsg
*
userdata
);
static
int
cliRBChoseIdx
(
STrans
*
pTransInst
);
static
FORCE_INLINE
void
destroyUserdata
(
STransMsg
*
userdata
);
static
FORCE_INLINE
void
destroyCmsg
(
void
*
cmsg
);
static
FORCE_INLINE
int
cliRBChoseIdx
(
STrans
*
pTransInst
);
static
void
destroyCmsg
(
void
*
cmsg
);
static
void
transDestroyConnCtx
(
STransConnCtx
*
ctx
);
// thread obj
static
SCliThrd
*
createThrdObj
();
...
...
@@ -434,6 +430,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if
(
pCtx
==
NULL
||
pCtx
->
pSem
==
NULL
)
{
if
(
transMsg
.
info
.
ahandle
==
NULL
)
{
if
(
REQUEST_NO_RESP
(
&
pMsg
->
msg
))
destroyCmsg
(
pMsg
);
once
=
true
;
continue
;
}
...
...
@@ -885,26 +882,23 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
}
return
conn
;
}
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
)
{
FORCE_INLINE
void
cliMayCvtFqdnToIp
(
SEpSet
*
pEpSet
,
SCvtAddr
*
pCvtAddr
)
{
if
(
pCvtAddr
->
cvt
==
false
)
{
return
;
}
for
(
int
i
=
0
;
i
<
pEpSet
->
numOfEps
&&
pEpSet
->
numOfEps
==
1
;
i
++
)
{
if
(
strncmp
(
pEpSet
->
eps
[
i
].
fqdn
,
pCvtAddr
->
fqdn
,
TSDB_FQDN_LEN
)
==
0
)
{
memset
(
pEpSet
->
eps
[
i
].
fqdn
,
0
,
TSDB_FQDN_LEN
);
memcpy
(
pEpSet
->
eps
[
i
].
fqdn
,
pCvtAddr
->
ip
,
TSDB_FQDN_LEN
);
}
if
(
pEpSet
->
numOfEps
==
1
&&
strncmp
(
pEpSet
->
eps
[
0
].
fqdn
,
pCvtAddr
->
fqdn
,
TSDB_FQDN_LEN
)
==
0
)
{
memset
(
pEpSet
->
eps
[
0
].
fqdn
,
0
,
TSDB_FQDN_LEN
);
memcpy
(
pEpSet
->
eps
[
0
].
fqdn
,
pCvtAddr
->
ip
,
TSDB_FQDN_LEN
);
}
}
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
)
{
FORCE_INLINE
bool
cliIsEpsetUpdated
(
int32_t
code
,
STransConnCtx
*
pCtx
)
{
if
(
code
!=
0
)
return
false
;
if
(
pCtx
->
retryCnt
==
0
)
return
false
;
if
(
transEpSetIsEqual
(
&
pCtx
->
epSet
,
&
pCtx
->
origEpSet
))
return
false
;
return
true
;
}
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
pResp
)
{
FORCE_INLINE
int32_t
cliBuildExceptResp
(
SCliMsg
*
pMsg
,
STransMsg
*
pResp
)
{
if
(
pMsg
==
NULL
)
return
-
1
;
memset
(
pResp
,
0
,
sizeof
(
STransMsg
));
...
...
@@ -980,6 +974,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return
;
}
}
STraceId
*
trace
=
&
pMsg
->
msg
.
info
.
traceId
;
tGTrace
(
"%s conn %p ready"
,
pTransInst
->
label
,
conn
);
}
static
void
cliAsyncCb
(
uv_async_t
*
handle
)
{
SAsyncItem
*
item
=
handle
->
data
;
...
...
@@ -1128,14 +1124,15 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return
cli
;
}
static
void
destroyUserdata
(
STransMsg
*
userdata
)
{
FORCE_INLINE
void
destroyUserdata
(
STransMsg
*
userdata
)
{
if
(
userdata
->
pCont
==
NULL
)
{
return
;
}
transFreeMsg
(
userdata
->
pCont
);
userdata
->
pCont
=
NULL
;
}
static
void
destroyCmsg
(
void
*
arg
)
{
FORCE_INLINE
void
destroyCmsg
(
void
*
arg
)
{
SCliMsg
*
pMsg
=
arg
;
if
(
pMsg
==
NULL
)
{
return
;
...
...
@@ -1220,7 +1217,7 @@ void cliWalkCb(uv_handle_t* handle, void* arg) {
}
}
int
cliRBChoseIdx
(
STrans
*
pTransInst
)
{
FORCE_INLINE
int
cliRBChoseIdx
(
STrans
*
pTransInst
)
{
int8_t
index
=
pTransInst
->
index
;
if
(
pTransInst
->
numOfThreads
==
0
)
{
return
-
1
;
...
...
@@ -1230,7 +1227,7 @@ int cliRBChoseIdx(STrans* pTransInst) {
}
return
index
%
pTransInst
->
numOfThreads
;
}
static
void
doDelayTask
(
void
*
param
)
{
static
FORCE_INLINE
void
doDelayTask
(
void
*
param
)
{
STaskArg
*
arg
=
param
;
SCliMsg
*
pMsg
=
arg
->
param1
;
SCliThrd
*
pThrd
=
arg
->
param2
;
...
...
@@ -1264,13 +1261,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
transDQSched
(
pThrd
->
delayQueue
,
doDelayTask
,
arg
,
TRANS_RETRY_INTERVAL
);
}
void
cliCompareAndSwap
(
int8_t
*
val
,
int8_t
exp
,
int8_t
newVal
)
{
FORCE_INLINE
void
cliCompareAndSwap
(
int8_t
*
val
,
int8_t
exp
,
int8_t
newVal
)
{
if
(
*
val
!=
exp
)
{
*
val
=
newVal
;
}
}
bool
cliTryExtractEpSet
(
STransMsg
*
pResp
,
SEpSet
*
dst
)
{
FORCE_INLINE
bool
cliTryExtractEpSet
(
STransMsg
*
pResp
,
SEpSet
*
dst
)
{
if
((
pResp
==
NULL
||
pResp
->
info
.
hasEpSet
==
0
))
{
return
false
;
}
...
...
@@ -1300,7 +1297,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
if
(
pMsg
==
NULL
||
pMsg
->
ctx
==
NULL
)
{
t
Trace
(
"%s conn %p handle resp"
,
pTransInst
->
label
,
pConn
);
t
Debug
(
"%s conn %p handle resp"
,
pTransInst
->
label
,
pConn
);
pTransInst
->
cfp
(
pTransInst
->
parent
,
pResp
,
NULL
);
return
0
;
}
...
...
@@ -1402,7 +1399,7 @@ void transUnrefCliHandle(void* handle) {
cliDestroyConn
((
SCliConn
*
)
handle
,
true
);
}
}
SCliThrd
*
transGetWorkThrdFromHandle
(
int64_t
handle
,
bool
*
validHandle
)
{
static
FORCE_INLINE
SCliThrd
*
transGetWorkThrdFromHandle
(
int64_t
handle
,
bool
*
validHandle
)
{
SCliThrd
*
pThrd
=
NULL
;
SExHandle
*
exh
=
transAcquireExHandle
(
transGetRefMgt
(),
handle
);
if
(
exh
==
NULL
)
{
...
...
source/libs/transport/src/transComm.c
浏览文件 @
8e263537
...
...
@@ -424,7 +424,7 @@ void transQueueDestroy(STransQueue* queue) {
taosArrayDestroy
(
queue
->
q
);
}
static
int32_t
timeCompare
(
const
HeapNode
*
a
,
const
HeapNode
*
b
)
{
static
FORCE_INLINE
int32_t
timeCompare
(
const
HeapNode
*
a
,
const
HeapNode
*
b
)
{
SDelayTask
*
arg1
=
container_of
(
a
,
SDelayTask
,
node
);
SDelayTask
*
arg2
=
container_of
(
b
,
SDelayTask
,
node
);
if
(
arg1
->
execTime
>
arg2
->
execTime
)
{
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
8e263537
...
...
@@ -125,17 +125,17 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
static
void
uvWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
static
void
uvFreeCb
(
uv_handle_t
*
handle
);
static
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
);
static
FORCE_INLINE
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
);
static
void
uvPrepareSendData
(
SSvrMsg
*
msg
,
uv_buf_t
*
wb
);
static
void
uvStartSendResp
(
SSvrMsg
*
msg
);
static
void
uvNotifyLinkBrokenToApp
(
SSvrConn
*
conn
);
static
void
destroySmsg
(
SSvrMsg
*
smsg
);
// check whether already read complete packet
static
SSvrConn
*
createConn
(
void
*
hThrd
);
static
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
/*clear handle or not*/
);
static
void
destroyConnRegArg
(
SSvrConn
*
conn
);
static
FORCE_INLINE
void
destroySmsg
(
SSvrMsg
*
smsg
);
static
FORCE_INLINE
SSvrConn
*
createConn
(
void
*
hThrd
);
static
FORCE_INLINE
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
/*clear handle or not*/
);
static
FORCE_INLINE
void
destroyConnRegArg
(
SSvrConn
*
conn
);
static
int
reallocConnRef
(
SSvrConn
*
conn
);
...
...
@@ -413,7 +413,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb
->
len
=
len
;
}
static
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
)
{
static
FORCE_INLINE
void
uvStartSendRespImpl
(
SSvrMsg
*
smsg
)
{
SSvrConn
*
pConn
=
smsg
->
pConn
;
if
(
pConn
->
broken
)
{
return
;
...
...
@@ -447,7 +447,7 @@ static void uvStartSendResp(SSvrMsg* smsg) {
return
;
}
static
void
destroySmsg
(
SSvrMsg
*
smsg
)
{
static
FORCE_INLINE
void
destroySmsg
(
SSvrMsg
*
smsg
)
{
if
(
smsg
==
NULL
)
{
return
;
}
...
...
@@ -812,7 +812,7 @@ void* transWorkerThread(void* arg) {
return
NULL
;
}
static
SSvrConn
*
createConn
(
void
*
hThrd
)
{
static
FORCE_INLINE
SSvrConn
*
createConn
(
void
*
hThrd
)
{
SWorkThrd
*
pThrd
=
hThrd
;
SSvrConn
*
pConn
=
(
SSvrConn
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSvrConn
));
...
...
@@ -842,7 +842,7 @@ static SSvrConn* createConn(void* hThrd) {
return
pConn
;
}
static
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
)
{
static
FORCE_INLINE
void
destroyConn
(
SSvrConn
*
conn
,
bool
clear
)
{
if
(
conn
==
NULL
)
{
return
;
}
...
...
@@ -854,7 +854,7 @@ static void destroyConn(SSvrConn* conn, bool clear) {
}
}
}
static
void
destroyConnRegArg
(
SSvrConn
*
conn
)
{
static
FORCE_INLINE
void
destroyConnRegArg
(
SSvrConn
*
conn
)
{
if
(
conn
->
regArg
.
init
==
1
)
{
transFreeMsg
(
conn
->
regArg
.
msg
.
pCont
);
conn
->
regArg
.
init
=
0
;
...
...
tests/script/tsim/parser/commit.sim
浏览文件 @
8e263537
...
...
@@ -97,6 +97,7 @@ while $loop <= $loops
endw
sql select count(*) from $stb
if $data00 != $totalNum then
print expect $totalNum , actual: $data00
return -1
endi
$loop = $loop + 1
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录