Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
14313882
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
14313882
编写于
6月 09, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
6月 09, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2209 from taosdata/feature/query
Feature/query
上级
0bfc1f61
c29ff049
变更
13
显示空白变更内容
内联
并排
Showing
13 changed file
with
399 addition
and
471 deletion
+399
-471
src/client/src/tscSecondaryMerge.c
src/client/src/tscSecondaryMerge.c
+11
-30
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+3
-3
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+1
-2
src/kit/taosdemo/taosdemo.c
src/kit/taosdemo/taosdemo.c
+6
-4
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+1
-1
src/mnode/src/mnodeShow.c
src/mnode/src/mnodeShow.c
+1
-1
src/query/inc/qfill.h
src/query/inc/qfill.h
+7
-7
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+8
-12
src/query/src/qfill.c
src/query/src/qfill.c
+36
-30
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+86
-155
src/util/inc/tcache.h
src/util/inc/tcache.h
+30
-27
src/util/src/tcache.c
src/util/src/tcache.c
+207
-194
src/util/tests/cacheTest.cpp
src/util/tests/cacheTest.cpp
+2
-5
未找到文件。
src/client/src/tscSecondaryMerge.c
浏览文件 @
14313882
...
...
@@ -324,7 +324,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
tfree
(
pReducer
->
discardData
);
tfree
(
pReducer
->
pResultBuf
);
tfree
(
pReducer
->
pFinalRes
);
// tfree(pReducer->pBufForInterpo);
tfree
(
pReducer
->
prevRowOfInput
);
pRes
->
code
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -363,7 +362,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
if
(
pQueryInfo
->
fillType
!=
TSDB_FILL_NONE
)
{
SFillColInfo
*
pFillCol
=
createFillColInfo
(
pQueryInfo
);
pReducer
->
pFillInfo
=
taosInitFillInfo
(
pQueryInfo
->
order
.
order
,
revisedSTime
,
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
,
4096
,
numOfCols
,
pQueryInfo
->
slidingTime
,
pQueryInfo
->
fillType
,
pFillCol
);
4096
,
numOfCols
,
pQueryInfo
->
slidingTime
,
pQueryInfo
->
slidingTimeUnit
,
tinfo
.
precision
,
pQueryInfo
->
fillType
,
pFillCol
);
}
int32_t
startIndex
=
pQueryInfo
->
fieldsInfo
.
numOfOutput
-
pQueryInfo
->
groupbyExpr
.
numOfGroupCols
;
...
...
@@ -494,7 +494,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscTrace
(
"%p waiting for delete procedure, status: %d"
,
pSql
,
status
);
}
taosDestoryFillInfo
(
pLocalReducer
->
pFillInfo
);
pLocalReducer
->
pFillInfo
=
taosDestoryFillInfo
(
pLocalReducer
->
pFillInfo
);
if
(
pLocalReducer
->
pCtx
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryInfo
->
fieldsInfo
.
numOfOutput
;
++
i
)
{
...
...
@@ -980,8 +980,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
/* all output for current group are completed */
int32_t
totalRemainRows
=
taosGetNumOfResultWithFill
(
pFillInfo
,
rpoints
,
pFillInfo
->
slidingTime
,
actualETime
);
int32_t
totalRemainRows
=
getFilledNumOfRes
(
pFillInfo
,
actualETime
,
pLocalReducer
->
resColModel
->
capacity
);
if
(
totalRemainRows
<=
0
)
{
break
;
}
...
...
@@ -1267,13 +1266,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
SFillInfo
*
pFillInfo
=
pLocalReducer
->
pFillInfo
;
if
(
pFillInfo
!=
NULL
)
{
STableMetaInfo
*
pTableMetaInfo
=
tscGetTableMetaInfoFromCmd
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
TSKEY
ekey
=
taosGetRevisedEndKey
(
pQueryInfo
->
window
.
ekey
,
pFillInfo
->
order
,
pFillInfo
->
slidingTime
,
pQueryInfo
->
slidingTimeUnit
,
tinfo
.
precision
);
taosFillSetStartInfo
(
pFillInfo
,
pResBuf
->
num
,
ekey
);
taosFillSetStartInfo
(
pFillInfo
,
pResBuf
->
num
,
pQueryInfo
->
window
.
ekey
);
taosFillCopyInputDataFromOneFilePage
(
pFillInfo
,
pResBuf
);
}
...
...
@@ -1327,23 +1320,15 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SLocalReducer
*
pLocalReducer
=
pRes
->
pLocalReducer
;
SFillInfo
*
pFillInfo
=
pLocalReducer
->
pFillInfo
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
int8_t
p
=
tinfo
.
precision
;
if
(
pFillInfo
!=
NULL
&&
taosNumOfRemainRows
(
pFillInfo
)
>
0
)
{
assert
(
pQueryInfo
->
fillType
!=
TSDB_FILL_NONE
);
tFilePage
*
pFinalDataBuf
=
pLocalReducer
->
pResultBuf
;
int64_t
etime
=
*
(
int64_t
*
)(
pFinalDataBuf
->
data
+
TSDB_KEYSIZE
*
(
pFillInfo
->
numOfRows
-
1
));
int32_t
remain
=
taosNumOfRemainRows
(
pFillInfo
);
TSKEY
ekey
=
taosGetRevisedEndKey
(
etime
,
pQueryInfo
->
order
.
order
,
pQueryInfo
->
slidingTime
,
pQueryInfo
->
slidingTimeUnit
,
p
);
// the first column must be the timestamp column
int32_t
rows
=
taosGetNumOfResultWithFill
(
pFillInfo
,
remain
,
ekey
,
pLocalReducer
->
resColModel
->
capacity
);
if
(
rows
>
0
)
{
// do
interpo
int32_t
rows
=
getFilledNumOfRes
(
pFillInfo
,
etime
,
pLocalReducer
->
resColModel
->
capacity
);
if
(
rows
>
0
)
{
// do
fill gap
doFillResult
(
pSql
,
pLocalReducer
,
false
);
}
...
...
@@ -1362,10 +1347,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool
prevGroupCompleted
=
(
!
pLocalReducer
->
discard
)
&&
pLocalReducer
->
hasUnprocessedRow
;
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
pCmd
->
clauseIndex
);
if
((
isAllSourcesCompleted
(
pLocalReducer
)
&&
!
pLocalReducer
->
hasPrevRow
)
||
pLocalReducer
->
pLocalDataSrc
[
0
]
==
NULL
||
prevGroupCompleted
)
{
...
...
@@ -1373,9 +1355,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if
(
pQueryInfo
->
fillType
!=
TSDB_FILL_NONE
)
{
int64_t
etime
=
(
pQueryInfo
->
window
.
skey
<
pQueryInfo
->
window
.
ekey
)
?
pQueryInfo
->
window
.
ekey
:
pQueryInfo
->
window
.
skey
;
etime
=
taosGetRevisedEndKey
(
etime
,
pQueryInfo
->
order
.
order
,
pQueryInfo
->
intervalTime
,
pQueryInfo
->
slidingTimeUnit
,
tinfo
.
precision
);
int32_t
rows
=
taosGetNumOfResultWithFill
(
pFillInfo
,
0
,
etime
,
pLocalReducer
->
resColModel
->
capacity
);
assert
(
pFillInfo
->
numOfRows
==
0
);
int32_t
rows
=
getFilledNumOfRes
(
pFillInfo
,
etime
,
pLocalReducer
->
resColModel
->
capacity
);
if
(
rows
>
0
)
{
// do interpo
doFillResult
(
pSql
,
pLocalReducer
,
true
);
}
...
...
src/client/src/tscSystem.c
浏览文件 @
14313882
...
...
@@ -144,11 +144,11 @@ void taos_init_imp() {
}
int64_t
refreshTime
=
tsTableMetaKeepTimer
;
refreshTime
=
refreshTime
>
2
?
2
:
refreshTime
;
refreshTime
=
refreshTime
<
1
?
1
:
refreshTime
;
refreshTime
=
refreshTime
>
10
?
10
:
refreshTime
;
refreshTime
=
refreshTime
<
1
0
?
10
:
refreshTime
;
if
(
tscCacheHandle
==
NULL
)
{
tscCacheHandle
=
taosCacheInit
(
tscTmr
,
refreshTime
);
tscCacheHandle
=
taosCacheInit
(
refreshTime
);
}
tscTrace
(
"client is initialized successfully"
);
...
...
src/common/src/tdataformat.c
浏览文件 @
14313882
...
...
@@ -273,8 +273,7 @@ void dataColSetNullAt(SDataCol *pCol, int index) {
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
dataOff
[
index
]
=
pCol
->
len
;
char
*
ptr
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
len
);
varDataLen
(
ptr
)
=
(
pCol
->
type
==
TSDB_DATA_TYPE_BINARY
)
?
sizeof
(
char
)
:
TSDB_NCHAR_SIZE
;
setNull
(
varDataVal
(
ptr
),
pCol
->
type
,
pCol
->
bytes
);
setVardataNull
(
ptr
,
pCol
->
type
);
pCol
->
len
+=
varDataTLen
(
ptr
);
}
else
{
setNull
(
POINTER_SHIFT
(
pCol
->
pData
,
TYPE_BYTES
[
pCol
->
type
]
*
index
),
pCol
->
type
,
pCol
->
bytes
);
...
...
src/kit/taosdemo/taosdemo.c
浏览文件 @
14313882
...
...
@@ -710,7 +710,7 @@ void *readTable(void *sarg) {
int32_t
code
=
taos_errno
(
pSql
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"Failed to query:%s
\n
"
,
taos_errstr
(
taos
));
fprintf
(
stderr
,
"Failed to query:%s
\n
"
,
taos_errstr
(
pSql
));
taos_free_result
(
pSql
);
taos_close
(
taos
);
exit
(
EXIT_FAILURE
);
...
...
@@ -779,7 +779,7 @@ void *readMetric(void *sarg) {
int32_t
code
=
taos_errno
(
pSql
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"Failed to query:%s
\n
"
,
taos_errstr
(
taos
));
fprintf
(
stderr
,
"Failed to query:%s
\n
"
,
taos_errstr
(
pSql
));
taos_free_result
(
pSql
);
taos_close
(
taos
);
exit
(
1
);
...
...
@@ -818,7 +818,9 @@ void queryDB(TAOS *taos, char *command) {
}
if
(
i
==
0
)
{
fprintf
(
stderr
,
"Failed to run %s, reason: %s
\n
"
,
command
,
taos_errstr
(
taos
));
fprintf
(
stderr
,
"Failed to run %s, reason: %s
\n
"
,
command
,
taos_errstr
(
pSql
));
taos_free_result
(
pSql
);
taos_close
(
taos
);
exit
(
EXIT_FAILURE
);
}
...
...
@@ -914,7 +916,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
int64_t
tmp_time
=
tb_info
->
timestamp
;
if
(
code
<
0
)
{
fprintf
(
stderr
,
"failed to insert data %d:reason; %s
\n
"
,
code
,
taos_errstr
(
tb_info
->
tao
s
));
fprintf
(
stderr
,
"failed to insert data %d:reason; %s
\n
"
,
code
,
taos_errstr
(
re
s
));
exit
(
EXIT_FAILURE
);
}
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
14313882
...
...
@@ -67,7 +67,7 @@ int32_t mnodeInitProfile() {
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_STREAM
,
mnodeProcessKillStreamMsg
);
mnodeAddWriteMsgHandle
(
TSDB_MSG_TYPE_CM_KILL_CONN
,
mnodeProcessKillConnectionMsg
);
tsMnodeConnCache
=
taosCacheInitWithCb
(
tsMnodeTmr
,
CONN_CHECK_TIME
,
mnodeFreeConn
);
tsMnodeConnCache
=
taosCacheInitWithCb
(
CONN_CHECK_TIME
,
mnodeFreeConn
);
return
0
;
}
...
...
src/mnode/src/mnodeShow.c
浏览文件 @
14313882
...
...
@@ -65,7 +65,7 @@ int32_t mnodeInitShow() {
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mnodeProcessConnectMsg
);
mnodeAddReadMsgHandle
(
TSDB_MSG_TYPE_CM_USE_DB
,
mnodeProcessUseMsg
);
tsMnodeShowCache
=
taosCacheInitWithCb
(
tsMnodeTmr
,
10
,
mnodeFreeShowObj
);
tsMnodeShowCache
=
taosCacheInitWithCb
(
10
,
mnodeFreeShowObj
);
return
0
;
}
...
...
src/query/inc/qfill.h
浏览文件 @
14313882
...
...
@@ -50,7 +50,8 @@ typedef struct SFillInfo {
char
*
nextValues
;
// next row of data
char
**
pData
;
// original result data block involved in filling data
int32_t
capacityInRows
;
// data buffer size in rows
int8_t
slidingUnit
;
// sliding time unit
int8_t
precision
;
// time resoluation
SFillColInfo
*
pFillCol
;
// column info for fill operations
}
SFillInfo
;
...
...
@@ -61,12 +62,13 @@ typedef struct SPoint {
int64_t
taosGetIntervalStartTimestamp
(
int64_t
startTime
,
int64_t
slidingTime
,
char
timeUnit
,
int16_t
precision
);
SFillInfo
*
taosInitFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
int64_t
slidingTime
,
int32_t
fillType
,
SFillColInfo
*
pFillCol
);
SFillInfo
*
taosInitFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
int64_t
slidingTime
,
int8_t
slidingUnit
,
int8_t
precision
,
int32_t
fillType
,
SFillColInfo
*
pFillCol
);
void
taosResetFillInfo
(
SFillInfo
*
pFillInfo
,
TSKEY
startTimestamp
);
void
taosDestoryFillInfo
(
SFillInfo
*
pFillInfo
);
void
*
taosDestoryFillInfo
(
SFillInfo
*
pFillInfo
);
void
taosFillSetStartInfo
(
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
);
...
...
@@ -74,9 +76,7 @@ void taosFillCopyInputDataFromFilePage(SFillInfo* pFillInfo, tFilePage** pInput)
void
taosFillCopyInputDataFromOneFilePage
(
SFillInfo
*
pFillInfo
,
tFilePage
*
pInput
);
TSKEY
taosGetRevisedEndKey
(
TSKEY
ekey
,
int32_t
order
,
int64_t
timeInterval
,
int8_t
slidingTimeUnit
,
int8_t
precision
);
int64_t
taosGetNumOfResultWithFill
(
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
int64_t
ekey
,
int32_t
maxNumOfRows
);
int64_t
getFilledNumOfRes
(
SFillInfo
*
pFillInfo
,
int64_t
ekey
,
int32_t
maxNumOfRows
);
int32_t
taosNumOfRemainRows
(
SFillInfo
*
pFillInfo
);
...
...
src/query/src/qExecutor.c
浏览文件 @
14313882
...
...
@@ -1466,7 +1466,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tfree
(
pRuntimeEnv
->
pCtx
);
}
taosDestoryFillInfo
(
pRuntimeEnv
->
pFillInfo
);
pRuntimeEnv
->
pFillInfo
=
taosDestoryFillInfo
(
pRuntimeEnv
->
pFillInfo
);
destroyResultBuf
(
pRuntimeEnv
->
pResultBuf
,
pQInfo
);
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pQueryHandle
);
...
...
@@ -3557,9 +3557,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
* first result row in the actual result set will fill nothing.
*/
if
(
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
TSKEY
ekey
=
taosGetRevisedEndKey
(
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
,
pQuery
->
slidingTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
int32_t
numOfTotal
=
taosGetNumOfResultWithFill
(
pFillInfo
,
remain
,
ekey
,
pQuery
->
rec
.
capacity
);
int32_t
numOfTotal
=
getFilledNumOfRes
(
pFillInfo
,
pQuery
->
window
.
ekey
,
pQuery
->
rec
.
capacity
);
return
numOfTotal
>
0
;
}
...
...
@@ -3601,7 +3599,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
}
}
int32_t
doFillGapsInResults
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
tFilePage
**
pDst
,
int32_t
numOfRows
,
int32_t
*
numOfInterpo
)
{
int32_t
doFillGapsInResults
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
tFilePage
**
pDst
,
int32_t
*
numOfInterpo
)
{
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SFillInfo
*
pFillInfo
=
pRuntimeEnv
->
pFillInfo
;
...
...
@@ -4013,7 +4011,8 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
if
(
pQuery
->
fillType
!=
TSDB_FILL_NONE
&&
!
isPointInterpoQuery
(
pQuery
))
{
SFillColInfo
*
pColInfo
=
taosCreateFillColInfo
(
pQuery
);
pRuntimeEnv
->
pFillInfo
=
taosInitFillInfo
(
pQuery
->
order
.
order
,
0
,
0
,
pQuery
->
rec
.
capacity
,
pQuery
->
numOfOutput
,
pQuery
->
slidingTime
,
pQuery
->
fillType
,
pColInfo
);
pQuery
->
slidingTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
,
pQuery
->
fillType
,
pColInfo
);
}
// todo refactor
...
...
@@ -4666,13 +4665,11 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
limitResults
(
pRuntimeEnv
);
break
;
}
else
{
TSKEY
ekey
=
taosGetRevisedEndKey
(
pQuery
->
window
.
ekey
,
pQuery
->
order
.
order
,
pQuery
->
slidingTime
,
pQuery
->
slidingTimeUnit
,
pQuery
->
precision
);
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
pQuery
->
rec
.
rows
,
ekey
);
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
pQuery
->
rec
.
rows
,
pQuery
->
window
.
ekey
);
taosFillCopyInputDataFromFilePage
(
pRuntimeEnv
->
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
);
numOfInterpo
=
0
;
pQuery
->
rec
.
rows
=
doFillGapsInResults
(
pRuntimeEnv
,
(
tFilePage
**
)
pQuery
->
sdata
,
pQuery
->
rec
.
rows
,
&
numOfInterpo
);
pQuery
->
rec
.
rows
=
doFillGapsInResults
(
pRuntimeEnv
,
(
tFilePage
**
)
pQuery
->
sdata
,
&
numOfInterpo
);
if
(
pQuery
->
rec
.
rows
>
0
||
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
limitResults
(
pRuntimeEnv
);
break
;
...
...
@@ -4704,8 +4701,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/
int32_t
numOfInterpo
=
0
;
int32_t
remain
=
taosNumOfRemainRows
(
pRuntimeEnv
->
pFillInfo
);
pQuery
->
rec
.
rows
=
doFillGapsInResults
(
pRuntimeEnv
,
(
tFilePage
**
)
pQuery
->
sdata
,
remain
,
&
numOfInterpo
);
pQuery
->
rec
.
rows
=
doFillGapsInResults
(
pRuntimeEnv
,
(
tFilePage
**
)
pQuery
->
sdata
,
&
numOfInterpo
);
if
(
pQuery
->
rec
.
rows
>
0
)
{
limitResults
(
pRuntimeEnv
);
...
...
src/query/src/qfill.c
浏览文件 @
14313882
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qfill.h"
#include "os.h"
#include "qfill.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
...
...
@@ -58,7 +58,7 @@ int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, ch
}
SFillInfo
*
taosInitFillInfo
(
int32_t
order
,
TSKEY
skey
,
int32_t
numOfTags
,
int32_t
capacity
,
int32_t
numOfCols
,
int64_t
slidingTime
,
int32_t
fillType
,
SFillColInfo
*
pFillCol
)
{
int64_t
slidingTime
,
int
8_t
slidingUnit
,
int8_t
precision
,
int
32_t
fillType
,
SFillColInfo
*
pFillCol
)
{
if
(
fillType
==
TSDB_FILL_NONE
)
{
return
NULL
;
}
...
...
@@ -72,7 +72,9 @@ SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_
pFillInfo
->
pFillCol
=
pFillCol
;
pFillInfo
->
numOfTags
=
numOfTags
;
pFillInfo
->
numOfCols
=
numOfCols
;
pFillInfo
->
precision
=
precision
;
pFillInfo
->
slidingTime
=
slidingTime
;
pFillInfo
->
slidingUnit
=
slidingUnit
;
pFillInfo
->
pData
=
malloc
(
POINTER_BYTES
*
numOfCols
);
...
...
@@ -102,9 +104,9 @@ void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
pFillInfo
->
numOfTotal
=
0
;
}
void
taosDestoryFillInfo
(
SFillInfo
*
pFillInfo
)
{
void
*
taosDestoryFillInfo
(
SFillInfo
*
pFillInfo
)
{
if
(
pFillInfo
==
NULL
)
{
return
;
return
NULL
;
}
tfree
(
pFillInfo
->
prevValues
);
...
...
@@ -119,6 +121,15 @@ void taosDestoryFillInfo(SFillInfo* pFillInfo) {
tfree
(
pFillInfo
->
pFillCol
);
tfree
(
pFillInfo
);
return
NULL
;
}
static
TSKEY
taosGetRevisedEndKey
(
TSKEY
ekey
,
int32_t
order
,
int64_t
timeInterval
,
int8_t
slidingTimeUnit
,
int8_t
precision
)
{
if
(
order
==
TSDB_ORDER_ASC
)
{
return
ekey
;
}
else
{
return
taosGetIntervalStartTimestamp
(
ekey
,
timeInterval
,
slidingTimeUnit
,
precision
);
}
}
void
taosFillSetStartInfo
(
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
TSKEY
endKey
)
{
...
...
@@ -126,8 +137,10 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
return
;
}
pFillInfo
->
endKey
=
taosGetRevisedEndKey
(
endKey
,
pFillInfo
->
order
,
pFillInfo
->
slidingTime
,
pFillInfo
->
slidingUnit
,
pFillInfo
->
precision
);
pFillInfo
->
rowIdx
=
0
;
pFillInfo
->
endKey
=
endKey
;
pFillInfo
->
numOfRows
=
numOfRows
;
// ensure the space
...
...
@@ -165,36 +178,29 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, tFilePage* pInpu
}
}
TSKEY
taosGetRevisedEndKey
(
TSKEY
ekey
,
int32_t
order
,
int64_t
timeInterval
,
int8_t
slidingTimeUnit
,
int8_t
precision
)
{
if
(
order
==
TSDB_ORDER_ASC
)
{
return
ekey
;
}
else
{
return
taosGetIntervalStartTimestamp
(
ekey
,
timeInterval
,
slidingTimeUnit
,
precision
);
}
}
int64_t
getFilledNumOfRes
(
SFillInfo
*
pFillInfo
,
TSKEY
ekey
,
int32_t
maxNumOfRows
)
{
int64_t
*
tsList
=
(
int64_t
*
)
pFillInfo
->
pData
[
0
];
int32_t
numOfRows
=
taosNumOfRemainRows
(
pFillInfo
);
static
int32_t
taosGetTotalNumOfFilledRes
(
SFillInfo
*
pFillInfo
,
const
TSKEY
*
tsArray
,
int32_t
remain
,
int64_t
nInterval
,
int64_t
ekey
)
{
TSKEY
ekey1
=
taosGetRevisedEndKey
(
ekey
,
pFillInfo
->
order
,
pFillInfo
->
slidingTime
,
pFillInfo
->
slidingUnit
,
pFillInfo
->
precision
);
i
f
(
remain
>
0
)
{
// still fill gap within current data block, not generating data after the result set.
TSKEY
lastKey
=
tsArray
[
pFillInfo
->
numOfRows
-
1
];
int32_t
total
=
(
int32_t
)(
labs
(
lastKey
-
pFillInfo
->
start
)
/
nInterval
)
+
1
;
i
nt64_t
numOfRes
=
-
1
;
if
(
numOfRows
>
0
)
{
// still fill gap within current data block, not generating data after the result set.
TSKEY
lastKey
=
tsList
[
pFillInfo
->
numOfRows
-
1
]
;
assert
(
total
>=
remain
)
;
return
total
;
numOfRes
=
(
int64_t
)(
labs
(
lastKey
-
pFillInfo
->
start
)
/
pFillInfo
->
slidingTime
)
+
1
;
assert
(
numOfRes
>=
numOfRows
)
;
}
else
{
// reach the end of data
if
((
ekey
<
pFillInfo
->
start
&&
FILL_IS_ASC_FILL
(
pFillInfo
))
||
(
ekey
>
pFillInfo
->
start
&&
!
FILL_IS_ASC_FILL
(
pFillInfo
)))
{
if
((
ekey
1
<
pFillInfo
->
start
&&
FILL_IS_ASC_FILL
(
pFillInfo
))
||
(
ekey
1
>
pFillInfo
->
start
&&
!
FILL_IS_ASC_FILL
(
pFillInfo
)))
{
return
0
;
}
else
{
return
(
int32_t
)(
labs
(
ekey
-
pFillInfo
->
start
)
/
nInterval
)
+
1
;
}
else
{
// the numOfRes rows are all filled with specified policy
numOfRes
=
(
labs
(
ekey1
-
pFillInfo
->
start
)
/
pFillInfo
->
slidingTime
)
+
1
;
}
}
}
int64_t
taosGetNumOfResultWithFill
(
SFillInfo
*
pFillInfo
,
int32_t
numOfRows
,
int64_t
ekey
,
int32_t
maxNumOfRows
)
{
int32_t
numOfRes
=
taosGetTotalNumOfFilledRes
(
pFillInfo
,
(
int64_t
*
)
pFillInfo
->
pData
[
0
],
numOfRows
,
pFillInfo
->
slidingTime
,
ekey
);
return
(
numOfRes
>
maxNumOfRows
)
?
maxNumOfRows
:
numOfRes
;
}
...
...
@@ -496,8 +502,8 @@ int32_t generateDataBlockImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t nu
int64_t
taosGenerateDataBlock
(
SFillInfo
*
pFillInfo
,
tFilePage
**
output
,
int32_t
capacity
)
{
int32_t
remain
=
taosNumOfRemainRows
(
pFillInfo
);
// todo use iterator?
int32_t
rows
=
taosGetNumOfResultWithFill
(
pFillInfo
,
remain
,
pFillInfo
->
endKey
,
capacity
);
int32_t
rows
=
getFilledNumOfRes
(
pFillInfo
,
pFillInfo
->
endKey
,
capacity
);
int32_t
numOfRes
=
generateDataBlockImpl
(
pFillInfo
,
output
,
remain
,
rows
,
pFillInfo
->
pData
);
assert
(
numOfRes
==
rows
);
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
14313882
...
...
@@ -233,8 +233,6 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond*
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
tsdbQueryTables
(
tsdb
,
pCond
,
groupList
,
qinfo
);
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_EXTERNAL
;
// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded
changeQueryHandleForInterpQuery
(
pQueryHandle
);
return
pQueryHandle
;
}
...
...
@@ -618,54 +616,19 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
static
void
handleDataMergeIfNeeded
(
STsdbQueryHandle
*
pQueryHandle
,
SCompBlock
*
pBlock
,
STableCheckInfo
*
pCheckInfo
){
SQueryFilePos
*
cur
=
&
pQueryHandle
->
cur
;
SDataBlockInfo
binfo
=
getTrueDataBlockInfo
(
pCheckInfo
,
pBlock
);
/*bool hasData = */
initTableMemIterator
(
pQueryHandle
,
pCheckInfo
);
TSKEY
k1
=
TSKEY_INITIAL_VAL
,
k2
=
TSKEY_INITIAL_VAL
;
if
(
pCheckInfo
->
iter
!=
NULL
&&
tSkipListIterGet
(
pCheckInfo
->
iter
)
!=
NULL
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
k1
=
dataRowKey
(
row
);
if
(
k1
==
binfo
.
window
.
skey
)
{
if
(
tSkipListIterNext
(
pCheckInfo
->
iter
))
{
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
row
=
SL_GET_NODE_DATA
(
node
);
k1
=
dataRowKey
(
row
);
}
else
{
k1
=
TSKEY_INITIAL_VAL
;
}
}
}
if
(
pCheckInfo
->
iiter
!=
NULL
&&
tSkipListIterGet
(
pCheckInfo
->
iiter
)
!=
NULL
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
k2
=
dataRowKey
(
row
);
if
(
k2
==
binfo
.
window
.
skey
)
{
if
(
tSkipListIterNext
(
pCheckInfo
->
iiter
))
{
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
row
=
SL_GET_NODE_DATA
(
node
);
k2
=
dataRowKey
(
row
);
}
else
{
k2
=
TSKEY_INITIAL_VAL
;
}
}
}
/*bool hasData = */
initTableMemIterator
(
pQueryHandle
,
pCheckInfo
);
SDataRow
row
=
getSDataRowInTableMem
(
pCheckInfo
);
TSKEY
key
=
(
row
!=
NULL
)
?
dataRowKey
(
row
)
:
TSKEY_INITIAL_VAL
;
cur
->
pos
=
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
?
0
:
(
binfo
.
rows
-
1
);
if
((
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
((
k1
!=
TSKEY_INITIAL_VAL
&&
k1
<=
binfo
.
window
.
ekey
)
||
(
k2
!=
TSKEY_INITIAL_VAL
&&
k2
<=
binfo
.
window
.
ekey
)))
||
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
((
k1
!=
TSKEY_INITIAL_VAL
&&
k1
>=
binfo
.
window
.
skey
)
||
(
k2
!=
TSKEY_INITIAL_VAL
&&
k2
>=
binfo
.
window
.
skey
))))
{
if
((
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<=
binfo
.
window
.
ekey
))
||
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>=
binfo
.
window
.
skey
)))
{
if
((
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
<
binfo
.
window
.
skey
))
||
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
(
key
!=
TSKEY_INITIAL_VAL
&&
key
>
binfo
.
window
.
ekey
)))
{
if
((
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
((
k1
!=
TSKEY_INITIAL_VAL
&&
k1
<
binfo
.
window
.
skey
)
||
(
k2
!=
TSKEY_INITIAL_VAL
&&
k2
<
binfo
.
window
.
skey
)))
||
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
(((
k1
!=
TSKEY_INITIAL_VAL
&&
k1
>
binfo
.
window
.
skey
)
||
(
k2
!=
TSKEY_INITIAL_VAL
&&
k2
>
binfo
.
window
.
skey
)))))
{
// do not load file block into buffer
int32_t
step
=
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
?
1
:
-
1
;
...
...
@@ -756,7 +719,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
return
pQueryHandle
->
realNumOfRows
>
0
;
}
static
int
vnode
BinarySearchKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
)
{
static
int
do
BinarySearchKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
)
{
int
firstPos
,
lastPos
,
midPos
=
-
1
;
int
numOfRows
;
TSKEY
*
keyList
;
...
...
@@ -868,14 +831,21 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
return
numOfRows
+
num
;
}
static
void
copyOneRowFromMem
(
STsdbQueryHandle
*
pQueryHandle
,
STableCheckInfo
*
pCheckInfo
,
int32_t
capacity
,
int32_t
numOfRows
,
SDataRow
row
,
STSchema
*
pSchema
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
int32_t
numOfTableCols
=
schemaNCols
(
pSchema
);
static
void
copyOneRowFromMem
(
STsdbQueryHandle
*
pQueryHandle
,
int32_t
capacity
,
int32_t
numOfRows
,
SDataRow
row
,
STsdbMeta
*
pMeta
,
int32_t
numOfCols
,
STable
*
pTable
)
{
char
*
pData
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
// the schema version info is embeded in SDataRow
STSchema
*
pSchema
=
tsdbGetTableSchemaByVersion
(
pMeta
,
pTable
,
dataRowVersion
(
row
));
int32_t
numOfRowCols
=
schemaNCols
(
pSchema
);
int32_t
i
=
0
,
j
=
0
;
while
(
i
<
numOfCols
&&
j
<
numOfRowCols
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
if
(
pSchema
->
columns
[
j
].
colId
<
pColInfo
->
info
.
colId
)
{
j
++
;
continue
;
}
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
...
...
@@ -883,22 +853,41 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* p
pData
=
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
}
int32_t
offset
=
0
;
for
(
int32_t
j
=
0
;
j
<
numOfTableCols
;
++
j
)
{
if
(
pColInfo
->
info
.
colId
==
pSchema
->
columns
[
j
].
colId
)
{
offset
=
pSchema
->
columns
[
j
].
offset
;
break
;
if
(
pSchema
->
columns
[
j
].
colId
==
pColInfo
->
info
.
colId
)
{
void
*
value
=
tdGetRowDataOfCol
(
row
,
pColInfo
->
info
.
type
,
TD_DATA_ROW_HEAD_SIZE
+
pSchema
->
columns
[
j
].
offset
);
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
memcpy
(
pData
,
value
,
varDataTLen
(
value
));
}
else
{
memcpy
(
pData
,
value
,
pColInfo
->
info
.
bytes
);
}
j
++
;
i
++
;
}
else
{
// pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
pData
,
pColInfo
->
info
.
type
);
}
else
{
setNull
(
pData
,
pColInfo
->
info
.
type
,
pColInfo
->
info
.
bytes
);
}
i
++
;
}
}
assert
(
offset
!=
-
1
);
// todo handle error
void
*
value
=
tdGetRowDataOfCol
(
row
,
pColInfo
->
info
.
type
,
TD_DATA_ROW_HEAD_SIZE
+
offset
);
while
(
i
<
numOfCols
)
{
// the remain columns are all null data
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
pColInfo
->
pData
+
(
capacity
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
}
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
memcpy
(
pData
,
value
,
varDataTLen
(
value
)
);
setVardataNull
(
pData
,
pColInfo
->
info
.
type
);
}
else
{
memcpy
(
pData
,
valu
e
,
pColInfo
->
info
.
bytes
);
setNull
(
pData
,
pColInfo
->
info
.
typ
e
,
pColInfo
->
info
.
bytes
);
}
i
++
;
}
}
...
...
@@ -912,6 +901,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
initTableMemIterator
(
pQueryHandle
,
pCheckInfo
);
SDataCols
*
pCols
=
pQueryHandle
->
rhelper
.
pDataCols
[
0
];
// for search the endPos, so the order needs to reverse
int32_t
order
=
(
pQueryHandle
->
order
==
TSDB_ORDER_ASC
)
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
?
1
:-
1
;
int32_t
numOfCols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
STsdbMeta
*
pMeta
=
tsdbGetMeta
(
pQueryHandle
->
pTsdb
);
STable
*
pTable
=
pCheckInfo
->
pTableObj
;
int32_t
endPos
=
cur
->
pos
;
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
pQueryHandle
->
window
.
ekey
>
blockInfo
.
window
.
ekey
)
{
endPos
=
blockInfo
.
rows
-
1
;
...
...
@@ -920,8 +918,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
endPos
=
0
;
cur
->
mixBlock
=
(
cur
->
pos
!=
blockInfo
.
rows
-
1
);
}
else
{
int32_t
order
=
(
pQueryHandle
->
order
==
TSDB_ORDER_ASC
)
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
endPos
=
vnode
BinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
pQueryHandle
->
window
.
ekey
,
order
);
assert
(
pCols
->
numOfRows
>
0
)
;
endPos
=
do
BinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
pQueryHandle
->
window
.
ekey
,
order
);
cur
->
mixBlock
=
true
;
}
...
...
@@ -933,7 +931,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t
numOfRows
=
0
;
pQueryHandle
->
cur
.
win
=
TSWINDOW_INITIALIZER
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
?
1
:-
1
;
// no data in buffer, load data from file directly
if
(
pCheckInfo
->
iiter
==
NULL
&&
pCheckInfo
->
iter
==
NULL
)
{
...
...
@@ -953,9 +950,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if
(
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
&&
numOfRows
<
pQueryHandle
->
outputCapacity
)
{
int32_t
emptySize
=
pQueryHandle
->
outputCapacity
-
numOfRows
;
int32_t
reqNumOfCols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
for
(
int32_t
i
=
0
;
i
<
reqN
umOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
n
umOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memmove
(
pColInfo
->
pData
,
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
}
...
...
@@ -969,19 +965,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
pQueryHandle
->
realNumOfRows
=
numOfRows
;
cur
->
rows
=
numOfRows
;
return
;
}
else
if
(
pCheckInfo
->
iter
!=
NULL
&&
pCheckInfo
->
iiter
==
NULL
)
{
// } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) {
// } else { // iter and iiter are all not NULL, three-way merge data block
STSchema
*
pSchema
=
tsdbGetTableSchema
(
tsdbGetMeta
(
pQueryHandle
->
pTsdb
),
pCheckInfo
->
pTableObj
);
}
else
if
(
pCheckInfo
->
iter
!=
NULL
||
pCheckInfo
->
iiter
!=
NULL
)
{
SSkipListNode
*
node
=
NULL
;
do
{
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
if
(
node
==
NULL
)
{
SDataRow
row
=
getSDataRowInTableMem
(
pCheckInfo
);
if
(
row
==
NULL
)
{
break
;
}
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
dataRowKey
(
row
);
if
((
key
>
pQueryHandle
->
window
.
ekey
&&
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
||
(
key
<
pQueryHandle
->
window
.
ekey
&&
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)))
{
...
...
@@ -995,7 +986,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if
((
key
<
tsArray
[
pos
]
&&
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
||
(
key
>
tsArray
[
pos
]
&&
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)))
{
copyOneRowFromMem
(
pQueryHandle
,
p
CheckInfo
,
pQueryHandle
->
outputCapacity
,
numOfRows
,
row
,
pSchema
);
copyOneRowFromMem
(
pQueryHandle
,
p
QueryHandle
->
outputCapacity
,
numOfRows
,
row
,
pMeta
,
numOfCols
,
pTable
);
numOfRows
+=
1
;
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
cur
->
win
.
skey
=
key
;
...
...
@@ -1005,17 +996,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur
->
lastKey
=
key
+
step
;
cur
->
mixBlock
=
true
;
tSkipListIterNext
(
pCheckInfo
->
iter
);
moveToNextRow
(
pCheckInfo
);
}
else
if
(
key
==
tsArray
[
pos
])
{
// data in buffer has the same timestamp of data in file block, ignore it
tSkipListIterNext
(
pCheckInfo
->
iter
);
moveToNextRow
(
pCheckInfo
);
}
else
if
((
key
>
tsArray
[
pos
]
&&
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
||
(
key
<
tsArray
[
pos
]
&&
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)))
{
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
cur
->
win
.
skey
=
tsArray
[
pos
];
}
int32_t
order
=
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)
?
TSDB_ORDER_DESC
:
TSDB_ORDER_ASC
;
int32_t
end
=
vnodeBinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
key
,
order
);
int32_t
end
=
doBinarySearchKey
(
pCols
->
cols
[
0
].
pData
,
pCols
->
numOfRows
,
key
,
order
);
if
(
tsArray
[
end
]
==
key
)
{
// the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext
(
pCheckInfo
->
iter
);
}
...
...
@@ -1093,9 +1083,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if
(
numOfRows
<
pQueryHandle
->
outputCapacity
)
{
int32_t
emptySize
=
pQueryHandle
->
outputCapacity
-
numOfRows
;
int32_t
requiredNumOfCols
=
taosArrayGetSize
(
pQueryHandle
->
pColumns
);
for
(
int32_t
i
=
0
;
i
<
requiredNumOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
memmove
(
pColInfo
->
pData
,
pColInfo
->
pData
+
emptySize
*
pColInfo
->
info
.
bytes
,
numOfRows
*
pColInfo
->
info
.
bytes
);
}
...
...
@@ -1567,9 +1555,6 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pQueryHandle
->
pTableCheckInfo
,
i
);
if
(
pCheckInfo
->
pTableObj
->
tableId
.
uid
==
12094628167747
)
{
printf
(
"abc
\n
"
);
}
if
(
pCheckInfo
->
pTableObj
->
lastKey
>
key
)
{
key
=
pCheckInfo
->
pTableObj
->
lastKey
;
index
=
i
;
...
...
@@ -1652,8 +1637,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
*
skey
=
TSKEY_INITIAL_VAL
;
int64_t
st
=
taosGetTimestampUs
();
ST
Schema
*
pSchema
=
tsdbGetTableSchema
(
tsdbGetMeta
(
pQueryHandle
->
pTsdb
),
pCheckInfo
->
pTableObj
);
int32_t
numOfTableCols
=
schemaNCols
(
pSchema
)
;
ST
sdbMeta
*
pMeta
=
tsdbGetMeta
(
pQueryHandle
->
pTsdb
);
STable
*
pTable
=
pCheckInfo
->
pTableObj
;
do
{
SDataRow
row
=
getSDataRowInTableMem
(
pCheckInfo
);
...
...
@@ -1662,10 +1647,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
TSKEY
key
=
dataRowKey
(
row
);
if
((
key
>
maxKey
&&
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
||
(
key
<
maxKey
&&
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)))
{
if
((
key
>
maxKey
&&
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
||
(
key
<
maxKey
&&
!
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
)))
{
tsdbTrace
(
"%p key:%"
PRIu64
" beyond qrange:%"
PRId64
" - %"
PRId64
", no more data in buffer"
,
pQueryHandle
,
key
,
pQueryHandle
->
window
.
skey
,
pQueryHandle
->
window
.
ekey
);
...
...
@@ -1677,58 +1659,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
*
ekey
=
key
;
char
*
pData
=
NULL
;
int32_t
i
=
0
,
j
=
0
;
while
(
i
<
numOfCols
&&
j
<
numOfTableCols
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
if
(
pSchema
->
columns
[
j
].
colId
<
pColInfo
->
info
.
colId
)
{
j
++
;
continue
;
}
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
pColInfo
->
pData
+
(
maxRowsToRead
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
}
if
(
pSchema
->
columns
[
j
].
colId
==
pColInfo
->
info
.
colId
)
{
void
*
value
=
tdGetRowDataOfCol
(
row
,
pColInfo
->
info
.
type
,
TD_DATA_ROW_HEAD_SIZE
+
pSchema
->
columns
[
j
].
offset
);
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
memcpy
(
pData
,
value
,
varDataTLen
(
value
));
}
else
{
memcpy
(
pData
,
value
,
pColInfo
->
info
.
bytes
);
}
j
++
;
i
++
;
}
else
{
// pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
pData
,
pColInfo
->
info
.
type
);
}
else
{
setNull
(
pData
,
pColInfo
->
info
.
type
,
pColInfo
->
info
.
bytes
);
}
i
++
;
}
}
while
(
i
<
numOfCols
)
{
// the remain columns are all null data
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pQueryHandle
->
pColumns
,
i
);
if
(
ASCENDING_TRAVERSE
(
pQueryHandle
->
order
))
{
pData
=
pColInfo
->
pData
+
numOfRows
*
pColInfo
->
info
.
bytes
;
}
else
{
pData
=
pColInfo
->
pData
+
(
maxRowsToRead
-
numOfRows
-
1
)
*
pColInfo
->
info
.
bytes
;
}
if
(
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_BINARY
||
pColInfo
->
info
.
type
==
TSDB_DATA_TYPE_NCHAR
)
{
setVardataNull
(
pData
,
pColInfo
->
info
.
type
);
}
else
{
setNull
(
pData
,
pColInfo
->
info
.
type
,
pColInfo
->
info
.
bytes
);
}
i
++
;
}
copyOneRowFromMem
(
pQueryHandle
,
maxRowsToRead
,
numOfRows
,
row
,
pMeta
,
numOfCols
,
pTable
);
if
(
++
numOfRows
>=
maxRowsToRead
)
{
moveToNextRow
(
pCheckInfo
);
...
...
src/util/inc/tcache.h
浏览文件 @
14313882
...
...
@@ -37,8 +37,8 @@ typedef struct SCacheDataNode {
uint64_t
expiredTime
;
// expiredTime expiredTime when this element should be remove from cache
uint64_t
signature
;
uint32_t
size
;
// allocated size for current SCacheDataNode
uint16_t
keySize
:
15
;
bool
inTrash
:
1
;
// denote if it is in trash or not
uint16_t
keySize
:
15
;
bool
inTrash
Can
:
1
;
// denote if it is in trash or not
T_REF_DECLARE
()
char
*
key
;
char
data
[];
...
...
@@ -50,46 +50,49 @@ typedef struct STrashElem {
SCacheDataNode
*
pData
;
}
STrashElem
;
typedef
struct
{
int64_t
totalSize
;
// total allocated buffer in this hash table, SCacheObj is not included.
int64_t
refreshTime
;
/*
* to accommodate the old datanode which has the same key value of new one in hashList
/*
* to accommodate the old data which has the same key value of new one in hashList
* when an new node is put into cache, if an existed one with the same key:
* 1. if the old one does not be referenced, update it.
* 2. otherwise, move the old one to pTrash, addedTime the new one.
*
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
*/
typedef
struct
{
int64_t
totalSize
;
// total allocated buffer in this hash table, SCacheObj is not included.
int64_t
refreshTime
;
STrashElem
*
pTrash
;
void
*
tmrCtrl
;
void
*
pTimer
;
SCacheStatis
statistics
;
SHashObj
*
pHashTable
;
_hash_free_fn_t
freeFp
;
int
numOfElemsInTrash
;
// number of element in trash
int16_t
deleting
;
// set the deleting flag to stop refreshing ASAP.
T_REF_DECLARE
()
uint32_t
numOfElemsInTrash
;
// number of element in trash
uint8_t
deleting
;
// set the deleting flag to stop refreshing ASAP.
pthread_t
refreshWorker
;
#if defined(LINUX)
pthread_rwlock_t
lock
;
#else
pthread_mutex_t
lock
;
#endif
}
SCacheObj
;
/**
*
* @param maxSessions maximum slots available for hash elements
* @param tmrCtrl timer ctrl
* initialize the cache object
* @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
* not referenced by other objects
* @return
*/
SCacheObj
*
taosCacheInit
(
void
*
tmrCtrl
,
int64_t
refreshTimeInSeconds
);
SCacheObj
*
taosCacheInitWithCb
(
void
*
tmrCtrl
,
int64_t
refreshTimeInSeconds
,
void
(
*
freeCb
)(
void
*
data
));
SCacheObj
*
taosCacheInit
(
int64_t
refreshTimeInSeconds
);
/**
* initialize the cache object and set the free object callback function
* @param refreshTimeInSeconds
* @param freeCb
* @return
*/
SCacheObj
*
taosCacheInitWithCb
(
int64_t
refreshTimeInSeconds
,
void
(
*
freeCb
)(
void
*
data
));
/**
* add data into cache
...
...
src/util/src/tcache.c
浏览文件 @
14313882
...
...
@@ -77,31 +77,7 @@ static FORCE_INLINE void taosFreeNode(void *data) {
* @param lifespan total survial expiredTime from now
* @return SCacheDataNode
*/
static
SCacheDataNode
*
taosCreateCacheNode
(
const
char
*
key
,
size_t
keyLen
,
const
char
*
pData
,
size_t
size
,
uint64_t
duration
)
{
size_t
totalSize
=
size
+
sizeof
(
SCacheDataNode
)
+
keyLen
+
1
;
SCacheDataNode
*
pNewNode
=
calloc
(
1
,
totalSize
);
if
(
pNewNode
==
NULL
)
{
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
memcpy
(
pNewNode
->
data
,
pData
,
size
);
pNewNode
->
key
=
(
char
*
)
pNewNode
+
sizeof
(
SCacheDataNode
)
+
size
;
pNewNode
->
keySize
=
keyLen
;
memcpy
(
pNewNode
->
key
,
key
,
keyLen
);
pNewNode
->
addedTime
=
(
uint64_t
)
taosGetTimestampMs
();
pNewNode
->
expiredTime
=
pNewNode
->
addedTime
+
duration
;
pNewNode
->
signature
=
(
uint64_t
)
pNewNode
;
pNewNode
->
size
=
(
uint32_t
)
totalSize
;
return
pNewNode
;
}
static
SCacheDataNode
*
taosCreateCacheNode
(
const
char
*
key
,
size_t
keyLen
,
const
char
*
pData
,
size_t
size
,
uint64_t
duration
);
/**
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
...
...
@@ -109,50 +85,15 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const
* @param pCacheObj Cache object
* @param pNode Cache slot object
*/
static
void
taosAddToTrash
(
SCacheObj
*
pCacheObj
,
SCacheDataNode
*
pNode
)
{
if
(
pNode
->
inTrash
)
{
/* node is already in trash */
return
;
}
STrashElem
*
pElem
=
calloc
(
1
,
sizeof
(
STrashElem
));
pElem
->
pData
=
pNode
;
pElem
->
next
=
pCacheObj
->
pTrash
;
if
(
pCacheObj
->
pTrash
)
{
pCacheObj
->
pTrash
->
prev
=
pElem
;
}
pElem
->
prev
=
NULL
;
pCacheObj
->
pTrash
=
pElem
;
pNode
->
inTrash
=
true
;
pCacheObj
->
numOfElemsInTrash
++
;
static
void
taosAddToTrash
(
SCacheObj
*
pCacheObj
,
SCacheDataNode
*
pNode
);
uTrace
(
"key:%s %p move to trash, numOfElem in trash:%d"
,
pNode
->
key
,
pNode
,
pCacheObj
->
numOfElemsInTrash
);
}
static
void
taosRemoveFromTrash
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
)
{
if
(
pElem
->
pData
->
signature
!=
(
uint64_t
)
pElem
->
pData
)
{
uError
(
"key:sig:%d %p data has been released, ignore"
,
pElem
->
pData
->
signature
,
pElem
->
pData
);
return
;
}
pCacheObj
->
numOfElemsInTrash
--
;
if
(
pElem
->
prev
)
{
pElem
->
prev
->
next
=
pElem
->
next
;
}
else
{
/* pnode is the header, update header */
pCacheObj
->
pTrash
=
pElem
->
next
;
}
if
(
pElem
->
next
)
{
pElem
->
next
->
prev
=
pElem
->
prev
;
}
/**
* remove node in trash can
* @param pCacheObj
* @param pElem
*/
static
void
taosRemoveFromTrashCan
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
);
pElem
->
pData
->
signature
=
0
;
if
(
pCacheObj
->
freeFp
)
pCacheObj
->
freeFp
(
pElem
->
pData
->
data
);
free
(
pElem
->
pData
);
free
(
pElem
);
}
/**
* remove nodes in trash with refCount == 0 in cache
* @param pNode
...
...
@@ -160,42 +101,7 @@ static void taosRemoveFromTrash(SCacheObj *pCacheObj, STrashElem *pElem) {
* @param force force model, if true, remove data in trash without check refcount.
* may cause corruption. So, forece model only applys before cache is closed
*/
static
void
taosTrashEmpty
(
SCacheObj
*
pCacheObj
,
bool
force
)
{
__cache_wr_lock
(
pCacheObj
);
if
(
pCacheObj
->
numOfElemsInTrash
==
0
)
{
if
(
pCacheObj
->
pTrash
!=
NULL
)
{
uError
(
"key:inconsistency data in cache, numOfElem in trash:%d"
,
pCacheObj
->
numOfElemsInTrash
);
}
pCacheObj
->
pTrash
=
NULL
;
__cache_unlock
(
pCacheObj
);
return
;
}
STrashElem
*
pElem
=
pCacheObj
->
pTrash
;
while
(
pElem
)
{
T_REF_VAL_CHECK
(
pElem
->
pData
);
if
(
pElem
->
next
==
pElem
)
{
pElem
->
next
=
NULL
;
}
if
(
force
||
(
T_REF_VAL_GET
(
pElem
->
pData
)
==
0
))
{
uTrace
(
"key:%s %p removed from trash. numOfElem in trash:%d"
,
pElem
->
pData
->
key
,
pElem
->
pData
,
pCacheObj
->
numOfElemsInTrash
-
1
);
STrashElem
*
p
=
pElem
;
pElem
=
pElem
->
next
;
taosRemoveFromTrash
(
pCacheObj
,
p
);
}
else
{
pElem
=
pElem
->
next
;
}
}
assert
(
pCacheObj
->
numOfElemsInTrash
>=
0
);
__cache_unlock
(
pCacheObj
);
}
static
void
taosTrashCanEmpty
(
SCacheObj
*
pCacheObj
,
bool
force
);
/**
* release node
...
...
@@ -304,87 +210,20 @@ static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pCacheObj, con
return
pNode
;
}
static
void
doCleanupDataCache
(
SCacheObj
*
pCacheObj
)
{
__cache_wr_lock
(
pCacheObj
);
//if (taosHashGetSize(pCacheObj->pHashTable) > 0) {
taosHashCleanup
(
pCacheObj
->
pHashTable
);
//}
__cache_unlock
(
pCacheObj
);
taosTrashEmpty
(
pCacheObj
,
true
);
__cache_lock_destroy
(
pCacheObj
);
memset
(
pCacheObj
,
0
,
sizeof
(
SCacheObj
));
free
(
pCacheObj
);
}
/**
* do cleanup the taos cache
* @param pCacheObj
*/
static
void
doCleanupDataCache
(
SCacheObj
*
pCacheObj
);
/**
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pCacheObj->refreshTime
* @param handle Cache object handle
*/
static
void
taosCacheRefresh
(
void
*
handle
,
void
*
tmrId
)
{
SCacheObj
*
pCacheObj
=
(
SCacheObj
*
)
handle
;
if
(
pCacheObj
==
NULL
||
T_REF_VAL_GET
(
pCacheObj
)
==
0
)
{
uTrace
(
"object is destroyed. no refresh retry"
);
return
;
}
int16_t
ref
=
T_REF_INC
(
pCacheObj
);
if
(
ref
==
1
)
{
T_REF_DEC
(
pCacheObj
);
return
;
}
// todo add the ref before start the timer
int32_t
num
=
taosHashGetSize
(
pCacheObj
->
pHashTable
);
if
(
num
==
0
)
{
ref
=
T_REF_DEC
(
pCacheObj
);
if
(
ref
==
0
)
{
doCleanupDataCache
(
pCacheObj
);
}
else
{
taosTmrReset
(
taosCacheRefresh
,
pCacheObj
->
refreshTime
,
pCacheObj
,
pCacheObj
->
tmrCtrl
,
&
pCacheObj
->
pTimer
);
}
return
;
}
uint64_t
expiredTime
=
taosGetTimestampMs
();
pCacheObj
->
statistics
.
refreshCount
++
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
__cache_wr_lock
(
pCacheObj
);
while
(
taosHashIterNext
(
pIter
))
{
if
(
pCacheObj
->
deleting
==
1
)
{
taosHashDestroyIter
(
pIter
);
break
;
}
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
if
(
pNode
->
expiredTime
<=
expiredTime
&&
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
}
__cache_unlock
(
pCacheObj
);
taosHashDestroyIter
(
pIter
);
taosTrashEmpty
(
pCacheObj
,
false
);
ref
=
T_REF_DEC
(
pCacheObj
);
if
(
ref
==
0
)
{
doCleanupDataCache
(
pCacheObj
);
return
;
}
else
{
taosTmrReset
(
taosCacheRefresh
,
pCacheObj
->
refreshTime
,
pCacheObj
,
pCacheObj
->
tmrCtrl
,
&
pCacheObj
->
pTimer
);
}
}
static
void
*
taosCacheRefresh
(
void
*
handle
);
SCacheObj
*
taosCacheInitWithCb
(
void
*
tmrCtrl
,
int64_t
refreshTime
,
void
(
*
freeCb
)(
void
*
data
))
{
if
(
tmrCtrl
==
NULL
||
refreshTime
<=
0
)
{
SCacheObj
*
taosCacheInitWithCb
(
int64_t
refreshTime
,
void
(
*
freeCb
)(
void
*
data
))
{
if
(
refreshTime
<=
0
)
{
return
NULL
;
}
...
...
@@ -394,7 +233,7 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb
return
NULL
;
}
pCacheObj
->
pHashTable
=
taosHashInit
(
1
024
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
);
pCacheObj
->
pHashTable
=
taosHashInit
(
1
28
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
);
if
(
pCacheObj
->
pHashTable
==
NULL
)
{
free
(
pCacheObj
);
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
...
...
@@ -406,12 +245,8 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb
pCacheObj
->
freeFp
=
freeCb
;
pCacheObj
->
refreshTime
=
refreshTime
*
1000
;
pCacheObj
->
tmrCtrl
=
tmrCtrl
;
taosTmrReset
(
taosCacheRefresh
,
pCacheObj
->
refreshTime
,
pCacheObj
,
pCacheObj
->
tmrCtrl
,
&
pCacheObj
->
pTimer
);
if
(
__cache_lock_init
(
pCacheObj
)
!=
0
)
{
taosTmrStopA
(
&
pCacheObj
->
pTimer
);
taosHashCleanup
(
pCacheObj
->
pHashTable
);
free
(
pCacheObj
);
...
...
@@ -419,12 +254,18 @@ SCacheObj *taosCacheInitWithCb(void *tmrCtrl, int64_t refreshTime, void (*freeCb
return
NULL
;
}
T_REF_INC
(
pCacheObj
);
pthread_attr_t
thattr
=
{
0
};
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
pthread_create
(
&
pCacheObj
->
refreshWorker
,
&
thattr
,
taosCacheRefresh
,
pCacheObj
);
pthread_attr_destroy
(
&
thattr
);
return
pCacheObj
;
}
SCacheObj
*
taosCacheInit
(
void
*
tmrCtrl
,
int64_t
refreshTime
)
{
return
taosCacheInitWithCb
(
tmrCtrl
,
refreshTime
,
NULL
);
SCacheObj
*
taosCacheInit
(
int64_t
refreshTime
)
{
return
taosCacheInitWithCb
(
refreshTime
,
NULL
);
}
void
*
taosCachePut
(
SCacheObj
*
pCacheObj
,
const
char
*
key
,
const
void
*
pData
,
size_t
dataSize
,
int
duration
)
{
...
...
@@ -600,7 +441,7 @@ void taosCacheEmpty(SCacheObj *pCacheObj) {
__cache_unlock
(
pCacheObj
);
taosHashDestroyIter
(
pIter
);
taosTrashEmpty
(
pCacheObj
,
false
);
taosTrash
Can
Empty
(
pCacheObj
,
false
);
}
void
taosCacheCleanup
(
SCacheObj
*
pCacheObj
)
{
...
...
@@ -608,8 +449,180 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
return
;
}
int32_t
ref
=
T_REF_DEC
(
pCacheObj
);
if
(
ref
==
0
)
{
pCacheObj
->
deleting
=
1
;
pthread_join
(
pCacheObj
->
refreshWorker
,
NULL
);
doCleanupDataCache
(
pCacheObj
);
}
SCacheDataNode
*
taosCreateCacheNode
(
const
char
*
key
,
size_t
keyLen
,
const
char
*
pData
,
size_t
size
,
uint64_t
duration
)
{
size_t
totalSize
=
size
+
sizeof
(
SCacheDataNode
)
+
keyLen
+
1
;
SCacheDataNode
*
pNewNode
=
calloc
(
1
,
totalSize
);
if
(
pNewNode
==
NULL
)
{
uError
(
"failed to allocate memory, reason:%s"
,
strerror
(
errno
));
return
NULL
;
}
memcpy
(
pNewNode
->
data
,
pData
,
size
);
pNewNode
->
key
=
(
char
*
)
pNewNode
+
sizeof
(
SCacheDataNode
)
+
size
;
pNewNode
->
keySize
=
keyLen
;
memcpy
(
pNewNode
->
key
,
key
,
keyLen
);
pNewNode
->
addedTime
=
(
uint64_t
)
taosGetTimestampMs
();
pNewNode
->
expiredTime
=
pNewNode
->
addedTime
+
duration
;
pNewNode
->
signature
=
(
uint64_t
)
pNewNode
;
pNewNode
->
size
=
(
uint32_t
)
totalSize
;
return
pNewNode
;
}
void
taosAddToTrash
(
SCacheObj
*
pCacheObj
,
SCacheDataNode
*
pNode
)
{
if
(
pNode
->
inTrashCan
)
{
/* node is already in trash */
return
;
}
STrashElem
*
pElem
=
calloc
(
1
,
sizeof
(
STrashElem
));
pElem
->
pData
=
pNode
;
pElem
->
next
=
pCacheObj
->
pTrash
;
if
(
pCacheObj
->
pTrash
)
{
pCacheObj
->
pTrash
->
prev
=
pElem
;
}
pElem
->
prev
=
NULL
;
pCacheObj
->
pTrash
=
pElem
;
pNode
->
inTrashCan
=
true
;
pCacheObj
->
numOfElemsInTrash
++
;
uTrace
(
"key:%s %p move to trash, numOfElem in trash:%d"
,
pNode
->
key
,
pNode
,
pCacheObj
->
numOfElemsInTrash
);
}
void
taosRemoveFromTrashCan
(
SCacheObj
*
pCacheObj
,
STrashElem
*
pElem
)
{
if
(
pElem
->
pData
->
signature
!=
(
uint64_t
)
pElem
->
pData
)
{
uError
(
"key:sig:%d %p data has been released, ignore"
,
pElem
->
pData
->
signature
,
pElem
->
pData
);
return
;
}
pCacheObj
->
numOfElemsInTrash
--
;
if
(
pElem
->
prev
)
{
pElem
->
prev
->
next
=
pElem
->
next
;
}
else
{
/* pnode is the header, update header */
pCacheObj
->
pTrash
=
pElem
->
next
;
}
if
(
pElem
->
next
)
{
pElem
->
next
->
prev
=
pElem
->
prev
;
}
pElem
->
pData
->
signature
=
0
;
if
(
pCacheObj
->
freeFp
)
pCacheObj
->
freeFp
(
pElem
->
pData
->
data
);
free
(
pElem
->
pData
);
free
(
pElem
);
}
void
taosTrashCanEmpty
(
SCacheObj
*
pCacheObj
,
bool
force
)
{
__cache_wr_lock
(
pCacheObj
);
if
(
pCacheObj
->
numOfElemsInTrash
==
0
)
{
if
(
pCacheObj
->
pTrash
!=
NULL
)
{
uError
(
"key:inconsistency data in cache, numOfElem in trash:%d"
,
pCacheObj
->
numOfElemsInTrash
);
}
pCacheObj
->
pTrash
=
NULL
;
__cache_unlock
(
pCacheObj
);
return
;
}
STrashElem
*
pElem
=
pCacheObj
->
pTrash
;
while
(
pElem
)
{
T_REF_VAL_CHECK
(
pElem
->
pData
);
if
(
pElem
->
next
==
pElem
)
{
pElem
->
next
=
NULL
;
}
if
(
force
||
(
T_REF_VAL_GET
(
pElem
->
pData
)
==
0
))
{
uTrace
(
"key:%s %p removed from trash. numOfElem in trash:%d"
,
pElem
->
pData
->
key
,
pElem
->
pData
,
pCacheObj
->
numOfElemsInTrash
-
1
);
STrashElem
*
p
=
pElem
;
pElem
=
pElem
->
next
;
taosRemoveFromTrashCan
(
pCacheObj
,
p
);
}
else
{
pElem
=
pElem
->
next
;
}
}
assert
(
pCacheObj
->
numOfElemsInTrash
>=
0
);
__cache_unlock
(
pCacheObj
);
}
void
doCleanupDataCache
(
SCacheObj
*
pCacheObj
)
{
__cache_wr_lock
(
pCacheObj
);
taosHashCleanup
(
pCacheObj
->
pHashTable
);
__cache_unlock
(
pCacheObj
);
taosTrashCanEmpty
(
pCacheObj
,
true
);
__cache_lock_destroy
(
pCacheObj
);
memset
(
pCacheObj
,
0
,
sizeof
(
SCacheObj
));
free
(
pCacheObj
);
}
void
*
taosCacheRefresh
(
void
*
handle
)
{
SCacheObj
*
pCacheObj
=
(
SCacheObj
*
)
handle
;
if
(
pCacheObj
==
NULL
)
{
uTrace
(
"object is destroyed. no refresh retry"
);
return
NULL
;
}
const
int32_t
SLEEP_DURATION
=
500
;
//500 ms
int64_t
totalTick
=
pCacheObj
->
refreshTime
/
SLEEP_DURATION
;
int64_t
count
=
0
;
while
(
1
)
{
taosMsleep
(
500
);
// check if current cache object will be deleted every 500ms.
if
(
pCacheObj
->
deleting
)
{
break
;
}
if
(
++
count
<
totalTick
)
{
continue
;
}
// reset the count value
count
=
0
;
size_t
num
=
taosHashGetSize
(
pCacheObj
->
pHashTable
);
if
(
num
==
0
)
{
continue
;
}
uint64_t
expiredTime
=
taosGetTimestampMs
();
pCacheObj
->
statistics
.
refreshCount
++
;
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pCacheObj
->
pHashTable
);
__cache_wr_lock
(
pCacheObj
);
while
(
taosHashIterNext
(
pIter
))
{
SCacheDataNode
*
pNode
=
*
(
SCacheDataNode
**
)
taosHashIterGet
(
pIter
);
if
(
pNode
->
expiredTime
<=
expiredTime
&&
T_REF_VAL_GET
(
pNode
)
<=
0
)
{
taosCacheReleaseNode
(
pCacheObj
,
pNode
);
}
}
__cache_unlock
(
pCacheObj
);
taosHashDestroyIter
(
pIter
);
taosTrashCanEmpty
(
pCacheObj
,
false
);
}
return
NULL
;
}
\ No newline at end of file
src/util/tests/cacheTest.cpp
浏览文件 @
14313882
...
...
@@ -19,8 +19,7 @@ int32_t tsMaxMeterConnections = 200;
// test cache
TEST
(
testCase
,
client_cache_test
)
{
const
int32_t
REFRESH_TIME_IN_SEC
=
2
;
void
*
tscTmr
=
taosTmrInit
(
tsMaxMgmtConnections
*
2
,
200
,
6000
,
"TSC"
);
SCacheObj
*
tscCacheHandle
=
taosCacheInit
(
tscTmr
,
REFRESH_TIME_IN_SEC
);
SCacheObj
*
tscCacheHandle
=
taosCacheInit
(
REFRESH_TIME_IN_SEC
);
const
char
*
key1
=
"test1"
;
char
data1
[]
=
"test11"
;
...
...
@@ -106,9 +105,7 @@ TEST(testCase, client_cache_test) {
TEST
(
testCase
,
cache_resize_test
)
{
const
int32_t
REFRESH_TIME_IN_SEC
=
2
;
void
*
tscTmr
=
taosTmrInit
(
1000
*
2
,
200
,
6000
,
"TSC"
);
auto
*
pCache
=
taosCacheInit
(
tscTmr
,
REFRESH_TIME_IN_SEC
);
auto
*
pCache
=
taosCacheInit
(
REFRESH_TIME_IN_SEC
);
char
key
[
256
]
=
{
0
};
char
data
[
1024
]
=
"abcdefghijk"
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录