Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ce71ba91
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ce71ba91
编写于
3月 09, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/3.0_mhli
上级
30be9d3e
35b82d79
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
415 addition
and
127 deletion
+415
-127
include/common/taosdef.h
include/common/taosdef.h
+5
-0
include/os/osDir.h
include/os/osDir.h
+22
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+17
-1
source/dnode/vnode/src/inc/tqPush.h
source/dnode/vnode/src/inc/tqPush.h
+8
-5
source/dnode/vnode/src/inc/tsdbDef.h
source/dnode/vnode/src/inc/tsdbDef.h
+1
-0
source/dnode/vnode/src/inc/tsdbFS.h
source/dnode/vnode/src/inc/tsdbFS.h
+4
-1
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+6
-3
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+2
-6
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+20
-0
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+1
-1
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
+14
-0
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMain.c
source/dnode/vnode/src/tsdb/tsdbMain.c
+1
-0
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+155
-0
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+6
-6
source/libs/tfs/inc/tfsInt.h
source/libs/tfs/inc/tfsInt.h
+1
-1
source/libs/tfs/src/tfs.c
source/libs/tfs/src/tfs.c
+34
-33
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+12
-12
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+0
-3
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+11
-11
source/os/src/osDir.c
source/os/src/osDir.c
+48
-0
source/os/src/osSemaphore.c
source/os/src/osSemaphore.c
+28
-28
source/os/src/osSocket.c
source/os/src/osSocket.c
+4
-2
source/os/src/osTimer.c
source/os/src/osTimer.c
+4
-4
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+9
-8
未找到文件。
include/common/taosdef.h
浏览文件 @
ce71ba91
...
...
@@ -56,6 +56,11 @@ typedef enum {
TSDB_STATIS_NONE
=
1
,
// statis part not exist
}
ETsdbStatisStatus
;
typedef
enum
{
TSDB_SMA_STAT_OK
=
0
,
// ready to provide service
TSDB_SMA_STAT_EXPIRED
=
1
,
// not ready or expired
}
ETsdbSmaStat
;
extern
char
*
qtypeStr
[];
#ifdef __cplusplus
...
...
include/os/osDir.h
浏览文件 @
ce71ba91
...
...
@@ -16,10 +16,24 @@
#ifndef _TD_OS_DIR_H_
#define _TD_OS_DIR_H_
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define opendir OPENDIR_FUNC_TAOS_FORBID
#define readdir READDIR_FUNC_TAOS_FORBID
#define closedir CLOSEDIR_FUNC_TAOS_FORBID
#define dirname DIRNAME_FUNC_TAOS_FORBID
#undef basename
#define basename BASENAME_FUNC_TAOS_FORBID
#endif
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
TdDir
*
TdDirPtr
;
typedef
struct
TdDirEntry
*
TdDirEntryPtr
;
void
taosRemoveDir
(
const
char
*
dirname
);
bool
taosDirExist
(
char
*
dirname
);
int32_t
taosMkDir
(
const
char
*
dirname
);
...
...
@@ -27,6 +41,14 @@ void taosRemoveOldFiles(const char *dirname, int32_t keepDays);
int32_t
taosExpandDir
(
const
char
*
dirname
,
char
*
outname
,
int32_t
maxlen
);
int32_t
taosRealPath
(
char
*
dirname
,
int32_t
maxlen
);
bool
taosIsDir
(
const
char
*
dirname
);
char
*
taosDirName
(
char
*
dirname
);
char
*
taosDirEntryBaseName
(
char
*
dirname
);
TdDirPtr
taosOpenDir
(
const
char
*
dirname
);
TdDirEntryPtr
taosReadDir
(
TdDirPtr
pDir
);
bool
taosDirEntryIsDir
(
TdDirEntryPtr
pDirEntry
);
char
*
taosGetDirEntryName
(
TdDirEntryPtr
pDirEntry
);
int32_t
taosCloseDir
(
TdDirPtr
pDir
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
ce71ba91
...
...
@@ -59,7 +59,7 @@ typedef struct {
SWalCfg
walCfg
;
uint32_t
hashBegin
;
uint32_t
hashEnd
;
int8_t
hashMethod
;
int8_t
hashMethod
;
}
SVnodeCfg
;
typedef
struct
{
...
...
@@ -202,6 +202,22 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ READ --------------------------- */
enum
{
TQ_STREAM_TOKEN__DATA
=
1
,
TQ_STREAM_TOKEN__WATERMARK
,
TQ_STREAM_TOKEN__CHECKPOINT
,
};
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
7
];
union
{
void
*
data
;
int64_t
wmTs
;
int64_t
checkpointId
;
};
}
STqStreamToken
;
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
);
static
FORCE_INLINE
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
)
{
...
...
source/dnode/vnode/src/inc/tqPush.h
浏览文件 @
ce71ba91
...
...
@@ -16,9 +16,11 @@
#ifndef _TQ_PUSH_H_
#define _TQ_PUSH_H_
#include "executor.h"
#include "thash.h"
#include "trpc.h"
#include "ttimer.h"
#include "vnode.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -39,11 +41,12 @@ typedef struct {
}
STqClientPusher
;
typedef
struct
{
int8_t
type
;
int8_t
nodeType
;
int8_t
reserved
[
6
];
int64_t
streamId
;
SEpSet
epSet
;
int8_t
type
;
int8_t
nodeType
;
int8_t
reserved
[
6
];
int64_t
streamId
;
qTaskInfo_t
task
;
// TODO sync function
}
STqStreamPusher
;
typedef
struct
{
...
...
source/dnode/vnode/src/inc/tsdbDef.h
浏览文件 @
ce71ba91
...
...
@@ -52,6 +52,7 @@ struct STsdb {
STsdbFS
*
fs
;
SMeta
*
pMeta
;
STfs
*
pTfs
;
SSmaStat
*
pSmaStat
;
};
#define REPO_ID(r) ((r)->vgId)
...
...
source/dnode/vnode/src/inc/tsdbFS.h
浏览文件 @
ce71ba91
...
...
@@ -42,7 +42,10 @@ typedef struct {
typedef
struct
{
STsdbFSMeta
meta
;
// FS meta
SArray
*
df
;
// data file array
SArray
*
smaf
;
// sma data file array
// SArray * v2f100.tsma.index_name
SArray
*
smaf
;
// sma data file array v2f1900.tsma.index_name
}
SFSStatus
;
typedef
struct
{
...
...
source/dnode/vnode/src/inc/tsdbSma.h
浏览文件 @
ce71ba91
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_SMA_H_
#define _TD_TSDB_SMA_H_
typedef
struct
SSmaStat
SSmaStat
;
// insert/update interface
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
SRSma
*
param
,
STSmaData
*
pData
);
...
...
@@ -26,13 +28,14 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData);
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
);
// management interface
int32_t
tsdbGetTSmaStatus
(
STsdb
*
pTsdb
,
STSma
*
param
,
void
*
result
);
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
char
*
msg
);
int32_t
tsdbGetTSmaStatus
(
STsdb
*
pTsdb
,
STSma
*
param
,
void
*
result
);
int32_t
tsdbRemoveTSmaData
(
STsdb
*
pTsdb
,
STSma
*
param
,
STimeWindow
*
pWin
);
int32_t
tsdbFreeSmaState
(
SSmaStat
*
pSmaStat
);
// internal func
// internal func
static
FORCE_INLINE
int32_t
tsdbEncodeTSmaKey
(
uint64_t
tableUid
,
col_id_t
colId
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedU64
(
pData
,
tableUid
);
...
...
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
ce71ba91
...
...
@@ -923,6 +923,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
SMetaDB
*
pDB
=
pMeta
->
pDB
;
DBC
*
pCur
=
NULL
;
DBT
pkey
=
{
0
},
pval
=
{
0
};
uint32_t
mode
=
isDup
?
DB_NEXT_DUP
:
DB_NEXT_NODUP
;
int
ret
;
pUids
=
taosArrayInit
(
16
,
sizeof
(
tb_uid_t
));
...
...
@@ -941,13 +942,8 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) {
void
*
pBuf
=
NULL
;
// TODO: lock?
while
(
true
)
{
ret
=
pCur
->
get
(
pCur
,
&
pkey
,
&
pval
,
isDup
?
DB_NEXT_DUP
:
DB_NEXT_NODUP
);
if
(
ret
==
0
)
{
while
((
ret
=
pCur
->
get
(
pCur
,
&
pkey
,
&
pval
,
mode
))
==
0
)
{
taosArrayPush
(
pUids
,
pkey
.
data
);
continue
;
}
break
;
}
if
(
pCur
)
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
ce71ba91
...
...
@@ -67,6 +67,26 @@ void tqClose(STQ* pTq) {
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
tmsg_t
msgType
,
int64_t
version
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
pIter
=
taosHashIterate
(
pTq
->
tqPushMgr
->
pHash
,
NULL
);
while
(
pIter
!=
NULL
)
{
STqPusher
*
pusher
=
*
(
STqPusher
**
)
pIter
;
if
(
pusher
->
type
==
TQ_PUSHER_TYPE__STREAM
)
{
STqStreamPusher
*
streamPusher
=
(
STqStreamPusher
*
)
pusher
;
// repack
STqStreamToken
*
token
=
malloc
(
sizeof
(
STqStreamToken
));
if
(
token
==
NULL
)
{
taosHashCancelIterate
(
pTq
->
tqPushMgr
->
pHash
,
pIter
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
token
->
type
=
TQ_STREAM_TOKEN__DATA
;
token
->
data
=
msg
;
// set input
// exec
}
// send msg to ep
}
// iterate hash
// process all msg
// if waiting
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
ce71ba91
...
...
@@ -73,7 +73,7 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet
streamPusher
->
type
=
TQ_PUSHER_TYPE__STREAM
;
streamPusher
->
nodeType
=
0
;
streamPusher
->
streamId
=
streamId
;
memcpy
(
&
streamPusher
->
epSet
,
pEpSet
,
sizeof
(
SEpSet
));
/*memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet));*/
if
(
taosHashPut
(
pushMgr
->
pHash
,
&
streamId
,
sizeof
(
int64_t
),
&
streamPusher
,
sizeof
(
void
*
))
<
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
ce71ba91
...
...
@@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnode.h"
...
...
@@ -37,6 +36,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
pMsg
->
length
=
htonl
(
pMsg
->
length
);
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
// iterate and convert
if
(
tInitSubmitMsgIter
(
pMsg
,
&
pReadHandle
->
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
if
(
tGetSubmitMsgNext
(
&
pReadHandle
->
msgIter
,
&
pReadHandle
->
pBlock
)
<
0
)
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
0 → 100644
浏览文件 @
ce71ba91
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
ce71ba91
...
...
@@ -365,7 +365,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T
if
(
errno
==
ENOENT
)
{
// Try to create directory recursively
char
*
s
=
strdup
(
TSDB_FILE_REL_NAME
(
pDFile
));
if
(
tfsMkdirRecurAt
(
pRepo
->
pTfs
,
dirn
ame
(
s
),
TSDB_FILE_DID
(
pDFile
))
<
0
)
{
if
(
tfsMkdirRecurAt
(
pRepo
->
pTfs
,
taosDirN
ame
(
s
),
TSDB_FILE_DID
(
pDFile
))
<
0
)
{
tfree
(
s
);
return
-
1
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbMain.c
浏览文件 @
ce71ba91
...
...
@@ -89,6 +89,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
static
void
tsdbFree
(
STsdb
*
pTsdb
)
{
if
(
pTsdb
)
{
tsdbFreeFS
(
pTsdb
->
fs
);
tsdbFreeSmaState
(
pTsdb
->
pSmaStat
);
tfree
(
pTsdb
->
path
);
free
(
pTsdb
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
ce71ba91
...
...
@@ -21,6 +21,10 @@
#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks
#define SMA_STATE_HASH_SLOT 4
#define SMA_STATE_ITEM_HASH_SLOT 32
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
typedef
enum
{
SMA_STORAGE_LEVEL_TSDB
=
0
,
// store TSma in dir e.g. vnode${N}/tsdb/.tsma
SMA_STORAGE_LEVEL_DFILESET
=
1
// store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name}
...
...
@@ -48,6 +52,22 @@ typedef struct {
// TODO
}
STSmaReadH
;
typedef
struct
{
/**
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
* Streaming Module or TSDB local persistence.
*/
int8_t
state
;
// ETsdbSmaStat
SHashObj
*
expiredWindows
;
// key: skey of time window, value: N/A
}
SSmaStatItem
;
struct
SSmaStat
{
SHashObj
*
smaStatItems
;
// key: indexName, value: SSmaStatItem
};
// declaration of static functions
static
int32_t
tsdbInitTSmaWriteH
(
STSmaWriteH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
static
int32_t
tsdbInitTSmaReadH
(
STSmaReadH
*
pSmaH
,
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
);
...
...
@@ -64,6 +84,125 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param,
static
int32_t
tsdbInitTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
);
static
bool
tsdbSetAndOpenTSmaFile
(
STSmaReadH
*
pReadH
,
STSma
*
param
,
STimeWindow
*
queryWin
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
)
{
ASSERT
(
pSmaStat
!=
NULL
);
if
(
*
pSmaStat
!=
NULL
)
{
// no lock
return
TSDB_CODE_SUCCESS
;
}
// TODO: lock. lazy mode when update expired window, or hungry mode during tsdbNew.
if
(
*
pSmaStat
==
NULL
)
{
*
pSmaStat
=
(
SSmaStat
*
)
calloc
(
1
,
sizeof
(
SSmaStat
));
if
(
*
pSmaStat
==
NULL
)
{
// TODO: unlock
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
(
*
pSmaStat
)
->
smaStatItems
=
taosHashInit
(
SMA_STATE_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
((
*
pSmaStat
)
->
smaStatItems
==
NULL
)
{
tfree
(
*
pSmaStat
);
// TODO: unlock
return
TSDB_CODE_FAILED
;
}
}
// TODO: unlock
return
TSDB_CODE_SUCCESS
;
}
static
SSmaStatItem
*
tsdbNewSmaStatItem
(
int8_t
state
)
{
SSmaStatItem
*
pItem
=
NULL
;
pItem
=
(
SSmaStatItem
*
)
calloc
(
1
,
sizeof
(
SSmaStatItem
));
if
(
pItem
)
{
pItem
->
state
=
state
;
pItem
->
expiredWindows
=
taosHashInit
(
SMA_STATE_ITEM_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_TIMESTAMP
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
pItem
->
expiredWindows
)
{
tfree
(
pItem
);
}
}
return
pItem
;
}
int32_t
tsdbFreeSmaState
(
SSmaStat
*
pSmaStat
)
{
if
(
pSmaStat
)
{
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
SSmaStatItem
*
item
=
taosHashIterate
(
pSmaStat
->
smaStatItems
,
NULL
);
while
(
item
!=
NULL
)
{
taosHashCleanup
(
item
->
expiredWindows
);
item
=
taosHashIterate
(
pSmaStat
->
smaStatItems
,
item
);
}
taosHashCleanup
(
pSmaStat
->
smaStatItems
);
free
(
pSmaStat
);
}
}
/**
* @brief Update expired window according to msg from stream computing module.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t
tsdbUpdateExpiredWindow
(
STsdb
*
pTsdb
,
char
*
msg
)
{
if
(
msg
==
NULL
)
{
return
TSDB_CODE_FAILED
;
}
tsdbInitSmaStat
(
&
pTsdb
->
pSmaStat
);
// lazy mode
// TODO: decode the msg => start
const
char
*
indexName
=
SMA_TEST_INDEX_NAME
;
const
int32_t
SMA_TEST_EXPIRED_WINDOW_SIZE
=
10
;
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
int64_t
now
=
taosGetTimestampMs
();
for
(
int32_t
i
=
0
;
i
<
SMA_TEST_EXPIRED_WINDOW_SIZE
;
++
i
)
{
expiredWindows
[
i
]
=
now
+
i
;
}
// TODO: decode the msg <= end
SHashObj
*
pItemsHash
=
pTsdb
->
pSmaStat
->
smaStatItems
;
SSmaStatItem
*
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
pItemsHash
,
indexName
,
strlen
(
indexName
));
if
(
!
pItem
)
{
pItem
=
tsdbNewSmaStatItem
(
TSDB_SMA_STAT_EXPIRED
);
// TODO use the real state
if
(
!
pItem
)
{
// Response to stream computing: OOM
// For query, if the indexName not found, the TSDB should tell query module to query raw TS data.
return
TSDB_CODE_FAILED
;
}
if
(
taosHashPut
(
pItemsHash
,
indexName
,
strnlen
(
indexName
,
TSDB_INDEX_NAME_LEN
),
&
pItem
,
sizeof
(
pItem
))
!=
0
)
{
// If error occurs during put smaStatItem, free the resources of pItem
taosHashCleanup
(
pItem
->
expiredWindows
);
free
(
pItem
);
return
TSDB_CODE_FAILED
;
}
}
int8_t
state
=
TSDB_SMA_STAT_EXPIRED
;
for
(
int32_t
i
=
0
;
i
<
SMA_TEST_EXPIRED_WINDOW_SIZE
;
++
i
)
{
if
(
taosHashPut
(
pItem
->
expiredWindows
,
&
expiredWindows
[
i
],
sizeof
(
TSKEY
),
&
state
,
sizeof
(
state
))
!=
0
)
{
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
// 1) It is assumed to be extemely little probability event of fail to taosHashPut.
// 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired
// windows failed to put into hash table.
taosHashCleanup
(
pItem
->
expiredWindows
);
taosHashRemove
(
pItemsHash
,
indexName
,
sizeof
(
indexName
));
return
TSDB_CODE_FAILED
;
}
}
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief Judge the tSma storage level
*
...
...
@@ -484,6 +623,22 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow
* @return int32_t
*/
int32_t
tsdbGetTSmaDataImpl
(
STsdb
*
pTsdb
,
STSma
*
param
,
STSmaData
*
pData
,
STimeWindow
*
queryWin
,
int32_t
nMaxResult
)
{
const
char
*
indexName
=
param
->
indexName
;
SSmaStatItem
*
pItem
=
(
SSmaStatItem
*
)
taosHashGet
(
pTsdb
->
pSmaStat
->
smaStatItems
,
indexName
,
strlen
(
indexName
));
if
(
pItem
==
NULL
)
{
// mark all window as expired and notify query module to query raw TS data.
return
TSDB_CODE_SUCCESS
;
}
int32_t
nQueryWin
=
0
;
for
(
int32_t
n
=
0
;
n
<
nQueryWin
;
++
n
)
{
TSKEY
thisWindow
=
n
;
if
(
taosHashGet
(
pItem
->
expiredWindows
,
&
thisWindow
,
sizeof
(
thisWindow
))
!=
NULL
)
{
// TODO: mark this window as expired.
}
}
STSmaReadH
tReadH
=
{
0
};
tsdbInitTSmaReadH
(
&
tReadH
,
pTsdb
,
param
,
pData
);
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
ce71ba91
...
...
@@ -722,13 +722,13 @@ static SArray* tfileGetFileList(const char* path) {
uint32_t
version
;
SArray
*
files
=
taosArrayInit
(
4
,
sizeof
(
void
*
));
DIR
*
dir
=
opend
ir
(
path
);
if
(
NULL
==
d
ir
)
{
TdDirPtr
pDir
=
taosOpenD
ir
(
path
);
if
(
NULL
==
pD
ir
)
{
return
NULL
;
}
struct
dirent
*
e
ntry
;
while
((
entry
=
readdir
(
d
ir
))
!=
NULL
)
{
char
*
file
=
entry
->
d_name
;
TdDirEntryPtr
pDirE
ntry
;
while
((
pDirEntry
=
taosReadDir
(
pD
ir
))
!=
NULL
)
{
char
*
file
=
taosGetDirEntryName
(
pDirEntry
)
;
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
buf
,
&
version
))
{
continue
;
}
...
...
@@ -738,7 +738,7 @@ static SArray* tfileGetFileList(const char* path) {
sprintf
(
buf
,
"%s/%s"
,
path
,
file
);
taosArrayPush
(
files
,
&
buf
);
}
closedir
(
d
ir
);
taosCloseDir
(
pD
ir
);
taosArraySort
(
files
,
tfileCompare
);
tfileRmExpireFile
(
files
);
...
...
source/libs/tfs/inc/tfsInt.h
浏览文件 @
ce71ba91
...
...
@@ -59,7 +59,7 @@ typedef struct STfsDir {
SDiskID
did
;
char
dirname
[
TSDB_FILENAME_LEN
];
STfsFile
tfile
;
DIR
*
d
ir
;
TdDirPtr
pD
ir
;
STfs
*
pTfs
;
}
STfsDir
;
...
...
source/libs/tfs/src/tfs.c
浏览文件 @
ce71ba91
...
...
@@ -192,14 +192,14 @@ void tfsBasename(const STfsFile *pFile, char *dest) {
char
tname
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
tstrncpy
(
tname
,
pFile
->
aname
,
TSDB_FILENAME_LEN
);
tstrncpy
(
dest
,
basen
ame
(
tname
),
TSDB_FILENAME_LEN
);
tstrncpy
(
dest
,
taosDirEntryBaseN
ame
(
tname
),
TSDB_FILENAME_LEN
);
}
void
tfsDirname
(
const
STfsFile
*
pFile
,
char
*
dest
)
{
char
tname
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
tstrncpy
(
tname
,
pFile
->
aname
,
TSDB_FILENAME_LEN
);
tstrncpy
(
dest
,
dirn
ame
(
tname
),
TSDB_FILENAME_LEN
);
tstrncpy
(
dest
,
taosDirN
ame
(
tname
),
TSDB_FILENAME_LEN
);
}
int32_t
tfsRemoveFile
(
const
STfsFile
*
pFile
)
{
return
taosRemoveFile
(
pFile
->
aname
);
}
...
...
@@ -233,7 +233,7 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) {
// the pointer directly in this recursion.
// See
// https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man3/dirname.3.html
char
*
dir
=
strdup
(
dirn
ame
(
s
));
char
*
dir
=
strdup
(
taosDirN
ame
(
s
));
if
(
tfsMkdirRecurAt
(
pTfs
,
dir
,
diskId
)
<
0
)
{
free
(
s
);
...
...
@@ -324,45 +324,46 @@ STfsDir *tfsOpendir(STfs *pTfs, const char *rname) {
return
pDir
;
}
const
STfsFile
*
tfsReaddir
(
STfsDir
*
pDir
)
{
if
(
p
Dir
==
NULL
||
pDir
->
d
ir
==
NULL
)
return
NULL
;
const
STfsFile
*
tfsReaddir
(
STfsDir
*
p
Tfs
Dir
)
{
if
(
p
TfsDir
==
NULL
||
pTfsDir
->
pD
ir
==
NULL
)
return
NULL
;
char
bname
[
TMPNAME_LEN
*
2
]
=
"
\0
"
;
while
(
true
)
{
struct
dirent
*
dp
=
NULL
;
dp
=
readdir
(
pDir
->
d
ir
);
if
(
dp
!=
NULL
)
{
TdDirEntryPtr
pDirEntry
=
NULL
;
pDirEntry
=
taosReadDir
(
pTfsDir
->
pD
ir
);
if
(
pDirEntry
!=
NULL
)
{
// Skip . and ..
if
(
strcmp
(
dp
->
d_name
,
"."
)
==
0
||
strcmp
(
dp
->
d_name
,
".."
)
==
0
)
continue
;
char
*
name
=
taosGetDirEntryName
(
pDirEntry
);
if
(
strcmp
(
name
,
"."
)
==
0
||
strcmp
(
name
,
".."
)
==
0
)
continue
;
if
(
p
Dir
->
dirname
==
NULL
||
p
Dir
->
dirname
[
0
]
==
0
)
{
snprintf
(
bname
,
TMPNAME_LEN
*
2
,
"%s"
,
dp
->
d_
name
);
if
(
p
TfsDir
->
dirname
==
NULL
||
pTfs
Dir
->
dirname
[
0
]
==
0
)
{
snprintf
(
bname
,
TMPNAME_LEN
*
2
,
"%s"
,
name
);
}
else
{
snprintf
(
bname
,
TMPNAME_LEN
*
2
,
"%s%s%s"
,
p
Dir
->
dirname
,
TD_DIRSEP
,
dp
->
d_
name
);
snprintf
(
bname
,
TMPNAME_LEN
*
2
,
"%s%s%s"
,
p
TfsDir
->
dirname
,
TD_DIRSEP
,
name
);
}
tfsInitFile
(
p
Dir
->
pTfs
,
&
pDir
->
tfile
,
p
Dir
->
did
,
bname
);
return
&
pDir
->
tfile
;
tfsInitFile
(
p
TfsDir
->
pTfs
,
&
pTfsDir
->
tfile
,
pTfs
Dir
->
did
,
bname
);
return
&
p
Tfs
Dir
->
tfile
;
}
if
(
tfsOpendirImpl
(
p
Dir
->
pTfs
,
p
Dir
)
<
0
)
{
if
(
tfsOpendirImpl
(
p
TfsDir
->
pTfs
,
pTfs
Dir
)
<
0
)
{
return
NULL
;
}
if
(
p
Dir
->
d
ir
==
NULL
)
{
if
(
p
TfsDir
->
pD
ir
==
NULL
)
{
terrno
=
TSDB_CODE_SUCCESS
;
return
NULL
;
}
}
}
void
tfsClosedir
(
STfsDir
*
pDir
)
{
if
(
pDir
)
{
if
(
p
Dir
->
d
ir
!=
NULL
)
{
closedir
(
pDir
->
d
ir
);
p
Dir
->
d
ir
=
NULL
;
void
tfsClosedir
(
STfsDir
*
p
Tfs
Dir
)
{
if
(
p
Tfs
Dir
)
{
if
(
p
TfsDir
->
pD
ir
!=
NULL
)
{
taosCloseDir
(
pTfsDir
->
pD
ir
);
p
TfsDir
->
pD
ir
=
NULL
;
}
free
(
pDir
);
free
(
p
Tfs
Dir
);
}
}
...
...
@@ -487,29 +488,29 @@ static STfsDisk *tfsGetDiskByName(STfs *pTfs, const char *dir) {
return
pDisk
;
}
static
int32_t
tfsOpendirImpl
(
STfs
*
pTfs
,
STfsDir
*
pDir
)
{
static
int32_t
tfsOpendirImpl
(
STfs
*
pTfs
,
STfsDir
*
p
Tfs
Dir
)
{
STfsDisk
*
pDisk
=
NULL
;
char
adir
[
TMPNAME_LEN
*
2
]
=
"
\0
"
;
if
(
p
Dir
->
d
ir
!=
NULL
)
{
closedir
(
pDir
->
d
ir
);
p
Dir
->
d
ir
=
NULL
;
if
(
p
TfsDir
->
pD
ir
!=
NULL
)
{
taosCloseDir
(
pTfsDir
->
pD
ir
);
p
TfsDir
->
pD
ir
=
NULL
;
}
while
(
true
)
{
pDisk
=
tfsNextDisk
(
pTfs
,
&
pDir
->
iter
);
pDisk
=
tfsNextDisk
(
pTfs
,
&
p
Tfs
Dir
->
iter
);
if
(
pDisk
==
NULL
)
return
0
;
pDir
->
did
.
level
=
pDisk
->
level
;
pDir
->
did
.
id
=
pDisk
->
id
;
p
Tfs
Dir
->
did
.
level
=
pDisk
->
level
;
p
Tfs
Dir
->
did
.
id
=
pDisk
->
id
;
if
(
pDisk
->
path
==
NULL
||
pDisk
->
path
[
0
]
==
0
)
{
snprintf
(
adir
,
TMPNAME_LEN
*
2
,
"%s"
,
pDir
->
dirname
);
snprintf
(
adir
,
TMPNAME_LEN
*
2
,
"%s"
,
p
Tfs
Dir
->
dirname
);
}
else
{
snprintf
(
adir
,
TMPNAME_LEN
*
2
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
pDir
->
dirname
);
snprintf
(
adir
,
TMPNAME_LEN
*
2
,
"%s%s%s"
,
pDisk
->
path
,
TD_DIRSEP
,
p
Tfs
Dir
->
dirname
);
}
p
Dir
->
dir
=
opend
ir
(
adir
);
if
(
p
Dir
->
d
ir
!=
NULL
)
break
;
p
TfsDir
->
pDir
=
taosOpenD
ir
(
adir
);
if
(
p
TfsDir
->
pD
ir
!=
NULL
)
break
;
}
return
0
;
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
ce71ba91
...
...
@@ -130,16 +130,16 @@ int walCheckAndRepairMeta(SWal* pWal) {
regcomp
(
&
logRegPattern
,
logPattern
,
REG_EXTENDED
);
regcomp
(
&
idxRegPattern
,
idxPattern
,
REG_EXTENDED
);
DIR
*
dir
=
opend
ir
(
pWal
->
path
);
if
(
d
ir
==
NULL
)
{
TdDirPtr
pDir
=
taosOpenD
ir
(
pWal
->
path
);
if
(
pD
ir
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
// scan log files and build new meta
struct
dirent
*
ent
;
while
((
ent
=
readdir
(
d
ir
))
!=
NULL
)
{
char
*
name
=
basename
(
ent
->
d_name
);
TdDirEntryPtr
pDirEntry
;
while
((
pDirEntry
=
taosReadDir
(
pD
ir
))
!=
NULL
)
{
char
*
name
=
taosDirEntryBaseName
(
taosGetDirEntryName
(
pDirEntry
)
);
int
code
=
regexec
(
&
logRegPattern
,
name
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
SWalFileInfo
fileInfo
;
...
...
@@ -149,7 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
}
closedir
(
d
ir
);
taosCloseDir
(
pD
ir
);
regfree
(
&
logRegPattern
);
regfree
(
&
idxRegPattern
);
...
...
@@ -337,25 +337,25 @@ static int walFindCurMetaVer(SWal* pWal) {
regex_t
walMetaRegexPattern
;
regcomp
(
&
walMetaRegexPattern
,
pattern
,
REG_EXTENDED
);
DIR
*
dir
=
opend
ir
(
pWal
->
path
);
if
(
d
ir
==
NULL
)
{
TdDirPtr
pDir
=
taosOpenD
ir
(
pWal
->
path
);
if
(
pD
ir
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
cfg
.
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
struct
dirent
*
ent
;
TdDirEntryPtr
pDirEntry
;
// find existing meta-ver[x].json
int
metaVer
=
-
1
;
while
((
ent
=
readdir
(
d
ir
))
!=
NULL
)
{
char
*
name
=
basename
(
ent
->
d_name
);
while
((
pDirEntry
=
taosReadDir
(
pD
ir
))
!=
NULL
)
{
char
*
name
=
taosDirEntryBaseName
(
taosGetDirEntryName
(
pDirEntry
)
);
int
code
=
regexec
(
&
walMetaRegexPattern
,
name
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
sscanf
(
name
,
"meta-ver%d"
,
&
metaVer
);
break
;
}
}
closedir
(
d
ir
);
taosCloseDir
(
pD
ir
);
regfree
(
&
walMetaRegexPattern
);
return
metaVer
;
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
ce71ba91
...
...
@@ -34,9 +34,6 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
int32_t
walRollback
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
;
char
fnameStr
[
WAL_FILE_LEN
];
if
(
ver
==
pWal
->
vers
.
lastVer
)
{
return
0
;
}
if
(
ver
>
pWal
->
vers
.
lastVer
||
ver
<
pWal
->
vers
.
commitVer
)
{
terrno
=
TSDB_CODE_WAL_INVALID_VER
;
return
-
1
;
...
...
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
ce71ba91
...
...
@@ -124,13 +124,8 @@ class WalRetentionEnv : public ::testing::Test {
void
SetUp
()
override
{
SWalCfg
cfg
;
cfg
.
rollPeriod
=
-
1
,
cfg
.
segSize
=
-
1
,
cfg
.
retentionPeriod
=
-
1
,
cfg
.
retentionSize
=
0
,
cfg
.
rollPeriod
=
0
,
cfg
.
vgId
=
0
,
cfg
.
level
=
TAOS_WAL_FSYNC
;
cfg
.
rollPeriod
=
-
1
,
cfg
.
segSize
=
-
1
,
cfg
.
retentionPeriod
=
-
1
,
cfg
.
retentionSize
=
0
,
cfg
.
rollPeriod
=
0
,
cfg
.
vgId
=
0
,
cfg
.
level
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
pathName
,
&
cfg
);
ASSERT
(
pWal
!=
NULL
);
}
...
...
@@ -241,6 +236,12 @@ TEST_F(WalCleanEnv, rollback) {
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
i
);
}
code
=
walRollback
(
pWal
,
12
);
ASSERT_NE
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
9
);
code
=
walRollback
(
pWal
,
9
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
8
);
code
=
walRollback
(
pWal
,
5
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
4
);
...
...
@@ -324,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
TEST_F
(
WalRetentionEnv
,
repairMeta1
)
{
walResetEnv
();
int
code
;
int
i
;
for
(
i
=
0
;
i
<
100
;
i
++
)
{
char
newStr
[
100
];
...
...
@@ -336,14 +337,14 @@ TEST_F(WalRetentionEnv, repairMeta1) {
TearDown
();
//getchar();
//
getchar();
char
buf
[
100
];
sprintf
(
buf
,
"%s/meta-ver%d"
,
pathName
,
0
);
taosRemoveFile
(
buf
);
sprintf
(
buf
,
"%s/meta-ver%d"
,
pathName
,
1
);
taosRemoveFile
(
buf
);
SetUp
();
//getchar();
//
getchar();
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
99
);
...
...
@@ -401,5 +402,4 @@ TEST_F(WalRetentionEnv, repairMeta1) {
EXPECT_EQ
(
newStr
[
j
],
pRead
->
pHead
->
head
.
body
[
j
]);
}
}
}
source/os/src/osDir.c
浏览文件 @
ce71ba91
...
...
@@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "osString.h"
...
...
@@ -36,6 +37,10 @@
#include <unistd.h>
#include <wordexp.h>
typedef
struct
dirent
dirent
;
typedef
struct
DIR
TdDir
;
typedef
struct
dirent
TdDirent
;
void
taosRemoveDir
(
const
char
*
dirname
)
{
DIR
*
dir
=
opendir
(
dirname
);
if
(
dir
==
NULL
)
return
;
...
...
@@ -149,4 +154,47 @@ bool taosIsDir(const char *dirname) {
return
false
;
}
char
*
taosDirName
(
char
*
name
)
{
return
dirname
(
name
);
}
char
*
taosDirEntryBaseName
(
char
*
name
)
{
return
basename
(
name
);
}
TdDirPtr
taosOpenDir
(
const
char
*
dirname
)
{
if
(
dirname
==
NULL
)
{
return
NULL
;
}
return
(
TdDirPtr
)
opendir
(
dirname
);
}
TdDirEntryPtr
taosReadDir
(
TdDirPtr
pDir
)
{
if
(
pDir
==
NULL
)
{
return
NULL
;
}
return
(
TdDirEntryPtr
)
readdir
((
DIR
*
)
pDir
);
}
bool
taosDirEntryIsDir
(
TdDirEntryPtr
pDirEntry
)
{
if
(
pDirEntry
==
NULL
)
{
return
false
;
}
return
(((
dirent
*
)
pDirEntry
)
->
d_type
&
DT_DIR
)
!=
0
;
}
char
*
taosGetDirEntryName
(
TdDirEntryPtr
pDirEntry
)
{
if
(
pDirEntry
==
NULL
)
{
return
NULL
;
}
return
((
dirent
*
)
pDirEntry
)
->
d_name
;
}
int32_t
taosCloseDir
(
TdDirPtr
pDir
)
{
if
(
pDir
==
NULL
)
{
return
-
1
;
}
return
closedir
((
DIR
*
)
pDir
);
}
#endif
source/os/src/osSemaphore.c
浏览文件 @
ce71ba91
...
...
@@ -99,7 +99,7 @@ static void *sem_thread_routine(void *arg) {
sem_port
=
mach_task_self
();
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
sem_exit
,
SYNC_POLICY_FIFO
,
0
);
if
(
ret
!=
KERN_SUCCESS
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create sem_exit
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
);
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create sem_exit
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
);
sem_inited
=
-
1
;
return
NULL
;
}
...
...
@@ -112,7 +112,7 @@ static void once_init(void) {
int
r
=
0
;
r
=
pthread_create
(
&
sem_thread
,
NULL
,
sem_thread_routine
,
NULL
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create thread
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
);
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create thread
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
);
return
;
}
while
(
sem_inited
==
0
)
{
...
...
@@ -139,14 +139,14 @@ struct tsem_s {
};
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n",
basen
ame(__FILE__), __LINE__, __func__, sem);
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n",
taosDirEntryBaseN
ame(__FILE__), __LINE__, __func__, sem);
if
(
*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already initialized
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already initialized
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
(
struct
tsem_s
*
)
calloc
(
1
,
sizeof
(
*
p
));
if
(
!
p
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==out of memory
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==out of memory
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
...
...
@@ -162,7 +162,7 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
p
->
val
=
value
;
}
while
(
0
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#elif defined(SEM_USE_POSIX)
...
...
@@ -181,27 +181,27 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
int
e
=
errno
;
if
(
e
==
EEXIST
)
continue
;
if
(
e
==
EINTR
)
continue
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created[%d]%s
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created[%d]%s
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
while
(
p
->
sem
==
SEM_FAILED
);
#elif defined(SEM_USE_SEM)
pthread_once
(
&
sem_once
,
once_init
);
if
(
sem_inited
!=
1
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal resource init failed
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal resource init failed
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
errno
=
ENOMEM
;
return
-
1
;
}
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
p
->
sem
,
SYNC_POLICY_FIFO
,
value
);
if
(
ret
!=
KERN_SUCCESS
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==semophore_create failed
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==semophore_create failed
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
// we fail-fast here, because we have less-doc about semaphore_create for the moment
abort
();
}
#else // SEM_USE_PTHREAD
p
->
sem
=
dispatch_semaphore_create
(
value
);
if
(
p
->
sem
==
NULL
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#endif // SEM_USE_PTHREAD
...
...
@@ -215,28 +215,28 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
int
tsem_wait
(
tsem_t
*
sem
)
{
if
(
!*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#ifdef SEM_USE_PTHREAD
if
(
pthread_mutex_lock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
-=
1
;
if
(
p
->
val
<
0
)
{
if
(
pthread_cond_wait
(
&
p
->
cond
,
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
}
if
(
pthread_mutex_unlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
...
...
@@ -251,28 +251,28 @@ int tsem_wait(tsem_t *sem) {
int
tsem_post
(
tsem_t
*
sem
)
{
if
(
!*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#ifdef SEM_USE_PTHREAD
if
(
pthread_mutex_lock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
+=
1
;
if
(
p
->
val
<=
0
)
{
if
(
pthread_cond_signal
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
}
if
(
pthread_mutex_unlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
...
...
@@ -286,34 +286,34 @@ int tsem_post(tsem_t *sem) {
}
int
tsem_destroy
(
tsem_t
*
sem
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n",
basen
ame(__FILE__), __LINE__, __func__, sem);
// fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n",
taosDirEntryBaseN
ame(__FILE__), __LINE__, __func__, sem);
if
(
!*
sem
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n",
basen
ame(__FILE__), __LINE__, __func__, sem);
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n",
taosDirEntryBaseN
ame(__FILE__), __LINE__, __func__, sem);
// abort();
return
0
;
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n",
basen
ame(__FILE__), __LINE__, __func__, sem);
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n",
taosDirEntryBaseN
ame(__FILE__), __LINE__, __func__, sem);
// abort();
return
0
;
}
#ifdef SEM_USE_PTHREAD
if
(
pthread_mutex_lock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
valid
=
0
;
if
(
pthread_cond_destroy
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
pthread_mutex_unlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
pthread_mutex_destroy
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#elif defined(SEM_USE_POSIX)
...
...
@@ -322,7 +322,7 @@ int tsem_destroy(tsem_t *sem) {
int
r
=
sem_unlink
(
name
);
if
(
r
)
{
int
e
=
errno
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==unlink failed[%d]%s
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==unlink failed[%d]%s
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
...
...
source/os/src/osSocket.c
浏览文件 @
ce71ba91
...
...
@@ -809,7 +809,8 @@ int32_t taosGetFqdn(char *fqdn) {
char
hostname
[
1024
];
hostname
[
1023
]
=
'\0'
;
if
(
gethostname
(
hostname
,
1023
)
==
-
1
)
{
// printf("failed to get hostname, reason:%s", strerror(errno));
printf
(
"failed to get hostname, reason:%s"
,
strerror
(
errno
));
assert
(
0
);
return
-
1
;
}
...
...
@@ -826,7 +827,8 @@ int32_t taosGetFqdn(char *fqdn) {
#endif // __APPLE__
int32_t
ret
=
getaddrinfo
(
hostname
,
NULL
,
&
hints
,
&
result
);
if
(
!
result
)
{
// printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
printf
(
"failed to get fqdn, code:%d, reason:%s"
,
ret
,
gai_strerror
(
ret
));
assert
(
0
);
return
-
1
;
}
...
...
source/os/src/osTimer.c
浏览文件 @
ce71ba91
...
...
@@ -79,7 +79,7 @@ static void* timer_routine(void* arg) {
struct
kevent64_s
kev
[
10
]
=
{
0
};
r
=
kevent64
(
timer_kq
,
NULL
,
0
,
kev
,
sizeof
(
kev
)
/
sizeof
(
kev
[
0
]),
0
,
&
to
);
if
(
r
!=
0
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==kevent64 failed
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
);
fprintf
(
stderr
,
"==%s[%d]%s()==kevent64 failed
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
);
abort
();
}
timer_callback
(
SIGALRM
);
// just mock
...
...
@@ -97,14 +97,14 @@ int taosInitTimer(void (*callback)(int), int ms) {
timer_kq
=
kqueue
();
if
(
timer_kq
==
-
1
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create timer kq
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
);
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create timer kq
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
);
// since no caller of this func checks the return value for the moment
abort
();
}
r
=
pthread_create
(
&
timer_thread
,
NULL
,
timer_routine
,
NULL
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create timer thread
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
);
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create timer thread
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
);
// since no caller of this func checks the return value for the moment
abort
();
}
...
...
@@ -116,7 +116,7 @@ void taosUninitTimer() {
timer_stop
=
1
;
r
=
pthread_join
(
timer_thread
,
NULL
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to join timer thread
\n
"
,
basen
ame
(
__FILE__
),
__LINE__
,
__func__
);
fprintf
(
stderr
,
"==%s[%d]%s()==failed to join timer thread
\n
"
,
taosDirEntryBaseN
ame
(
__FILE__
),
__LINE__
,
__func__
);
// since no caller of this func checks the return value for the moment
abort
();
}
...
...
tests/test/c/tmqDemo.c
浏览文件 @
ce71ba91
...
...
@@ -192,11 +192,11 @@ static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
// calc dir size (not include itself 4096Byte)
int64_t
getDirectorySize
(
char
*
dir
)
{
DIR
*
dp
;
struct
dirent
*
e
ntry
;
TdDirPtr
pDir
;
TdDirEntryPtr
pDirE
ntry
;
int64_t
totalSize
=
0
;
if
((
dp
=
opend
ir
(
dir
))
==
NULL
)
{
if
((
pDir
=
taosOpenD
ir
(
dir
))
==
NULL
)
{
fprintf
(
stderr
,
"Cannot open dir: %s
\n
"
,
dir
);
return
-
1
;
}
...
...
@@ -204,26 +204,27 @@ int64_t getDirectorySize(char *dir)
//lstat(dir, &statbuf);
//totalSize+=statbuf.st_size;
while
((
entry
=
readdir
(
dp
))
!=
NULL
)
{
while
((
pDirEntry
=
taosReadDir
(
pDir
))
!=
NULL
)
{
char
subdir
[
1024
];
sprintf
(
subdir
,
"%s/%s"
,
dir
,
entry
->
d_name
);
char
*
fileName
=
taosGetDirEntryName
(
pDirEntry
);
sprintf
(
subdir
,
"%s/%s"
,
dir
,
fileName
);
//printf("===d_name: %s\n", entry->d_name);
if
(
taosIsDir
(
subdir
))
{
if
(
strcmp
(
"."
,
entry
->
d_name
)
==
0
||
strcmp
(
".."
,
entry
->
d_n
ame
)
==
0
)
{
if
(
strcmp
(
"."
,
fileName
)
==
0
||
strcmp
(
".."
,
fileN
ame
)
==
0
)
{
continue
;
}
int64_t
subDirSize
=
getDirectorySize
(
subdir
);
totalSize
+=
subDirSize
;
}
else
if
(
0
==
strcmp
(
strchr
(
entry
->
d_n
ame
,
'.'
),
".log"
))
{
// only calc .log file size, and not include .idx file
}
else
if
(
0
==
strcmp
(
strchr
(
fileN
ame
,
'.'
),
".log"
))
{
// only calc .log file size, and not include .idx file
int64_t
file_size
=
0
;
taosStatFile
(
subdir
,
&
file_size
,
NULL
);
totalSize
+=
file_size
;
}
}
closedir
(
dp
);
taosCloseDir
(
pDir
);
return
totalSize
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录