Commit 355eeda4
Project: 慢慢CG / TDengine (fork of taosdata / TDengine)
Authored on Mar 24, 2020 by slguan

Merge branch '2.0' into refact/slguan

# Conflicts:
#	src/dnode/src/dnodeRead.c

Parents: fa9a4099, fd5cc4b5

Showing 11 changed files with 308 additions and 301 deletions (+308 −301)
src/client/inc/tsclient.h            +5   −4
src/client/src/tscServer.c           +6   −8
src/client/src/tscSql.c              +4   −57
src/dnode/src/dnodeRead.c            +10  −5
src/query/inc/queryExecutor.h        +6   −6
src/query/src/queryExecutor.c        +67  −78
src/util/src/shash.c                 +0   −2
src/vnode/tsdb/inc/tsdbFile.h        +48  −16
src/vnode/tsdb/src/tsdbFile.c        +118 −114
src/vnode/tsdb/src/tsdbMain.c        +43  −10
src/vnode/tsdb/tests/tsdbTests.cpp   +1   −1
src/client/inc/tsclient.h

@@ -294,20 +294,21 @@ typedef struct SResRec {
  struct STSBuf;

  typedef struct {
    int32_t  code;
    int64_t  numOfRows;                  // num of results in current retrieved
    int64_t  numOfTotal;                 // num of total results
    int64_t  numOfTotalInCurrentClause;  // num of total result in current subclause

    char *   pRsp;
-   int      rspType;
-   int      rspLen;
+   int32_t  rspType;
+   int32_t  rspLen;
    uint64_t qhandle;
    int64_t  uid;
    int64_t  useconds;
    int64_t  offset;  // offset value from vnode during projection query of stable
-   int      row;
+   int32_t  row;
    int16_t  numOfCols;
    int16_t  precision;
    bool     completed;
    int32_t  code;
    int32_t  numOfGroups;
    SResRec *pGroupRec;
    char *   data;
src/client/src/tscServer.c

@@ -209,7 +209,6 @@ int tscSendMsgToServer(SSqlObj *pSql) {
  }

  void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
-   tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
    SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
    if (pSql == NULL || pSql->signature != pSql) {
      tscError("%p sql is already released, signature:%p", pSql, pSql->signature);

@@ -256,7 +255,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
      rpcFreeCont(rpcMsg->pCont);
      return;
    } else {
-     tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code);
+     tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code));
      pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
      pSql->res.code = rpcMsg->code;  // keep the previous error code

@@ -278,7 +277,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
    if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
      pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
    } else {
-     tscTrace("%p query is cancelled, code:%d", pSql, pRes->code);
+     tscTrace("%p query is cancelled, code:%d", pSql, tstrerror(pRes->code));
    }

    if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {

@@ -318,19 +317,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
        tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
                 *(int32_t *)pRes->pRsp, pRes->rspLen);
      } else {
-       tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen);
+       tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
      }
    }

    if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
      rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);

    if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
      int command = pCmd->command;
      void *taosres = tscKeepConn[command] ? pSql : NULL;
      rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows;

-     tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
+     tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres);

      /*
       * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj

@@ -2304,6 +2301,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
    pRes->precision = htons(pRetrieve->precision);
    pRes->offset    = htobe64(pRetrieve->offset);
    pRes->useconds  = htobe64(pRetrieve->useconds);
+   pRes->completed = (pRetrieve->completed == 1);
    pRes->data      = pRetrieve->data;

    SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
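Note: the tscServer.c hunks above mostly swap raw numeric error codes for human-readable strings in the trace logs. As an illustration of the pattern only — not TDengine's actual tstrerror() implementation, whose table and signature live elsewhere in the tree — a minimal code-to-string lookup could look like this (all names here are hypothetical):

  #include <stdio.h>
  #include <stdint.h>

  /* Hypothetical error codes and lookup, for illustration only. */
  enum { DEMO_CODE_SUCCESS = 0, DEMO_CODE_NETWORK_UNAVAIL = 1, DEMO_CODE_QUERY_CANCELLED = 2 };

  static const char *demo_strerror(int32_t code) {
    switch (code) {
      case DEMO_CODE_SUCCESS:         return "success";
      case DEMO_CODE_NETWORK_UNAVAIL: return "network unavailable";
      case DEMO_CODE_QUERY_CANCELLED: return "query cancelled";
      default:                        return "unknown error";
    }
  }

  int main(void) {
    int32_t code = DEMO_CODE_QUERY_CANCELLED;
    /* Log the string instead of the bare number, as the diff does with tstrerror(). */
    printf("query is cancelled, code:%s\n", demo_strerror(code));
    return 0;
  }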
src/client/src/tscSql.c

@@ -653,62 +653,6 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
    return pRes->tsrow;
  }

- TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
-   SSqlObj *pSql = (SSqlObj *)res;
-   SSqlCmd *pCmd = &pSql->cmd;
-   SSqlRes *pRes = &pSql->res;
-
-   if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
-     return NULL;
-   }
-
-   if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
-     tscFetchDatablockFromSubquery(pSql);
-     if (pRes->code == TSDB_CODE_SUCCESS) {
-       tscTrace("%p data from all subqueries have been retrieved to client", pSql);
-       return tscBuildResFromSubqueries(pSql);
-     } else {
-       tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code);
-       return NULL;
-     }
-   } else if (pRes->row >= pRes->numOfRows) {
-     /**
-      * NOT a join query
-      *
-      * If the data block of current result set have been consumed already, try fetch next result
-      * data block from virtual node.
-      */
-     tscResetForNextRetrieve(pRes);
-
-     if (pCmd->command < TSDB_SQL_LOCAL) {
-       pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
-     }
-
-     tscProcessSql(pSql);  // retrieve data from virtual node
-
-     // if failed to retrieve data from current virtual node, try next one if exists
-     if (hasMoreVnodesToTry(pSql)) {
-       tscTryQueryNextVnode(pSql, NULL);
-     }
-
-     /*
-      * local reducer has handle this case,
-      * so no need to add the pRes->numOfRows for super table query
-      */
-     if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
-       pRes->numOfTotalInCurrentClause += pRes->numOfRows;
-     }
-
-     if (pRes->numOfRows == 0) {
-       return NULL;
-     }
-   }
-
-   return doSetResultRowData(pSql);
- }
-
  static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) {
    SSqlObj *pSql = (SSqlObj *)tres;
    if (numOfRows < 0) {

@@ -729,7 +673,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
    SSqlCmd *pCmd = &pSql->cmd;
    SSqlRes *pRes = &pSql->res;

-   if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) {
+   if (pRes->qhandle == 0 ||
+       pRes->completed ||
+       pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
+       pCmd->command == TSDB_SQL_INSERT) {
      return NULL;
    }
src/dnode/src/dnodeRead.c

@@ -92,8 +92,8 @@ void dnodeRead(SRpcMsg *pMsg) {
    while (leftLen > 0) {
      SMsgHead *pHead = (SMsgHead *)pCont;
-     pHead->vgId    = 1;               // htonl(pHead->vgId);
-     pHead->contLen = pMsg->contLen;   // htonl(pHead->contLen);
+     pHead->vgId    = htonl(pHead->vgId);
+     pHead->contLen = htonl(pHead->contLen);

      void *pVnode = dnodeGetVnode(pHead->vgId);
      if (pVnode == NULL) {

@@ -253,17 +253,19 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
    qTableQuery(pQInfo);
  }

+ static int32_t c = 0;
  static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
    SRetrieveTableMsg *pRetrieve = pMsg->pCont;
    void *pQInfo = (void *)htobe64(pRetrieve->qhandle);

    dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
+   if ((++c) % 2 == 0) {
+     int32_t k = 1;
+   }

    int32_t rowSize = 0;
    int32_t numOfRows = 0;
    int32_t contLen = 0;

-   SRpcMsg rpcRsp = {0};
    SRetrieveTableRsp *pRsp = NULL;

    int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);

@@ -277,7 +279,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
      code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
    }

-   rpcRsp = (SRpcMsg) {
+   SRpcMsg rpcRsp = (SRpcMsg) {
      .handle  = pMsg->rpcMsg.handle,
      .pCont   = pRsp,
      .contLen = contLen,

@@ -286,4 +288,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
    };

    rpcSendResponse(&rpcRsp);
+
+   //todo merge result should be done here
+   //dnodeProcessReadResult(&readMsg);
  }
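Note: the first dnodeRead.c hunk stops hard-coding the header fields and converts them with htonl(), and dnodeProcessRetrieveMsg() decodes the 64-bit query handle with htobe64(). A minimal sketch of that byte-order handling follows, using a hypothetical header struct rather than TDengine's SMsgHead/SRetrieveTableMsg (htobe64/be64toh come from glibc's <endian.h>):

  #include <stdint.h>
  #include <stdio.h>
  #include <arpa/inet.h>   /* htonl/ntohl */
  #include <endian.h>      /* htobe64/be64toh (glibc) */

  /* Hypothetical wire header, for illustration only. */
  typedef struct {
    int32_t  vgId;     /* vnode group id, big-endian on the wire */
    int32_t  contLen;  /* payload length, big-endian on the wire */
    uint64_t qhandle;  /* query handle, big-endian on the wire */
  } DemoMsgHead;

  int main(void) {
    /* Sender side: store every multi-byte field in network byte order. */
    DemoMsgHead head = { .vgId = htonl(1), .contLen = htonl(128), .qhandle = htobe64(0xdeadbeefULL) };

    /* Receiver side: convert back to host order before using the fields. */
    int32_t  vgId    = ntohl(head.vgId);
    int32_t  contLen = ntohl(head.contLen);
    uint64_t qhandle = be64toh(head.qhandle);

    printf("vgId:%d contLen:%d qhandle:%p\n", vgId, contLen, (void *)(uintptr_t)qhandle);
    return 0;
  }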
src/query/inc/queryExecutor.h

@@ -33,7 +33,7 @@ typedef struct SData {
  } SData;

  enum {
-   ST_QUERY_KILLED = 0,     // query killed
+   // ST_QUERY_KILLED = 0,  // query killed
    ST_QUERY_PAUSED = 1,     // query paused, due to full of the response buffer
    ST_QUERY_COMPLETED = 2,  // query completed
  };

@@ -142,8 +142,8 @@ typedef struct SQuery {
    SResultRec rec;
    int32_t    pos;
    int64_t    pointsOffset;  // the number of points offset to save read data
    SData**    sdata;
    int32_t    capacity;
    SSingleColumnFilterInfo* pFilterInfo;
  } SQuery;

@@ -170,14 +170,14 @@ typedef struct SQueryRuntimeEnv {
  } SQueryRuntimeEnv;

  typedef struct SQInfo {
-   uint64_t         signature;
+   void*            signature;
    void*            pVnode;
    TSKEY            startTime;
-   int64_t          elapsedTime;
+   TSKEY            elapsedTime;
    SResultRec       rec;
    int32_t          pointsInterpo;
    int32_t          code;    // error code to returned to client
-   int32_t          killed;  // denotes if current query is killed
+   // int32_t       killed;  // denotes if current query is killed
    sem_t            dataReady;
    SArray*          pTableIdList;  // table list
    SQueryRuntimeEnv runtimeEnv;
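Note: the SQInfo change above turns `signature` into a pointer, so a live object can be recognized with the self-reference check used elsewhere in this commit (`pQInfo->signature != pQInfo`, `pSql->signature != pSql`). A minimal, self-contained sketch of that idiom with a hypothetical object type:

  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>

  /* Hypothetical handle type, for illustration only. */
  typedef struct DemoObj {
    void *signature;  /* points to the object itself while it is alive */
    int   payload;
  } DemoObj;

  static DemoObj *demoCreate(void) {
    DemoObj *p = calloc(1, sizeof(DemoObj));
    if (p == NULL) return NULL;
    p->signature = p;  /* mark as valid */
    return p;
  }

  static int demoIsValid(const DemoObj *p) {
    return p != NULL && p->signature == p;
  }

  static void demoFree(DemoObj *p) {
    /* wipe the signature first so a late caller fails the validity check */
    memset(p, 0, sizeof(DemoObj));
    free(p);
  }

  int main(void) {
    DemoObj *p = demoCreate();
    printf("valid after create: %d\n", demoIsValid(p));
    p->signature = NULL;  /* simulate an object that was torn down */
    printf("valid after teardown: %d\n", demoIsValid(p));
    p->signature = p;
    demoFree(p);
    return 0;
  }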
src/query/src/queryExecutor.c

@@ -64,38 +64,24 @@ typedef struct SPointInterpoSupporter {
  } SPointInterpoSupporter;

  typedef enum {
-   /*
-    * the program will call this function again, if this status is set.
-    * used to transfer from QUERY_RESBUF_FULL
-    */
+   // when query starts to execute, this status will set
    QUERY_NOT_COMPLETED = 0x1u,

-   /*
-    * output buffer is full, so, the next query will be employed,
-    * in this case, we need to set the appropriated start scan point for
-    * the next query.
-    *
-    * this status is only exist in group-by clause and
-    * diff/add/division/multiply/ query.
+   /* result output buffer is full, current query is paused.
+    * this status is only exist in group-by clause and diff/add/division/multiply/ query.
     */
    QUERY_RESBUF_FULL = 0x2u,

-   /*
-    * query is over
-    * 1. this status is used in one row result query process, e.g.,
-    *    count/sum/first/last/ avg...etc.
-    * 2. when the query range on timestamp is satisfied, it is also denoted as
-    *    query_compeleted
+   /* query is over
+    * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
+    * 2. when all data within queried time window, it is also denoted as query_completed
     */
    QUERY_COMPLETED = 0x4u,

-   /*
-    * all data has been scanned, so current search is stopped,
-    * At last, the function will transfer this status to QUERY_COMPLETED
+   /* when the result is not completed return to client, this status will be
+    * usually used in case of interval query with interpolation option
     */
-   QUERY_NO_DATA_TO_CHECK = 0x8u,
+   QUERY_OVER = 0x8u,
  } vnodeQueryStatus;

  static void setQueryStatus(SQuery *pQuery, int8_t status);

@@ -1301,7 +1287,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
    if (pRuntimeEnv->pTSBuf != NULL) {
      // if timestamp filter list is empty, quit current query
      if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) {
-       setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
+       setQueryStatus(pQuery, QUERY_COMPLETED);
        break;
      }
    }

@@ -1621,10 +1607,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
    pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
  }

- bool isQueryKilled(SQuery *pQuery) {
-   return false;
-
-   SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
+ static bool isQueryKilled(SQuery *pQuery) {
  #if 0
    /*
     * check if the queried meter is going to be deleted.

@@ -1638,9 +1621,14 @@ bool isQueryKilled(SQuery *pQuery) {
    return (pQInfo->killed == 1);
  #endif
    return 0;
  }

+ static bool setQueryKilled(SQInfo *pQInfo) {
+   pQInfo->code = TSDB_CODE_QUERY_CANCELLED;
+ }
+
  bool isFixedOutputQuery(SQuery *pQuery) {
    if (pQuery->intervalTime != 0) {
      return false;

@@ -2664,7 +2652,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
    while (tsdbNextDataBlock(pQueryHandle)) {
      // check if query is killed or not set the status of query to pass the status check
      if (isQueryKilled(pQuery)) {
-       setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
        return cnt;
      }

@@ -2714,7 +2701,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
    }

    if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
-     if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
+     if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
        int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
        closeAllTimeWindow(&pRuntimeEnv->windowResInfo);

@@ -3631,7 +3618,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
      /* check if query is killed or not */
      if (isQueryKilled(pQuery)) {
-       setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
+       // setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
        return;
      }
    }

@@ -4111,7 +4098,7 @@ bool vnodeHasRemainResults(void *handle) {
    }

    // query has completed
-   if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
+   if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
      TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime,
                                        pQuery->slidingTimeUnit, pQuery->precision);
      // int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY

@@ -4272,7 +4259,7 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) {
           pQuery->window.ekey, pQuery->order.order);
      sem_post(&pQInfo->dataReady);
-     pQInfo->killed = 1;
+     setQueryStatus(pQuery, QUERY_COMPLETED);
      return TSDB_CODE_SUCCESS;
    }

@@ -5024,7 +5011,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
    // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously.
    pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv);
    // assert(pQuery->pointsRead <= pQuery->pointsToRead &&
-   //        Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK));
+   //        Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED));

    // must be top/bottom query if offset > 0
    if (pQuery->limit.offset > 0) {

@@ -5128,7 +5115,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
        pQuery->limit.offset -= c;
      }

-     if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
+     if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
        break;
      }

@@ -5178,7 +5165,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
                            pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo);

        dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead);
-       if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
+       if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
          doRevisedResultsByLimit(pQInfo);
          break;
        }

@@ -5206,17 +5193,20 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
  }

  void qTableQuery(SQInfo *pQInfo) {
-   assert(pQInfo != NULL);
-
-   if (pQInfo->killed) {
-     dTrace("QInfo:%p it is already killed, abort", pQInfo);
+   if (pQInfo == NULL || pQInfo->signature != pQInfo) {
+     dTrace("%p freed abort query", pQInfo);
      return;
    }

    SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
-   SQuery *pQuery = pRuntimeEnv->pQuery;

    // dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo);
+   SQuery *pQuery = pRuntimeEnv->pQuery;
+   if (isQueryKilled(pQuery)) {
+     dTrace("QInfo:%p it is already killed, abort", pQInfo);
+     return;
+   }

    dTrace("QInfo:%p query task is launched", pQInfo);

    if (vnodeHasRemainResults(pQInfo)) {
      /*

@@ -5242,7 +5232,7 @@ void qTableQuery(SQInfo *pQInfo) {
    }

    // here we have scan all qualified data in both data file and cache
-   if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
+   if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
      // continue to get push data from the group result
      if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
          (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) {

@@ -5303,10 +5293,8 @@ void qTableQuery(SQInfo *pQInfo) {
    /* check if query is killed or not */
    if (isQueryKilled(pQuery)) {
      dTrace("QInfo:%p query is killed", pQInfo);
-     // pQInfo->over = 1;
    } else {
      // dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo,
      //        pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
      dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead);
    }

    sem_post(&pQInfo->dataReady);

@@ -5989,21 +5977,16 @@ bool isQInfoValid(void *param) {
    return (sig == (uint64_t)pQInfo);
  }

- void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) {
+ void vnodeFreeQInfo(SQInfo *pQInfo) {
    if (!isQInfoValid(pQInfo)) {
      return;
    }

-   pQInfo->killed = 1;
+   SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
+   setQueryKilled(pQInfo);

    dTrace("QInfo:%p start to free SQInfo", pQInfo);
-   if (decQueryRef) {
-     vnodeDecMeterRefcnt(pQInfo);
-   }

-   SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
-   for (int col = 0; col < pQuery->numOfOutputCols; ++col) {
+   for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
      tfree(pQuery->sdata[col]);
    }

@@ -6049,7 +6032,7 @@ void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) {
    tfree(pQuery->pGroupbyExpr);
    tfree(pQuery);

-   // dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
+   dTrace("QInfo:%p QInfo is freed", pQInfo);

    // destroy signature, in order to avoid the query process pass the object safety check
    memset(pQInfo, 0, sizeof(SQInfo));

@@ -6105,7 +6088,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
  _error:
    // table query ref will be decrease during error handling
-   vnodeFreeQInfo(*pQInfo, false);
+   vnodeFreeQInfo(*pQInfo);
    return code;
  }

@@ -6176,28 +6159,25 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) {
    if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
      return TSDB_CODE_INVALID_QHANDLE;
    }

-   if (pQInfo->killed) {
+   SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
+   if (isQueryKilled(pQuery)) {
      dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
      if (pQInfo->code == TSDB_CODE_SUCCESS) {
        return TSDB_CODE_QUERY_CANCELLED;
      } else {
        // in case of not TSDB_CODE_SUCCESS, return the code to client
-       return abs(pQInfo->code);
+       return (pQInfo->code >= 0) ? pQInfo->code : (-pQInfo->code);
      }
    }

    sem_wait(&pQInfo->dataReady);

-   SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
    *numOfRows = pQInfo->rec.pointsRead;
    *rowsize = pQuery->rowSize;

    dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code);

-   if (pQInfo->code < 0) {  // less than 0 means there are error existed.
-     return -pQInfo->code;
-   }
+   return (pQInfo->code >= 0) ? pQInfo->code : (-pQInfo->code);
  }

  static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {

@@ -6250,6 +6230,11 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) {
    pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead;
    dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal);

+   setQueryStatus(pQuery, QUERY_COMPLETED);
+
+   return TSDB_CODE_SUCCESS;
+
+   // todo if interpolation exists, the result may be dump to client by several rounds
  }

  static void addToTaskQueue(SQInfo* pQInfo) {

@@ -6261,11 +6246,7 @@ static void addToTaskQueue(SQInfo* pQInfo) {
    dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
  #endif

-   if (pQInfo->killed == 1) {
-     dTrace("%p freed or killed, abort query", pQInfo);
-   } else {
-     // todo add to task queue
-   }
+   // todo add to task queue
  }

@@ -6293,12 +6274,20 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen) {
    }

    if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) {
-     doDumpQueryResult(pQInfo, (*pRsp)->data, NULL);
+     code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL);

      // has more data to return or need next round to execute
      addToTaskQueue(pQInfo);
-     return TSDB_CODE_SUCCESS;
+   } else if (isQueryKilled(pQuery)) {
+     code = TSDB_CODE_QUERY_CANCELLED;
    }

+   assert(code != TSDB_CODE_ACTION_IN_PROGRESS);
+   if (isQueryKilled(pQuery) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
+     (*pRsp)->completed = 1;  // notify no more result to client
+     vnodeFreeQInfo(pQInfo);
+   }
+
    return code;

    // if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
    //   dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
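Note: most of the queryExecutor.c hunks replace checks of the form Q_STATUS_EQUAL(status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK) with a plain QUERY_COMPLETED test, since the two flags are folded together. A small sketch of this kind of bit-flag status handling with hypothetical names (TDengine's own Q_STATUS_EQUAL macro is defined elsewhere in the file and may differ):

  #include <stdint.h>
  #include <stdio.h>

  /* Hypothetical status flags, mirroring the style of vnodeQueryStatus. */
  enum {
    DEMO_NOT_COMPLETED = 0x1u,
    DEMO_RESBUF_FULL   = 0x2u,
    DEMO_COMPLETED     = 0x4u,
    DEMO_OVER          = 0x8u,
  };

  /* True when the status word shares at least one bit with the mask. */
  #define DEMO_STATUS_EQUAL(st, mask) (((st) & (mask)) != 0)

  int main(void) {
    int8_t status = DEMO_COMPLETED;

    /* Before the merge, callers had to test two flags at once... */
    printf("%d\n", DEMO_STATUS_EQUAL(status, DEMO_COMPLETED | DEMO_OVER));
    /* ...afterwards a single flag covers the common "query finished" case. */
    printf("%d\n", DEMO_STATUS_EQUAL(status, DEMO_COMPLETED));
    return 0;
  }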
src/util/src/shash.c

@@ -162,8 +162,6 @@ void taosDeleteStrHash(void *handle, char *string) {
    if (pObj == NULL || pObj->maxSessions == 0) return;
    if (string == NULL || string[0] == 0) return;
-
-   return;

    hash = (*(pObj->hashFp))(pObj, string);

    pthread_mutex_lock(&pObj->mutex);
src/vnode/tsdb/inc/tsdbFile.h

@@ -34,20 +34,22 @@ typedef enum {
    TSDB_FILE_TYPE_MAX
  } TSDB_FILE_TYPE;

- extern const char *tsdbFileSuffix[];
+ #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)

- typedef struct {
-   int64_t size;
-   int64_t tombSize;
- } SFileInfo;
+ extern const char *tsdbFileSuffix[];

  typedef struct {
+   int8_t  type;
+   int     fd;
+   char    fname[128];
    int64_t size;      // total size of the file
    int64_t tombSize;  // unused file size
+   int32_t totalBlocks;
+   int32_t totalSubBlocks;
  } SFile;

+ #define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1)

  typedef struct {
    int32_t fileId;
    SFile   files[TSDB_FILE_TYPE_MAX];

@@ -55,14 +57,26 @@ typedef struct {
  // TSDB file handle
  typedef struct {
-   int32_t daysPerFile;
-   int32_t keep;
-   int32_t minRowPerFBlock;
-   int32_t maxRowsPerFBlock;
-   int32_t maxTables;
+   int        maxFGroups;
+   int        numOfFGroups;
    SFileGroup fGroup[];
  } STsdbFileH;

+ #define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId
+ #define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId

+ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
+ void tsdbCloseFileH(STsdbFileH *pFileH);
+ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
+ int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);

+ typedef struct {
+   int32_t len;
+   int32_t padding;  // For padding purpose
+   int64_t offset;
+ } SCompIdx;

  /**
   * if numOfSubBlocks == -1, then the SCompBlock is a sub-block
   * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to

@@ -83,14 +97,32 @@ typedef struct {
    TSKEY keyLast;
  } SCompBlock;

- #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)
+ typedef struct {
+   int32_t    delimiter;    // For recovery usage
+   int32_t    checksum;     // TODO: decide if checksum logic in this file or make it one API
+   int64_t    uid;
+   int32_t    padding;      // For padding purpose
+   int32_t    numOfBlocks;  // TODO: make the struct padding
+   SCompBlock blocks[];
+ } SCompInfo;

- STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
-                          int32_t maxRowsPerFBlock, int32_t maxTables);
+ // TODO: take pre-calculation into account
+ typedef struct {
+   int16_t colId;  // Column ID
+   int16_t len;    // Column length
+   int32_t type : 8;
+   int32_t offset : 24;
+ } SCompCol;
+
+ // TODO: Take recover into account
+ typedef struct {
+   int32_t  delimiter;  // For recovery usage
+   int32_t  numOfCols;  // For recovery usage
+   int64_t  uid;        // For recovery usage
+   SCompCol cols[];
+ } SCompData;

- void tsdbCloseFile(STsdbFileH *pFileH);
  int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables);
  void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);

  #ifdef __cplusplus
  }
  #endif
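Note: several of the new on-disk structs above (STsdbFileH, SCompInfo, SCompData) end in a C99 flexible array member, and the matching .c code allocates them with a single calloc of sizeof(header) + n * sizeof(element). A self-contained sketch of that allocation pattern, using a hypothetical struct rather than the TSDB types:

  #include <stdio.h>
  #include <stdlib.h>
  #include <stdint.h>

  /* Hypothetical block descriptor and container, for illustration only. */
  typedef struct {
    int64_t offset;
    int32_t len;
  } DemoBlock;

  typedef struct {
    int32_t   numOfBlocks;
    DemoBlock blocks[];  /* flexible array member: storage follows the header */
  } DemoInfo;

  int main(void) {
    int n = 4;
    /* One allocation covers the header plus n trailing elements. */
    DemoInfo *info = calloc(1, sizeof(DemoInfo) + sizeof(DemoBlock) * n);
    if (info == NULL) return 1;

    info->numOfBlocks = n;
    for (int i = 0; i < n; i++) {
      info->blocks[i].offset = i * 512;
      info->blocks[i].len = 512;
    }

    printf("last block offset: %lld\n", (long long)info->blocks[n - 1].offset);
    free(info);
    return 0;
  }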
src/vnode/tsdb/src/tsdbFile.c

@@ -27,72 +27,126 @@
  #define TSDB_FILE_HEAD_SIZE 512
  #define TSDB_FILE_DELIMITER 0xF00AFA0F

- typedef struct {
-   int32_t len;
-   int32_t padding;  // For padding purpose
-   int64_t offset;
- } SCompIdx;
-
- typedef struct {
-   int32_t    delimiter;    // For recovery usage
-   int32_t    checksum;     // TODO: decide if checksum logic in this file or make it one API
-   int64_t    uid;
-   int32_t    padding;      // For padding purpose
-   int32_t    numOfBlocks;  // TODO: make the struct padding
-   SCompBlock blocks[];
- } SCompInfo;
-
- // TODO: take pre-calculation into account
- typedef struct {
-   int16_t colId;  // Column ID
-   int16_t len;    // Column length
-   int32_t type : 8;
-   int32_t offset : 24;
- } SCompCol;
-
- // TODO: Take recover into account
- typedef struct {
-   int32_t  delimiter;  // For recovery usage
-   int32_t  numOfCols;  // For recovery usage
-   int64_t  uid;        // For recovery usage
-   SCompCol cols[];
- } SCompData;
-
  const char *tsdbFileSuffix[] = {
      ".head",  // TSDB_FILE_TYPE_HEAD
      ".data",  // TSDB_FILE_TYPE_DATA
      ".last"   // TSDB_FILE_TYPE_LAST
  };

- static int tsdbWriteFileHead(int fd, SFile *pFile) {
+ static int compFGroupKey(const void *key, const void *fgroup);
+ static int compFGroup(const void *arg1, const void *arg2);
+ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
+ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile);
+ static int tsdbWriteFileHead(SFile *pFile);
+ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
+
+ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
+   STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
+   if (pFileH == NULL) {  // TODO: deal with ERROR here
+     return NULL;
+   }
+
+   pFileH->maxFGroups = maxFiles;
+
+   DIR *dir = opendir(dataDir);
+   if (dir == NULL) {
+     free(pFileH);
+     return NULL;
+   }
+
+   struct dirent *dp;
+   while ((dp = readdir(dir)) != NULL) {
+     if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue;
+     // TODO
+   }
+
+   return pFileH;
+ }
+
+ void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); }
+
+ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
+   if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1;
+
+   SFileGroup  fGroup;
+   SFileGroup *pFGroup = &fGroup;
+   if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) ||
+       bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) == NULL) {
+     pFGroup->fileId = fid;
+     for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
+       if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) {
+         // TODO: deal with the ERROR here, remove those creaed file
+         return -1;
+       }
+     }
+
+     pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
+     qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
+   }
+   return 0;
+ }
+
+ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
+   SFileGroup *pGroup =
+       bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
+   if (pGroup == NULL) return -1;
+
+   // Remove from disk
+   for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
+     remove(pGroup->files[type].fname);
+   }
+
+   // Adjust the memory
+   int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1);
+   if (filesBehind > 0) {
+     memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind);
+   }
+   pFileH->numOfFGroups--;
+
+   return 0;
+ }
+
+ static int compFGroupKey(const void *key, const void *fgroup) {
+   int         fid = *(int *)key;
+   SFileGroup *pFGroup = (SFileGroup *)fgroup;
+   return (fid - pFGroup->fileId);
+ }
+
+ static int compFGroup(const void *arg1, const void *arg2) {
+   return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId;
+ }
+
+ static int tsdbWriteFileHead(SFile *pFile) {
    char head[TSDB_FILE_HEAD_SIZE] = "\0";

    pFile->size += TSDB_FILE_HEAD_SIZE;

    // TODO: write version and File statistic to the head
-   lseek(fd, 0, SEEK_SET);
-   if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;
+   lseek(pFile->fd, 0, SEEK_SET);
+   if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;

    return 0;
  }

- static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) {
+ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
    int size = sizeof(SCompIdx) * maxTables;
    void *buf = calloc(1, size);
    if (buf == NULL) return -1;

-   if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
+   if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
      free(buf);
      return -1;
    }

-   if (write(fd, buf, size) < 0) {
+   if (write(pFile->fd, buf, size) < 0) {
      free(buf);
      return -1;
    }

    pFile->size += size;
    free(buf);
    return 0;
  }

@@ -104,12 +158,27 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname)
    return 0;
  }

- /**
-  * Create a file and set the SFile object
-  */
+ static int tsdbOpenFileForWrite(SFile *pFile, int oflag) {
+   // TODO: change the function
+   if (TSDB_IS_FILE_OPENED(pFile)) return -1;
+
+   pFile->fd = open(pFile->fname, oflag, 0755);
+   if (pFile->fd < 0) return -1;
+
+   return 0;
+ }
+
+ static int tsdbCloseFile(SFile *pFile) {
+   if (!TSDB_IS_FILE_OPENED(pFile)) return -1;
+   int ret = close(pFile->fd);
+   pFile->fd = -1;
+   return ret;
+ }
+
  static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) {
    memset((void *)pFile, 0, sizeof(SFile));
+   pFile->type = type;
+   pFile->fd = -1;

    tsdbGetFileName(dataDir, fileId, type, pFile->fname);
    if (access(pFile->fname, F_OK) == 0) {

@@ -117,93 +186,28 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) {
      return -1;
    }

-   int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755);
-   if (fd < 0) return -1;
+   if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) {
+     // TODO: deal with the ERROR here
+     return -1;
+   }

    if (type == TSDB_FILE_TYPE_HEAD) {
-     if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) {
-       close(fd);
+     if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) {
+       tsdbCloseFile(pFile);
        return -1;
      }
    }

-   if (tsdbWriteFileHead(fd, pFile) < 0) {
-     close(fd);
+   if (tsdbWriteFileHead(pFile) < 0) {
+     tsdbCloseFile(pFile);
      return -1;
    }

-   close(fd);
-
-   return 0;
- }
-
- static int tsdbRemoveFile(SFile *pFile) {
-   if (pFile == NULL) return -1;
-   return remove(pFile->fname);
- }
-
- // Create a file group with fileId and return a SFileGroup object
- int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) {
-   if (dataDir == NULL || pFGroup == NULL) return -1;
-
-   memset((void *)pFGroup, 0, sizeof(SFileGroup));
-
-   for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
-     if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) {
-       // TODO: deal with the error here, remove the created files
-       return -1;
-     }
-   }
-
-   pFGroup->fileId = fileId;
+   tsdbCloseFile(pFile);

    return 0;
  }

- /**
-  * Initialize the TSDB file handle
-  */
- STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
-                          int32_t maxRowsPerFBlock, int32_t maxTables) {
-   STsdbFileH *pTsdbFileH =
-       (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile));
-   if (pTsdbFileH == NULL) return NULL;
-
-   pTsdbFileH->daysPerFile = daysPerFile;
-   pTsdbFileH->keep = keep;
-   pTsdbFileH->minRowPerFBlock = minRowsPerFBlock;
-   pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock;
-   pTsdbFileH->maxTables = maxTables;
-
-   // Open the directory to read information of each file
-   DIR *dir = opendir(dataDir);
-   if (dir == NULL) {
-     free(pTsdbFileH);
-     return NULL;
-   }
-
-   char fname[256];
-
-   struct dirent *dp;
-   while ((dp = readdir(dir)) != NULL) {
-     if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
-     if (true /* check if the file is the .head file */) {
-       int fileId = 0;
-       int vgId = 0;
-       sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId);
-       // TODO
-       // Open head file
-       // Open data file
-       // Open last file
-     }
-   }
-
-   return pTsdbFileH;
- }
-
  void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) {
    *minKey = fileId * daysPerFile * tsMsPerDay[precision];
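Note: tsdbCreateFGroup() and tsdbRemoveFileGroup() above keep the file groups as a flat array sorted by fileId: inserts append and re-sort with qsort(), lookups use bsearch(), and removals memmove() the tail down. A stripped-down sketch of the same bookkeeping with a hypothetical element type:

  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>

  typedef struct { int fileId; } DemoGroup;   /* hypothetical element */

  static int compKey(const void *key, const void *elem) {
    return *(const int *)key - ((const DemoGroup *)elem)->fileId;
  }

  static int compElem(const void *a, const void *b) {
    return ((const DemoGroup *)a)->fileId - ((const DemoGroup *)b)->fileId;
  }

  int main(void) {
    DemoGroup groups[8];
    int n = 0;

    /* insert out of order, then keep the array sorted */
    int ids[] = {1822, 1820, 1821};
    for (int i = 0; i < 3; i++) {
      groups[n++] = (DemoGroup){ .fileId = ids[i] };
      qsort(groups, n, sizeof(DemoGroup), compElem);
    }

    /* binary search by key */
    int key = 1821;
    DemoGroup *hit = bsearch(&key, groups, n, sizeof(DemoGroup), compKey);
    printf("found fileId %d\n", hit ? hit->fileId : -1);

    /* remove the hit by sliding the tail down, as tsdbRemoveFileGroup() does */
    if (hit != NULL) {
      int behind = n - (int)(hit - groups) - 1;
      if (behind > 0) memmove(hit, hit + 1, sizeof(DemoGroup) * behind);
      n--;
    }
    printf("remaining: %d\n", n);
    return 0;
  }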
src/vnode/tsdb/src/tsdbMain.c

@@ -182,7 +182,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
    char dataDir[128] = "\0";
    tsdbGetDataDirName(pRepo, dataDir);
-   pRepo->tsdbFileH = tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
-                                   pCfg->maxRowsPerFileBlock, pCfg->maxTables);
+   pRepo->tsdbFileH = tsdbInitFileH(dataDir, pCfg->maxTables);
    if (pRepo->tsdbFileH == NULL) {
      free(pRepo->rootDir);
      tsdbFreeCache(pRepo->tsdbCache);

@@ -782,19 +782,51 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
    return numOfRows;
  }

+ static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) {
+   if (iters == NULL) return;
+
+   for (int tid = 0; tid < maxTables; tid++) {
+     if (iters[tid] == NULL) continue;
+     tSkipListDestroyIter(iters[tid]);
+   }
+
+   free(iters);
+ }
+
+ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) {
+   SSkipListIterator **iters = (SSkipListIterator *)calloc(maxTables, sizeof(SSkipListIterator *));
+   if (iters == NULL) return NULL;
+
+   for (int tid = 0; tid < maxTables; tid++) {
+     STable *pTable = pMeta->tables[tid];
+     if (pTable == NULL || pTable->imem == NULL) continue;
+
+     iters[tid] = tSkipListCreateIter(pTable->imem->pData);
+     if (iters[tid] == NULL) {
+       tsdbDestroyTableIters(iters, maxTables);
+       return NULL;
+     }
+
+     if (!tSkipListIterNext(iters[tid])) {
+       assert(false);
+     }
+   }
+
+   return iters;
+ }
+
  // Commit to file
  static void *tsdbCommitToFile(void *arg) {
    // TODO
    printf("Starting to commit....\n");

    STsdbRepo * pRepo = (STsdbRepo *)arg;
    STsdbMeta * pMeta = pRepo->tsdbMeta;
    STsdbCache *pCache = pRepo->tsdbCache;
    STsdbCfg *  pCfg = &(pRepo->config);
    if (pCache->imem == NULL) return;

-   int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
-   int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
-
-   SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *));
+   // Create the iterator to read from cache
+   SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables);
    if (iters == NULL) {
      // TODO: deal with the error
      return NULL;

@@ -805,10 +837,15 @@ static void *tsdbCommitToFile(void *arg) {
    SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols);
    void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock);

+   int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
+   int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
+
    for (int fid = sfid; fid <= efid; fid++) {
      TSKEY minKey = 0, maxKey = 0;
      tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);

+     // tsdbOpenFileForWrite(pRepo, fid);
+
      for (int tid = 0; tid < pCfg->maxTables; tid++) {
        STable *pTable = pMeta->tables[tid];
        if (pTable == NULL || pTable->imem == NULL) continue;

@@ -837,14 +874,10 @@ static void *tsdbCommitToFile(void *arg) {
      }
    }

    // Free the iterator
-   for (int tid = 0; tid < pCfg->maxTables; tid++) {
-     if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]);
-   }
+   tsdbDestroyTableIters(iters, pCfg->maxTables);

    free(buf);
    free(cols);
-   free(iters);

    tsdbLockRepo(arg);
    tdListMove(pCache->imem->list, pCache->pool.memPool);
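Note: the new tsdbCreateTableIters()/tsdbDestroyTableIters() pair factors the per-table iterator setup out of tsdbCommitToFile() and makes the error path release whatever was already created. A small sketch of that create-all-or-clean-up pattern with hypothetical iterator functions (the real tSkipListCreateIter/tSkipListDestroyIter live in TDengine's util library):

  #include <stdio.h>
  #include <stdlib.h>

  typedef struct { int table; } DemoIter;   /* hypothetical iterator */

  static DemoIter *demoIterCreate(int table) {
    DemoIter *it = malloc(sizeof(DemoIter));
    if (it != NULL) it->table = table;
    return it;
  }

  static void demoItersDestroy(DemoIter **iters, int n) {
    if (iters == NULL) return;
    for (int i = 0; i < n; i++) {
      if (iters[i] != NULL) free(iters[i]);
    }
    free(iters);
  }

  static DemoIter **demoItersCreate(int n) {
    DemoIter **iters = calloc(n, sizeof(DemoIter *));
    if (iters == NULL) return NULL;

    for (int i = 0; i < n; i++) {
      iters[i] = demoIterCreate(i);
      if (iters[i] == NULL) {          /* partial failure: release everything */
        demoItersDestroy(iters, n);
        return NULL;
      }
    }
    return iters;
  }

  int main(void) {
    DemoIter **iters = demoItersCreate(4);
    if (iters == NULL) return 1;
    printf("created %d iterators\n", 4);
    demoItersDestroy(iters, 4);
    return 0;
  }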
src/vnode/tsdb/tests/tsdbTests.cpp

@@ -142,7 +142,7 @@ TEST(TsdbTest, DISABLED_openRepo) {
  TEST(TsdbTest, DISABLED_createFileGroup) {
    SFileGroup fGroup;

-   ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);
+   // ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);

    int k = 0;
  }
\ No newline at end of file