Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ce8e1177
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ce8e1177
编写于
11月 04, 2020
作者:
P
Ping Xiao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/update' into feature/update_test
上级
7f88bc2c
f605be10
变更
38
隐藏空白更改
内联
并排
Showing
38 changed file
with
276 addition
and
252 deletion
+276
-252
documentation20/webdocs/markdowndocs/administrator-ch.md
documentation20/webdocs/markdowndocs/administrator-ch.md
+1
-1
documentation20/webdocs/markdowndocs/faq-ch.md
documentation20/webdocs/markdowndocs/faq-ch.md
+2
-2
packaging/tools/post.sh
packaging/tools/post.sh
+5
-2
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+2
-2
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+3
-1
src/dnode/src/dnodeTelemetry.c
src/dnode/src/dnodeTelemetry.c
+1
-1
src/inc/twal.h
src/inc/twal.h
+1
-0
src/kit/shell/src/shellEngine.c
src/kit/shell/src/shellEngine.c
+1
-1
src/os/inc/os.h
src/os/inc/os.h
+2
-1
src/os/inc/osAlloc.h
src/os/inc/osAlloc.h
+3
-5
src/os/inc/osDarwin.h
src/os/inc/osDarwin.h
+0
-2
src/os/inc/osFile.h
src/os/inc/osFile.h
+24
-22
src/os/inc/osSocket.h
src/os/inc/osSocket.h
+9
-11
src/os/inc/osWindows.h
src/os/inc/osWindows.h
+1
-5
src/os/src/darwin/darwinFile.c
src/os/src/darwin/darwinFile.c
+8
-11
src/os/src/detail/osAlloc.c
src/os/src/detail/osAlloc.c
+2
-2
src/os/src/detail/osFail.c
src/os/src/detail/osFail.c
+13
-13
src/os/src/detail/osFile.c
src/os/src/detail/osFile.c
+34
-18
src/os/src/detail/osSocket.c
src/os/src/detail/osSocket.c
+4
-4
src/os/src/windows/wFile.c
src/os/src/windows/wFile.c
+8
-8
src/os/src/windows/wSocket.c
src/os/src/windows/wSocket.c
+3
-3
src/query/src/qTsbuf.c
src/query/src/qTsbuf.c
+1
-1
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+2
-2
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+2
-2
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+2
-2
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+1
-0
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+9
-12
src/util/inc/tsocket.h
src/util/inc/tsocket.h
+12
-12
src/util/src/tkvstore.c
src/util/src/tkvstore.c
+9
-9
src/util/src/tlog.c
src/util/src/tlog.c
+10
-12
src/util/src/tnote.c
src/util/src/tnote.c
+1
-1
src/util/src/tsocket.c
src/util/src/tsocket.c
+55
-55
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+4
-0
src/wal/inc/walInt.h
src/wal/inc/walInt.h
+2
-0
src/wal/src/walMgmt.c
src/wal/src/walMgmt.c
+11
-4
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+12
-8
tests/script/general/insert/basic.sim
tests/script/general/insert/basic.sim
+15
-16
tests/test-all.sh
tests/test-all.sh
+1
-1
未找到文件。
documentation20/webdocs/markdowndocs/administrator-ch.md
浏览文件 @
ce8e1177
...
...
@@ -35,7 +35,7 @@ TDengine相对于通用数据库,有超高的压缩比,在绝大多数场景
Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
```
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000
\*
128
\*
24
\*
60/15
*
365 = 44
851T。TDengine大概需要消耗44851/5=8970T, 8.9P
空间。
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000
\*
128
\*
24
\*
60/15
*
365 = 44
.8512T。TDengine大概需要消耗44.851/5=8.97024T
空间。
用户可以通过参数keep,设置数据在磁盘中的最大保存时长。为进一步减少存储成本,TDengine还提供多级存储,最冷的数据可以存放在最廉价的存储介质上,应用的访问不用做任何调整,只是读取速度降低了。
...
...
documentation20/webdocs/markdowndocs/faq-ch.md
浏览文件 @
ce8e1177
...
...
@@ -38,9 +38,9 @@
6.
检查防火墙设置,确认TCP/UDP 端口6030-6042 是打开的
7.
对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保
*libtaos.so*
在目录
*/usr/local/
lib/taos*
里, 并且
*/usr/local/lib/taos
*
在系统库函数搜索路径
*LD_LIBRARY_PATH*
里
7.
对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保
*libtaos.so*
在目录
*/usr/local/
taos/driver*
里, 并且
*/usr/local/taos/driver
*
在系统库函数搜索路径
*LD_LIBRARY_PATH*
里
8.
对于windows上的JDBC, ODBC, Python, Go等连接,确保
*
driver/c/taos.dll*
在你的系统
搜索目录里 (建议
*taos.dll*
放在目录
*C:\Windows\System32*
)
8.
对于windows上的JDBC, ODBC, Python, Go等连接,确保
*
C:\TDengine\driver\taos.dll*
在你的系统库函数
搜索目录里 (建议
*taos.dll*
放在目录
*C:\Windows\System32*
)
9.
如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作:
`nc -vuz {hostIP} {port} `
...
...
packaging/tools/post.sh
浏览文件 @
ce8e1177
...
...
@@ -121,8 +121,11 @@ function install_config() {
echo
-e
-n
"
${
GREEN
}
Enter FQDN:port (like h1.taosdata.com:6030) of an existing TDengine cluster node to join
${
NC
}
"
echo
echo
-e
-n
"
${
GREEN
}
OR leave it blank to build one
${
NC
}
:"
read
firstEp
while
true
;
do
#read firstEp
if
exec
< /dev/tty
;
then
read
firstEp
;
fi
while
true
;
do
if
[
!
-z
"
$firstEp
"
]
;
then
# check the format of the firstEp
#if [[ $firstEp == $FQDN_PATTERN ]]; then
...
...
src/client/src/tscSQLParser.c
浏览文件 @
ce8e1177
...
...
@@ -4427,8 +4427,8 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
int32_t
parseOrderbyClause
(
SSqlCmd
*
pCmd
,
SQueryInfo
*
pQueryInfo
,
SQuerySQL
*
pQuerySql
,
SSchema
*
pSchema
)
{
const
char
*
msg0
=
"only support order by primary timestamp"
;
const
char
*
msg1
=
"invalid column name"
;
const
char
*
msg2
=
"only support order by primary timestamp
and
queried column"
;
const
char
*
msg3
=
"only support order by primary timestamp
and
first tag in groupby clause"
;
const
char
*
msg2
=
"only support order by primary timestamp
or
queried column"
;
const
char
*
msg3
=
"only support order by primary timestamp
or
first tag in groupby clause"
;
setDefaultOrderInfo
(
pQueryInfo
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
...
...
src/common/src/tdataformat.c
浏览文件 @
ce8e1177
...
...
@@ -434,12 +434,12 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
int
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int
rowsToMerge
)
{
ASSERT
(
rowsToMerge
>
0
&&
rowsToMerge
<=
source
->
numOfRows
);
ASSERT
(
target
->
numOfRows
+
rowsToMerge
<=
target
->
maxPoints
);
ASSERT
(
target
->
numOfCols
==
source
->
numOfCols
);
SDataCols
*
pTarget
=
NULL
;
if
(
dataColsKeyLast
(
target
)
<
dataColsKeyFirst
(
source
))
{
// No overlap
ASSERT
(
target
->
numOfRows
+
rowsToMerge
<=
target
->
maxPoints
);
for
(
int
i
=
0
;
i
<
rowsToMerge
;
i
++
)
{
for
(
int
j
=
0
;
j
<
source
->
numOfCols
;
j
++
)
{
if
(
source
->
cols
[
j
].
len
>
0
)
{
...
...
@@ -509,6 +509,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
(
*
iter2
)
++
;
if
(
key1
==
key2
)
(
*
iter1
)
++
;
}
ASSERT
(
target
->
numOfRows
<=
target
->
maxPoints
);
}
}
...
...
src/dnode/src/dnodeTelemetry.c
浏览文件 @
ce8e1177
...
...
@@ -268,7 +268,7 @@ static void dnodeGetEmail(char* filepath) {
return
;
}
if
(
taos
T
Read
(
fd
,
(
void
*
)
tsEmail
,
TSDB_FQDN_LEN
)
<
0
)
{
if
(
taosRead
(
fd
,
(
void
*
)
tsEmail
,
TSDB_FQDN_LEN
)
<
0
)
{
dError
(
"failed to read %d bytes from file %s since %s"
,
TSDB_FQDN_LEN
,
filepath
,
strerror
(
errno
));
}
...
...
src/inc/twal.h
浏览文件 @
ce8e1177
...
...
@@ -51,6 +51,7 @@ void walCleanUp();
twalh
walOpen
(
char
*
path
,
SWalCfg
*
pCfg
);
int32_t
walAlter
(
twalh
pWal
,
SWalCfg
*
pCfg
);
void
walStop
(
twalh
);
void
walClose
(
twalh
);
int32_t
walRenew
(
twalh
);
int32_t
walWrite
(
twalh
,
SWalHead
*
);
...
...
src/kit/shell/src/shellEngine.c
浏览文件 @
ce8e1177
...
...
@@ -244,7 +244,7 @@ int32_t shellRunCommand(TAOS* con, char* command) {
}
*
p
++
=
c
;
if
(
c
==
';'
)
{
if
(
c
==
';'
&&
quote
==
0
)
{
c
=
*
p
;
*
p
=
0
;
if
(
shellRunSingleCommand
(
con
,
cmd
)
<
0
)
{
...
...
src/os/inc/os.h
浏览文件 @
ce8e1177
...
...
@@ -52,9 +52,10 @@ extern "C" {
#include "osWindows.h"
#endif
#include "osDef.h"
#include "osAlloc.h"
#include "osAtomic.h"
#include "osCommon.h"
#include "osDef.h"
#include "osDir.h"
#include "osFile.h"
#include "osLz4.h"
...
...
src/
util/inc/ta
lloc.h
→
src/
os/inc/osA
lloc.h
浏览文件 @
ce8e1177
...
...
@@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
UTIL
_ALLOC_H
#define TDENGINE_
UTIL
_ALLOC_H
#ifndef TDENGINE_
OS
_ALLOC_H
#define TDENGINE_
OS
_ALLOC_H
#ifdef __cplusplus
extern
"C"
{
#endif
#define TSDB_USE_SYS_MEM
#ifdef TSDB_USE_SYS_MEM
#ifndef TAOS_OS_FUNC_ALLOC
#define tmalloc(size) malloc(size)
#define tcalloc(size) calloc(1, size)
#define trealloc(p, size) realloc(p, size)
...
...
src/os/inc/osDarwin.h
浏览文件 @
ce8e1177
...
...
@@ -72,8 +72,6 @@ extern "C" {
#include <sys/utsname.h>
#define TAOS_OS_FUNC_FILE_SENDIFLE
#define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define TAOS_OS_FUNC_SEMPHONE
#define tsem_t dispatch_semaphore_t
...
...
src/os/inc/osFile.h
浏览文件 @
ce8e1177
...
...
@@ -20,20 +20,26 @@
extern
"C"
{
#endif
ssize_t
taosTReadImp
(
int
fd
,
void
*
buf
,
size_t
count
);
ssize_t
taosTWriteImp
(
int
fd
,
void
*
buf
,
size_t
count
);
#define tread(fd, buf, count) read(fd, buf, count)
#define twrite(fd, buf, count) write(fd, buf, count)
#define tlseek(fd, offset, whence) lseek(fd, offset, whence)
#define tclose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
ssize_t
taosTSendFileImp
(
int
dfd
,
int
sfd
,
off_t
*
offset
,
size_t
size
);
int
taosFSendFileImp
(
FILE
*
out_file
,
FILE
*
in_file
,
int64_t
*
offset
,
int32_t
count
);
int64_t
taosRead
(
int32_t
fd
,
void
*
buf
,
int64_t
count
);
int64_t
taosWrite
(
int32_t
fd
,
void
*
buf
,
int64_t
count
);
int64_t
taosLSeek
(
int32_t
fd
,
int64_t
offset
,
int32_t
whence
);
int32_t
taosRenameFile
(
char
*
fullPath
,
char
*
suffix
,
char
delimiter
,
char
**
dstPath
);
#define taosClose(x) tclose(x)
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define taosFSendFile(outfile, infile, offset, count) taosTSendFileImp(fileno(outfile), fileno(infile), offset, size)
#endif
#define taosTRead(fd, buf, count) taosTReadImp(fd, buf, count)
#define taosTWrite(fd, buf, count) taosTWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) lseek(fd, offset, whence)
// TAOS_OS_FUNC_FILE_SENDIFLE
int64_t
taosSendFile
(
int32_t
dfd
,
int32_t
sfd
,
int64_t
*
offset
,
int64_t
size
);
int64_t
taosFSendFile
(
FILE
*
outfile
,
FILE
*
infile
,
int64_t
*
offset
,
int64_t
size
);
#ifdef TAOS_RANDOM_FILE_FAIL
void
taosSetRandomFileFailFactor
(
int
factor
);
...
...
@@ -42,24 +48,20 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t
ssize_t
taosReadFileRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
);
ssize_t
taosWriteFileRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
);
off_t
taosLSeekRandomFail
(
int
fd
,
off_t
offset
,
int
whence
,
const
char
*
file
,
uint32_t
line
);
#undef taos
T
Read
#undef taos
T
Write
#undef taosRead
#undef taosWrite
#undef taosLSeek
#define taos
T
Read(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taos
T
Write(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__)
#endif
#endif
int32_t
taosFileRename
(
char
*
fullPath
,
char
*
suffix
,
char
delimiter
,
char
**
dstPath
);
// TAOS_OS_FUNC_FILE_GETTMPFILEPATH
void
taosGetTmpfilePath
(
const
char
*
fileNamePrefix
,
char
*
dstPath
);
#ifndef TAOS_OS_FUNC_FILE_FTRUNCATE
#define taosFtruncate ftruncate
#endif
// TAOS_OS_FUNC_FILE_FTRUNCATE
int32_t
taosFtruncate
(
int32_t
fd
,
int64_t
length
);
#ifdef __cplusplus
}
#endif
...
...
src/os/inc/osSocket.h
浏览文件 @
ce8e1177
...
...
@@ -33,21 +33,19 @@ extern "C" {
x = FD_INITIALIZER; \
} \
}
typedef
int
SOCKET
;
typedef
int
32_t
SOCKET
;
#endif
#ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME -1
#endif
#define taosClose(x) taosCloseSocket(x)
#ifdef TAOS_RANDOM_NETWORK_FAIL
#ifdef TAOS_RANDOM_NETWORK_FAIL_TEST
ssize_t
taosSendRandomFail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
in
t
flags
);
ssize_t
taosSendToRandomFail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
in
t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
ssize_t
taosReadSocketRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
);
ssize_t
taosWriteSocketRandomFail
(
int
fd
,
const
void
*
buf
,
size_t
count
);
ssize_t
taosSendRandomFail
(
int
32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_
t
flags
);
ssize_t
taosSendToRandomFail
(
int
32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_
t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
ssize_t
taosReadSocketRandomFail
(
int
32_t
fd
,
void
*
buf
,
size_t
count
);
ssize_t
taosWriteSocketRandomFail
(
int
32_t
fd
,
const
void
*
buf
,
size_t
count
);
#undef taosSend
#undef taosSendto
#undef taosReadSocket
...
...
@@ -60,14 +58,14 @@ extern "C" {
#endif
// TAOS_OS_FUNC_SOCKET
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
);
void
taosBlockSIGPIPE
();
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
);
void
taosBlockSIGPIPE
();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int
taosSetSockOpt
(
SOCKET
socketfd
,
int
level
,
int
optname
,
void
*
optval
,
in
t
optlen
);
int
32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_
t
optlen
);
// TAOS_OS_FUNC_SOCKET_INET
uint32_t
taosInetAddr
(
char
*
ipAddr
);
uint32_t
taosInetAddr
(
char
*
ipAddr
);
const
char
*
taosInetNtoa
(
struct
in_addr
ipInt
);
#ifdef __cplusplus
...
...
src/os/inc/osWindows.h
浏览文件 @
ce8e1177
...
...
@@ -62,11 +62,8 @@ extern "C" {
#define TAOS_OS_FUNC_FILE_ISDIR
#define TAOS_OS_FUNC_FILE_ISLNK
#define TAOS_OS_FUNC_FILE_SENDIFLE
#define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define TAOS_OS_FUNC_FILE_GETTMPFILEPATH
#define TAOS_OS_FUNC_FILE_FTRUNCATE
extern
int
taosFtruncate
(
int
fd
,
int64_t
length
);
#define TAOS_OS_FUNC_FILE_FTRUNCATE
#define TAOS_OS_FUNC_MATH
#define SWAP(a, b, c) \
...
...
@@ -139,7 +136,6 @@ typedef int (*__compar_fn_t)(const void *, const void *);
#define in_addr_t unsigned long
#define socklen_t int
#define htobe64 htonll
#define twrite write
#define getpid _getpid
struct
tm
*
localtime_r
(
const
time_t
*
timep
,
struct
tm
*
result
);
...
...
src/os/src/darwin/darwinFile.c
浏览文件 @
ce8e1177
...
...
@@ -19,21 +19,19 @@
#define _SEND_FILE_STEP_ 1000
int
taosFSendFileImp
(
FILE
*
out_file
,
FILE
*
in_file
,
int64_t
*
offset
,
int32
_t
count
)
{
int
64_t
taosFSendFile
(
FILE
*
out_file
,
FILE
*
in_file
,
int64_t
*
offset
,
int64
_t
count
)
{
fseek
(
in_file
,
(
int32_t
)(
*
offset
),
0
);
int
writeLen
=
0
;
uint8_t
buffer
[
_SEND_FILE_STEP_
]
=
{
0
};
int
writeLen
=
0
;
uint8_t
buffer
[
_SEND_FILE_STEP_
]
=
{
0
};
for
(
int
len
=
0
;
len
<
(
count
-
_SEND_FILE_STEP_
);
len
+=
_SEND_FILE_STEP_
)
{
size_t
rlen
=
fread
(
buffer
,
1
,
_SEND_FILE_STEP_
,
in_file
);
if
(
rlen
<=
0
)
{
return
writeLen
;
}
else
if
(
rlen
<
_SEND_FILE_STEP_
)
{
}
else
if
(
rlen
<
_SEND_FILE_STEP_
)
{
fwrite
(
buffer
,
1
,
rlen
,
out_file
);
return
(
int
)(
writeLen
+
rlen
);
}
else
{
}
else
{
fwrite
(
buffer
,
1
,
_SEND_FILE_STEP_
,
in_file
);
writeLen
+=
_SEND_FILE_STEP_
;
}
...
...
@@ -44,8 +42,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
size_t
rlen
=
fread
(
buffer
,
1
,
remain
,
in_file
);
if
(
rlen
<=
0
)
{
return
writeLen
;
}
else
{
}
else
{
fwrite
(
buffer
,
1
,
remain
,
out_file
);
writeLen
+=
remain
;
}
...
...
@@ -54,7 +51,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return
writeLen
;
}
ssize_t
taosTSendFileImp
(
int
dfd
,
int
sfd
,
off_t
*
offset
,
size
_t
size
)
{
uError
(
"not implemented yet"
);
int64_t
taosSendFile
(
int32_t
dfd
,
int32_t
sfd
,
int64_t
*
offset
,
int64
_t
size
)
{
uError
(
"
taosSendFile
not implemented yet"
);
return
-
1
;
}
\ No newline at end of file
src/
util/src/ta
lloc.c
→
src/
os/src/detail/osA
lloc.c
浏览文件 @
ce8e1177
...
...
@@ -17,10 +17,10 @@
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#include "
ta
lloc.h"
#include "
osA
lloc.h"
#define TSDB_HAVE_MEMALIGN
#if
ndef TSDB_USE_SYS_MEM
#if
def TAOS_OS_FUNC_ALLOC
void
*
tmalloc
(
int32_t
size
)
{
void
*
p
=
malloc
(
size
);
...
...
src/os/src/detail/osFail.c
浏览文件 @
ce8e1177
...
...
@@ -20,7 +20,7 @@
#ifdef TAOS_RANDOM_NETWORK_FAIL
ssize_t
taosSendRandomFail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
in
t
flags
)
{
ssize_t
taosSendRandomFail
(
int
32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_
t
flags
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
...
...
@@ -29,7 +29,7 @@ ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) {
return
send
(
sockfd
,
buf
,
len
,
flags
);
}
ssize_t
taosSendToRandomFail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
in
t
flags
,
const
struct
sockaddr
*
dest_addr
,
ssize_t
taosSendToRandomFail
(
int
32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_
t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
...
...
@@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags,
return
sendto
(
sockfd
,
buf
,
len
,
flags
,
dest_addr
,
addrlen
);
}
ssize_t
taosReadSocketRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
)
{
ssize_t
taosReadSocketRandomFail
(
int
32_t
fd
,
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
...
...
@@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) {
return
read
(
fd
,
buf
,
count
);
}
ssize_t
taosWriteSocketRandomFail
(
int
fd
,
const
void
*
buf
,
size_t
count
)
{
ssize_t
taosWriteSocketRandomFail
(
int
32_t
fd
,
const
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
EINTR
;
return
-
1
;
...
...
@@ -61,10 +61,10 @@ ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) {
#ifdef TAOS_RANDOM_FILE_FAIL
static
int
random_file_fail_factor
=
20
;
static
int
32_t
random_file_fail_factor
=
20
;
static
FILE
*
fpRandomFileFailOutput
=
NULL
;
void
taosSetRandomFileFailFactor
(
int
factor
)
{
void
taosSetRandomFileFailFactor
(
int
32_t
factor
)
{
random_file_fail_factor
=
factor
;
}
...
...
@@ -77,7 +77,7 @@ static void close_random_file_fail_output() {
}
}
static
void
random_file_fail_output_sig
(
int
sig
)
{
static
void
random_file_fail_output_sig
(
int
32_t
sig
)
{
fprintf
(
fpRandomFileFailOutput
,
"signal %d received.
\n
"
,
sig
);
struct
sigaction
act
=
{
0
};
...
...
@@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) {
sigaction
(
SIGILL
,
&
act
,
NULL
);
}
ssize_t
taosReadFileRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
)
{
ssize_t
taosReadFileRandomFail
(
int
32_t
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
...
...
@@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file
}
}
return
taos
TReadImp
(
fd
,
buf
,
count
);
return
taos
Read
(
fd
,
buf
,
count
);
}
ssize_t
taosWriteFileRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
)
{
ssize_t
taosWriteFileRandomFail
(
int
32_t
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
...
...
@@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *fil
}
}
return
taos
TWriteImp
(
fd
,
buf
,
count
);
return
taos
Write
(
fd
,
buf
,
count
);
}
off_t
taosLSeekRandomFail
(
int
fd
,
off_t
offset
,
in
t
whence
,
const
char
*
file
,
uint32_t
line
)
{
off_t
taosLSeekRandomFail
(
int
32_t
fd
,
off_t
offset
,
int32_
t
whence
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
...
...
@@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, ui
}
}
return
ls
eek
(
fd
,
offset
,
whence
);
return
taosLS
eek
(
fd
,
offset
,
whence
);
}
#endif //TAOS_RANDOM_FILE_FAIL
src/os/src/detail/osFile.c
浏览文件 @
ce8e1177
...
...
@@ -18,6 +18,7 @@
#include "tglobal.h"
#ifndef TAOS_OS_FUNC_FILE_GETTMPFILEPATH
void
taosGetTmpfilePath
(
const
char
*
fileNamePrefix
,
char
*
dstPath
)
{
const
char
*
tdengineTmpFileNamePrefix
=
"tdengine-"
;
...
...
@@ -39,10 +40,10 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
taosRandStr
(
rand
,
tListLen
(
rand
)
-
1
);
snprintf
(
dstPath
,
PATH_MAX
,
tmpPath
,
getpid
(),
rand
);
}
#endif
// rename file name
int32_t
taosFileRename
(
char
*
fullPath
,
char
*
suffix
,
char
delimiter
,
char
**
dstPath
)
{
int32_t
taosRenameFile
(
char
*
fullPath
,
char
*
suffix
,
char
delimiter
,
char
**
dstPath
)
{
int32_t
ts
=
taosGetTimestampSec
();
char
fname
[
PATH_MAX
]
=
{
0
};
// max file name length must be less than 255
...
...
@@ -51,12 +52,13 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
if
(
delimiterPos
==
NULL
)
return
-
1
;
int32_t
fileNameLen
=
0
;
if
(
suffix
)
if
(
suffix
)
{
fileNameLen
=
snprintf
(
fname
,
PATH_MAX
,
"%s.%d.%s"
,
delimiterPos
+
1
,
ts
,
suffix
);
else
}
else
{
fileNameLen
=
snprintf
(
fname
,
PATH_MAX
,
"%s.%d"
,
delimiterPos
+
1
,
ts
);
}
size_t
len
=
(
size
_t
)((
delimiterPos
-
fullPath
)
+
fileNameLen
+
1
);
int32_t
len
=
(
int32
_t
)((
delimiterPos
-
fullPath
)
+
fileNameLen
+
1
);
if
(
*
dstPath
==
NULL
)
{
*
dstPath
=
calloc
(
1
,
len
+
1
);
if
(
*
dstPath
==
NULL
)
return
-
1
;
...
...
@@ -69,9 +71,9 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
return
rename
(
fullPath
,
*
dstPath
);
}
ssize_t
taosTReadImp
(
int
fd
,
void
*
buf
,
size
_t
count
)
{
size_t
leftbytes
=
count
;
ssize
_t
readbytes
;
int64_t
taosRead
(
int32_t
fd
,
void
*
buf
,
int64
_t
count
)
{
int64_t
leftbytes
=
count
;
int64
_t
readbytes
;
char
*
tbuf
=
(
char
*
)
buf
;
while
(
leftbytes
>
0
)
{
...
...
@@ -83,19 +85,19 @@ ssize_t taosTReadImp(int fd, void *buf, size_t count) {
return
-
1
;
}
}
else
if
(
readbytes
==
0
)
{
return
(
ssize
_t
)(
count
-
leftbytes
);
return
(
int64
_t
)(
count
-
leftbytes
);
}
leftbytes
-=
readbytes
;
tbuf
+=
readbytes
;
}
return
(
ssize_t
)
count
;
return
count
;
}
ssize_t
taosTWriteImp
(
int
fd
,
void
*
buf
,
size
_t
n
)
{
size_t
nleft
=
n
;
ssize
_t
nwritten
=
0
;
int64_t
taosWrite
(
int32_t
fd
,
void
*
buf
,
int64
_t
n
)
{
int64_t
nleft
=
n
;
int64
_t
nwritten
=
0
;
char
*
tbuf
=
(
char
*
)
buf
;
while
(
nleft
>
0
)
{
...
...
@@ -110,13 +112,14 @@ ssize_t taosTWriteImp(int fd, void *buf, size_t n) {
tbuf
+=
nwritten
;
}
return
(
ssize_t
)
n
;
return
n
;
}
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE
ssize_t
taosTSendFileImp
(
int
dfd
,
int
sfd
,
off_t
*
offset
,
size_t
size
)
{
size_t
leftbytes
=
size
;
ssize_t
sentbytes
;
int64_t
taosSendFile
(
int32_t
dfd
,
int32_t
sfd
,
int64_t
*
offset
,
int64_t
size
)
{
int64_t
leftbytes
=
size
;
int64_t
sentbytes
;
while
(
leftbytes
>
0
)
{
/*
...
...
@@ -131,7 +134,7 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return
-
1
;
}
}
else
if
(
sentbytes
==
0
)
{
return
(
ssize
_t
)(
size
-
leftbytes
);
return
(
int64
_t
)(
size
-
leftbytes
);
}
leftbytes
-=
sentbytes
;
...
...
@@ -139,4 +142,17 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return
size
;
}
int64_t
taosFSendFile
(
FILE
*
outfile
,
FILE
*
infile
,
int64_t
*
offset
,
int64_t
size
)
{
return
taosSendFile
(
fileno
(
outfile
),
fileno
(
infile
),
offset
,
size
);
}
#endif
#ifndef TAOS_OS_FUNC_FILE_FTRUNCATE
int32_t
taosFtruncate
(
int32_t
fd
,
int64_t
length
)
{
return
ftruncate
(
fd
,
length
);
}
#endif
\ No newline at end of file
src/os/src/detail/osSocket.c
浏览文件 @
ce8e1177
...
...
@@ -19,8 +19,8 @@
#ifndef TAOS_OS_FUNC_SOCKET
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
)
{
int
flags
=
0
;
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
)
{
int
32_t
flags
=
0
;
if
((
flags
=
fcntl
(
sock
,
F_GETFL
,
0
))
<
0
)
{
uError
(
"fcntl(F_GETFL) error: %d (%s)
\n
"
,
errno
,
strerror
(
errno
));
return
1
;
...
...
@@ -43,7 +43,7 @@ void taosBlockSIGPIPE() {
sigset_t
signal_mask
;
sigemptyset
(
&
signal_mask
);
sigaddset
(
&
signal_mask
,
SIGPIPE
);
int
rc
=
pthread_sigmask
(
SIG_BLOCK
,
&
signal_mask
,
NULL
);
int
32_t
rc
=
pthread_sigmask
(
SIG_BLOCK
,
&
signal_mask
,
NULL
);
if
(
rc
!=
0
)
{
uError
(
"failed to block SIGPIPE"
);
}
...
...
@@ -53,7 +53,7 @@ void taosBlockSIGPIPE() {
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int
taosSetSockOpt
(
SOCKET
socketfd
,
int
level
,
int
optname
,
void
*
optval
,
in
t
optlen
)
{
int
32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_
t
optlen
)
{
return
setsockopt
(
socketfd
,
level
,
optname
,
optval
,
(
socklen_t
)
optlen
);
}
...
...
src/os/src/windows/wFile.c
浏览文件 @
ce8e1177
...
...
@@ -43,19 +43,19 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#define _SEND_FILE_STEP_ 1000
int
taosFSendFileImp
(
FILE
*
out_file
,
FILE
*
in_file
,
int64_t
*
offset
,
int32
_t
count
)
{
int
64_t
taosFSendFile
(
FILE
*
out_file
,
FILE
*
in_file
,
int64_t
*
offset
,
int64
_t
count
)
{
fseek
(
in_file
,
(
int32_t
)(
*
offset
),
0
);
int
writeLen
=
0
;
int
64_t
writeLen
=
0
;
uint8_t
buffer
[
_SEND_FILE_STEP_
]
=
{
0
};
for
(
int
len
=
0
;
len
<
(
count
-
_SEND_FILE_STEP_
);
len
+=
_SEND_FILE_STEP_
)
{
for
(
int
64_t
len
=
0
;
len
<
(
count
-
_SEND_FILE_STEP_
);
len
+=
_SEND_FILE_STEP_
)
{
size_t
rlen
=
fread
(
buffer
,
1
,
_SEND_FILE_STEP_
,
in_file
);
if
(
rlen
<=
0
)
{
return
writeLen
;
}
else
if
(
rlen
<
_SEND_FILE_STEP_
)
{
fwrite
(
buffer
,
1
,
rlen
,
out_file
);
return
(
int
)(
writeLen
+
rlen
);
return
(
int
64_t
)(
writeLen
+
rlen
);
}
else
{
fwrite
(
buffer
,
1
,
_SEND_FILE_STEP_
,
in_file
);
...
...
@@ -63,7 +63,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
}
}
int
remain
=
count
-
writeLen
;
int
64_t
remain
=
count
-
writeLen
;
if
(
remain
>
0
)
{
size_t
rlen
=
fread
(
buffer
,
1
,
remain
,
in_file
);
if
(
rlen
<=
0
)
{
...
...
@@ -78,12 +78,12 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return
writeLen
;
}
ssize_t
taosTSendFileImp
(
int
dfd
,
int
sfd
,
off_t
*
offset
,
size
_t
size
)
{
uError
(
"taos
TSendFileImp
no implemented yet"
);
int64_t
taosSendFile
(
int32_t
dfd
,
int32_t
sfd
,
int64_t
*
offset
,
int64
_t
size
)
{
uError
(
"taos
SendFile
no implemented yet"
);
return
0
;
}
int
taosFtruncate
(
in
t
fd
,
int64_t
length
)
{
int
32_t
taosFtruncate
(
int32_
t
fd
,
int64_t
length
)
{
uError
(
"taosFtruncate no implemented yet"
);
return
0
;
}
\ No newline at end of file
src/os/src/windows/wSocket.c
浏览文件 @
ce8e1177
...
...
@@ -34,7 +34,7 @@ void taosWinSocketInit() {
}
}
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
)
{
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
)
{
u_long
mode
;
if
(
on
)
{
mode
=
1
;
...
...
@@ -48,7 +48,7 @@ int taosSetNonblocking(SOCKET sock, int on) {
void
taosBlockSIGPIPE
()
{}
int
taosSetSockOpt
(
SOCKET
socketfd
,
int
level
,
int
optname
,
void
*
optval
,
in
t
optlen
)
{
int
32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_
t
optlen
)
{
if
(
level
==
SOL_SOCKET
&&
optname
==
TCP_KEEPCNT
)
{
return
0
;
}
...
...
@@ -72,7 +72,7 @@ int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int op
uint32_t
taosInetAddr
(
char
*
ipAddr
)
{
uint32_t
value
;
int
ret
=
inet_pton
(
AF_INET
,
ipAddr
,
&
value
);
int
32_t
ret
=
inet_pton
(
AF_INET
,
ipAddr
,
&
value
);
if
(
ret
<=
0
)
{
return
INADDR_NONE
;
}
else
{
...
...
src/query/src/qTsbuf.c
浏览文件 @
ce8e1177
...
...
@@ -772,7 +772,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
int64_t
offset
=
getDataStartOffset
();
int32_t
size
=
(
int32_t
)
pSrcBuf
->
fileSize
-
(
int32_t
)
offset
;
ssize
_t
rc
=
taosFSendFile
(
pDestBuf
->
f
,
pSrcBuf
->
f
,
&
offset
,
size
);
int64
_t
rc
=
taosFSendFile
(
pDestBuf
->
f
,
pSrcBuf
->
f
,
&
offset
,
size
);
if
(
rc
==
-
1
)
{
// tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
ce8e1177
...
...
@@ -149,7 +149,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
int
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
break
;
ret
=
taos
T
SendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
ret
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
close
(
sfd
);
if
(
ret
<
0
)
break
;
...
...
@@ -406,7 +406,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
int
sfd
=
open
(
fname
,
O_RDONLY
);
if
(
sfd
<
0
)
break
;
code
=
taos
T
SendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
size
);
code
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
size
);
close
(
sfd
);
if
(
code
<
0
)
break
;
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
ce8e1177
...
...
@@ -428,7 +428,7 @@ int tsdbUpdateFileHeader(SFile *pFile) {
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
if
(
taos
T
Write
(
pFile
->
fd
,
(
void
*
)
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
if
(
taosWrite
(
pFile
->
fd
,
(
void
*
)
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
tsdbError
(
"failed to write %d bytes to file %s since %s"
,
TSDB_FILE_HEAD_SIZE
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -493,7 +493,7 @@ int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
return
-
1
;
}
if
(
taos
T
Read
(
pFile
->
fd
,
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
if
(
taosRead
(
pFile
->
fd
,
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
tsdbError
(
"failed to read file %s header part with %d bytes, reason:%s"
,
pFile
->
fname
,
TSDB_FILE_HEAD_SIZE
,
strerror
(
errno
));
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
ce8e1177
...
...
@@ -582,7 +582,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
buf
,
TSDB_FILE_HEAD_SIZE
);
if
(
taos
T
Write
(
fd
,
(
void
*
)
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
if
(
taosWrite
(
fd
,
(
void
*
)
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
pCfg
->
tsdbId
,
TSDB_FILE_HEAD_SIZE
,
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -623,7 +623,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
goto
_err
;
}
if
(
taos
T
Read
(
fd
,
(
void
*
)
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
if
(
taosRead
(
fd
,
(
void
*
)
buf
,
TSDB_FILE_HEAD_SIZE
)
<
TSDB_FILE_HEAD_SIZE
)
{
tsdbError
(
"failed to read %d bytes from file %s since %s"
,
TSDB_FILE_HEAD_SIZE
,
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
ce8e1177
...
...
@@ -322,6 +322,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
dataRowKey
(
row
)
>
maxKey
)
{
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
ce8e1177
...
...
@@ -338,7 +338,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
return -1;
}
if (taos
T
SendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
if (taosSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
...
...
@@ -383,7 +383,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx
->
tid
=
pHelper
->
tableInfo
.
tid
;
ASSERT
(
pIdx
->
offset
>=
TSDB_FILE_HEAD_SIZE
);
if
(
taos
T
Write
(
pFile
->
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
(
int
)
pIdx
->
len
)
{
if
(
taosWrite
(
pFile
->
fd
,
(
void
*
)(
pHelper
->
pCompInfo
),
pIdx
->
len
)
<
(
int
)
pIdx
->
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pIdx
->
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -435,7 +435,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT
(
offset
==
pFile
->
info
.
size
);
if
(
taos
T
Write
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
)
<
(
int
)
pFile
->
info
.
len
)
{
if
(
taosWrite
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pWIdx
,
pFile
->
info
.
len
)
<
(
int
)
pFile
->
info
.
len
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pFile
->
info
.
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -457,7 +457,7 @@ int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffe
return
-
1
;
}
if
(
taos
T
Read
(
pFile
->
fd
,
buffer
,
len
)
<
len
)
{
if
(
taosRead
(
pFile
->
fd
,
buffer
,
len
)
<
len
)
{
tsdbError
(
"%s: read file %s offset %u len %u failed since %s"
,
prefixMsg
,
pFile
->
fname
,
offset
,
len
,
strerror
(
errno
));
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
...
...
@@ -554,7 +554,7 @@ int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) {
return
-
1
;
}
if
(
taos
T
Read
(
pFile
->
fd
,
(
void
*
)(
*
ppCompInfo
),
pIdx
->
len
)
<
(
int
)
pIdx
->
len
)
{
if
(
taosRead
(
pFile
->
fd
,
(
void
*
)(
*
ppCompInfo
),
pIdx
->
len
)
<
(
int
)
pIdx
->
len
)
{
tsdbError
(
"%s: read file %s offset %u len %u failed since %s"
,
prefixMsg
,
pFile
->
fname
,
pIdx
->
offset
,
pIdx
->
len
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -611,7 +611,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
return
-
1
;
}
if
(
taos
T
Read
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pCompData
,
tsize
)
<
tsize
)
{
if
(
taosRead
(
pFile
->
fd
,
(
void
*
)
pHelper
->
pCompData
,
tsize
)
<
tsize
)
{
tsdbError
(
"vgId:%d failed to read %"
PRIzu
" bytes from file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
tsize
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -826,7 +826,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
sizeof
(
TSCKSUM
));
// Write the whole block to file
if
(
taos
T
Write
(
pFile
->
fd
,
(
void
*
)
pCompData
,
lsize
)
<
lsize
)
{
if
(
taosWrite
(
pFile
->
fd
,
(
void
*
)
pCompData
,
lsize
)
<
lsize
)
{
tsdbError
(
"vgId:%d failed to write %d bytes to file %s since %s"
,
REPO_ID
(
helperRepo
(
pHelper
)),
lsize
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -1252,7 +1252,7 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
return
-
1
;
}
if
(
taos
T
Read
(
pFile
->
fd
,
pHelper
->
pBuffer
,
pCompCol
->
len
)
<
pCompCol
->
len
)
{
if
(
taosRead
(
pFile
->
fd
,
pHelper
->
pBuffer
,
pCompCol
->
len
)
<
pCompCol
->
len
)
{
tsdbError
(
"vgId:%d failed to read %d bytes from file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pCompCol
->
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -1367,7 +1367,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
if
(
taos
T
Read
(
fd
,
(
void
*
)
pCompData
,
pCompBlock
->
len
)
<
pCompBlock
->
len
)
{
if
(
taosRead
(
fd
,
(
void
*
)
pCompData
,
pCompBlock
->
len
)
<
pCompBlock
->
len
)
{
tsdbError
(
"vgId:%d failed to read %d bytes from file %s since %s"
,
REPO_ID
(
pHelper
->
pRepo
),
pCompBlock
->
len
,
pFile
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -1495,7 +1495,6 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT
(
pIdx
->
len
>
0
);
SCompBlock
*
pCompBlock
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
);
ASSERT
(
pCompBlock
->
last
&&
pCompBlock
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
);
tdResetDataCols
(
pDataCols
);
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
defaultRowsInBlock
-
pCompBlock
->
numOfRows
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
,
pMergeInfo
);
...
...
@@ -1525,7 +1524,6 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
}
}
else
{
ASSERT
(
!
pHelper
->
hasOldLastBlock
);
tdResetDataCols
(
pDataCols
);
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
defaultRowsInBlock
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
,
pMergeInfo
);
ASSERT
(
pMergeInfo
->
rowsInserted
==
pMergeInfo
->
nOperations
&&
pMergeInfo
->
nOperations
==
pDataCols
->
numOfRows
);
...
...
@@ -1571,7 +1569,6 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
if
((
!
TSDB_IS_LAST_BLOCK
(
&
oBlock
))
&&
keyFirst
<
pCompBlock
->
keyFirst
)
{
while
(
true
)
{
tdResetDataCols
(
pDataCols
);
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
oBlock
.
keyFirst
-
1
,
defaultRowsInBlock
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
,
pMergeInfo
);
ASSERT
(
pMergeInfo
->
rowsInserted
==
pMergeInfo
->
nOperations
&&
pMergeInfo
->
nOperations
==
pDataCols
->
numOfRows
);
...
...
src/util/inc/tsocket.h
浏览文件 @
ce8e1177
...
...
@@ -20,21 +20,21 @@
extern
"C"
{
#endif
int
taosReadn
(
SOCKET
sock
,
char
*
buffer
,
in
t
len
);
int
taosWriteMsg
(
SOCKET
fd
,
void
*
ptr
,
in
t
nbytes
);
int
taosReadMsg
(
SOCKET
fd
,
void
*
ptr
,
in
t
nbytes
);
int
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
in
t
nbytes
);
int
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
);
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
);
int
32_t
taosReadn
(
SOCKET
sock
,
char
*
buffer
,
int32_
t
len
);
int
32_t
taosWriteMsg
(
SOCKET
fd
,
void
*
ptr
,
int32_
t
nbytes
);
int
32_t
taosReadMsg
(
SOCKET
fd
,
void
*
ptr
,
int32_
t
nbytes
);
int
32_t
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
int32_
t
nbytes
);
int
32_t
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
);
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
);
SOCKET
taosOpenUdpSocket
(
uint32_t
localIp
,
uint16_t
localPort
);
SOCKET
taosOpenTcpClientSocket
(
uint32_t
ip
,
uint16_t
port
,
uint32_t
localIp
);
SOCKET
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
);
int
taosKeepTcpAlive
(
SOCKET
sockFd
);
SOCKET
taosOpenUdpSocket
(
uint32_t
localIp
,
uint16_t
localPort
);
SOCKET
taosOpenTcpClientSocket
(
uint32_t
ip
,
uint16_t
port
,
uint32_t
localIp
);
SOCKET
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
);
int
32_t
taosKeepTcpAlive
(
SOCKET
sockFd
);
int
taosGetFqdn
(
char
*
);
int
32_t
taosGetFqdn
(
char
*
);
uint32_t
taosGetIpFromFqdn
(
const
char
*
);
void
tinet_ntoa
(
char
*
ipstr
,
u
nsigned
in
t
ip
);
void
tinet_ntoa
(
char
*
ipstr
,
u
int32_
t
ip
);
uint32_t
ip2uint
(
const
char
*
const
ip_addr
);
#ifdef __cplusplus
...
...
src/util/src/tkvstore.c
浏览文件 @
ce8e1177
...
...
@@ -188,7 +188,7 @@ int tdKVStoreStartCommit(SKVStore *pStore) {
goto
_err
;
}
if
(
taos
T
SendFile
(
pStore
->
sfd
,
pStore
->
fd
,
NULL
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
{
if
(
taosSendFile
(
pStore
->
sfd
,
pStore
->
fd
,
NULL
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
{
uError
(
"failed to send file %d bytes since %s"
,
TD_KVSTORE_HEADER_SIZE
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
...
...
@@ -248,13 +248,13 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
ASSERT
(
tlen
==
POINTER_DISTANCE
(
pBuf
,
buf
));
ASSERT
(
tlen
==
sizeof
(
SKVRecord
));
if
(
taos
T
Write
(
pStore
->
fd
,
buf
,
tlen
)
<
tlen
)
{
if
(
taosWrite
(
pStore
->
fd
,
buf
,
tlen
)
<
tlen
)
{
uError
(
"failed to write %d bytes to file %s since %s"
,
tlen
,
pStore
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
if
(
taos
T
Write
(
pStore
->
fd
,
cont
,
contLen
)
<
contLen
)
{
if
(
taosWrite
(
pStore
->
fd
,
cont
,
contLen
)
<
contLen
)
{
uError
(
"failed to write %d bytes to file %s since %s"
,
contLen
,
pStore
->
fname
,
strerror
(
errno
));
return
-
1
;
}
...
...
@@ -292,7 +292,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
void
*
pBuf
=
buf
;
tdEncodeKVRecord
(
&
pBuf
,
&
rInfo
);
if
(
taos
T
Write
(
pStore
->
fd
,
buf
,
POINTER_DISTANCE
(
pBuf
,
buf
))
<
POINTER_DISTANCE
(
pBuf
,
buf
))
{
if
(
taosWrite
(
pStore
->
fd
,
buf
,
POINTER_DISTANCE
(
pBuf
,
buf
))
<
POINTER_DISTANCE
(
pBuf
,
buf
))
{
uError
(
"failed to write %"
PRId64
" bytes to file %s since %s"
,
(
int64_t
)(
POINTER_DISTANCE
(
pBuf
,
buf
)),
pStore
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -339,7 +339,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) {
int
fd
=
open
(
fname
,
O_RDONLY
);
if
(
fd
<
0
)
goto
_err
;
if
(
taos
T
Read
(
fd
,
buf
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
goto
_err
;
if
(
taosRead
(
fd
,
buf
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
goto
_err
;
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
buf
,
TD_KVSTORE_HEADER_SIZE
))
goto
_err
;
void
*
pBuf
=
(
void
*
)
buf
;
...
...
@@ -368,7 +368,7 @@ static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t
return
-
1
;
}
if
(
taos
T
Read
(
fd
,
buf
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
{
if
(
taosRead
(
fd
,
buf
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
{
uError
(
"failed to read %d bytes from file %s since %s"
,
TD_KVSTORE_HEADER_SIZE
,
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -402,7 +402,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
ASSERT
(
POINTER_DISTANCE
(
pBuf
,
buf
)
+
sizeof
(
TSCKSUM
)
<=
TD_KVSTORE_HEADER_SIZE
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
buf
,
TD_KVSTORE_HEADER_SIZE
);
if
(
taos
T
Write
(
fd
,
buf
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
{
if
(
taosWrite
(
fd
,
buf
,
TD_KVSTORE_HEADER_SIZE
)
<
TD_KVSTORE_HEADER_SIZE
)
{
uError
(
"failed to write %d bytes to file %s since %s"
,
TD_KVSTORE_HEADER_SIZE
,
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -535,7 +535,7 @@ static int tdRestoreKVStore(SKVStore *pStore) {
ASSERT
(
pStore
->
info
.
size
==
TD_KVSTORE_HEADER_SIZE
);
while
(
true
)
{
ssize_t
tsize
=
taosT
Read
(
pStore
->
fd
,
tbuf
,
sizeof
(
SKVRecord
));
int64_t
tsize
=
taos
Read
(
pStore
->
fd
,
tbuf
,
sizeof
(
SKVRecord
));
if
(
tsize
==
0
)
break
;
if
(
tsize
<
sizeof
(
SKVRecord
))
{
uError
(
"failed to read %"
PRIzu
" bytes from file %s at offset %"
PRId64
"since %s"
,
sizeof
(
SKVRecord
),
pStore
->
fname
,
...
...
@@ -598,7 +598,7 @@ static int tdRestoreKVStore(SKVStore *pStore) {
goto
_err
;
}
if
(
taos
T
Read
(
pStore
->
fd
,
buf
,
(
size_t
)
pRecord
->
size
)
<
pRecord
->
size
)
{
if
(
taosRead
(
pStore
->
fd
,
buf
,
(
size_t
)
pRecord
->
size
)
<
pRecord
->
size
)
{
uError
(
"failed to read %"
PRId64
" bytes from file %s since %s, offset %"
PRId64
,
pRecord
->
size
,
pStore
->
fname
,
strerror
(
errno
),
pRecord
->
offset
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
src/util/src/tlog.c
浏览文件 @
ce8e1177
...
...
@@ -336,11 +336,11 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
lseek
(
tsLogObj
.
logHandle
->
fd
,
0
,
SEEK_END
);
sprintf
(
name
,
"==================================================
\n
"
);
taos
T
Write
(
tsLogObj
.
logHandle
->
fd
,
name
,
(
uint32_t
)
strlen
(
name
));
taosWrite
(
tsLogObj
.
logHandle
->
fd
,
name
,
(
uint32_t
)
strlen
(
name
));
sprintf
(
name
,
" new log file
\n
"
);
taos
T
Write
(
tsLogObj
.
logHandle
->
fd
,
name
,
(
uint32_t
)
strlen
(
name
));
taosWrite
(
tsLogObj
.
logHandle
->
fd
,
name
,
(
uint32_t
)
strlen
(
name
));
sprintf
(
name
,
"==================================================
\n
"
);
taos
T
Write
(
tsLogObj
.
logHandle
->
fd
,
name
,
(
uint32_t
)
strlen
(
name
));
taosWrite
(
tsLogObj
.
logHandle
->
fd
,
name
,
(
uint32_t
)
strlen
(
name
));
return
0
;
}
...
...
@@ -390,7 +390,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
if
(
tsAsyncLog
)
{
taosPushLogBuffer
(
tsLogObj
.
logHandle
,
buffer
,
len
);
}
else
{
taos
T
Write
(
tsLogObj
.
logHandle
->
fd
,
buffer
,
len
);
taosWrite
(
tsLogObj
.
logHandle
->
fd
,
buffer
,
len
);
}
if
(
tsLogObj
.
maxLines
>
0
)
{
...
...
@@ -400,7 +400,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
}
}
if
(
dflag
&
DEBUG_SCREEN
)
taos
T
Write
(
1
,
buffer
,
(
uint32_t
)
len
);
if
(
dflag
&
DEBUG_SCREEN
)
taosWrite
(
1
,
buffer
,
(
uint32_t
)
len
);
}
void
taosDumpData
(
unsigned
char
*
msg
,
int32_t
len
)
{
...
...
@@ -419,7 +419,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
pos
+=
3
;
if
(
c
>=
16
)
{
temp
[
pos
++
]
=
'\n'
;
taos
T
Write
(
tsLogObj
.
logHandle
->
fd
,
temp
,
(
uint32_t
)
pos
);
taosWrite
(
tsLogObj
.
logHandle
->
fd
,
temp
,
(
uint32_t
)
pos
);
c
=
0
;
pos
=
0
;
}
...
...
@@ -427,9 +427,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
temp
[
pos
++
]
=
'\n'
;
taosTWrite
(
tsLogObj
.
logHandle
->
fd
,
temp
,
(
uint32_t
)
pos
);
return
;
taosWrite
(
tsLogObj
.
logHandle
->
fd
,
temp
,
(
uint32_t
)
pos
);
}
void
taosPrintLongString
(
const
char
*
flags
,
int32_t
dflag
,
const
char
*
format
,
...)
{
...
...
@@ -467,7 +465,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
if
(
tsAsyncLog
)
{
taosPushLogBuffer
(
tsLogObj
.
logHandle
,
buffer
,
len
);
}
else
{
taos
T
Write
(
tsLogObj
.
logHandle
->
fd
,
buffer
,
len
);
taosWrite
(
tsLogObj
.
logHandle
->
fd
,
buffer
,
len
);
}
if
(
tsLogObj
.
maxLines
>
0
)
{
...
...
@@ -477,7 +475,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
}
}
if
(
dflag
&
DEBUG_SCREEN
)
taos
T
Write
(
1
,
buffer
,
(
uint32_t
)
len
);
if
(
dflag
&
DEBUG_SCREEN
)
taosWrite
(
1
,
buffer
,
(
uint32_t
)
len
);
}
#if 0
...
...
@@ -606,7 +604,7 @@ static void *taosAsyncOutputLog(void *param) {
while
(
1
)
{
log_size
=
taosPollLogBuffer
(
tLogBuff
,
tempBuffer
,
TSDB_DEFAULT_LOG_BUF_UNIT
);
if
(
log_size
)
{
taos
T
Write
(
tLogBuff
->
fd
,
tempBuffer
,
log_size
);
taosWrite
(
tLogBuff
->
fd
,
tempBuffer
,
log_size
);
LOG_BUF_START
(
tLogBuff
)
=
(
LOG_BUF_START
(
tLogBuff
)
+
log_size
)
%
LOG_BUF_SIZE
(
tLogBuff
);
}
else
{
break
;
...
...
src/util/src/tnote.c
浏览文件 @
ce8e1177
...
...
@@ -265,7 +265,7 @@ void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...)
buffer
[
len
]
=
0
;
if
(
pNote
->
taosNoteFd
>=
0
)
{
taos
T
Write
(
pNote
->
taosNoteFd
,
buffer
,
(
unsigned
int
)
len
);
taosWrite
(
pNote
->
taosNoteFd
,
buffer
,
(
unsigned
int
)
len
);
if
(
pNote
->
taosNoteMaxLines
>
0
)
{
pNote
->
taosNoteLines
++
;
...
...
src/util/src/tsocket.c
浏览文件 @
ce8e1177
...
...
@@ -18,7 +18,7 @@
#include "tsocket.h"
#include "taoserror.h"
int
taosGetFqdn
(
char
*
fqdn
)
{
int
32_t
taosGetFqdn
(
char
*
fqdn
)
{
char
hostname
[
1024
];
hostname
[
1023
]
=
'\0'
;
if
(
gethostname
(
hostname
,
1023
)
==
-
1
)
{
...
...
@@ -26,10 +26,10 @@ int taosGetFqdn(char *fqdn) {
return
-
1
;
}
struct
addrinfo
hints
=
{
0
};
struct
addrinfo
hints
=
{
0
};
struct
addrinfo
*
result
=
NULL
;
hints
.
ai_flags
=
AI_CANONNAME
;
int
ret
=
getaddrinfo
(
hostname
,
NULL
,
&
hints
,
&
result
);
int
32_t
ret
=
getaddrinfo
(
hostname
,
NULL
,
&
hints
,
&
result
);
if
(
!
result
)
{
uError
(
"failed to get fqdn, code:%d, reason:%s"
,
ret
,
gai_strerror
(
ret
));
return
-
1
;
...
...
@@ -49,10 +49,10 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
int32_t
ret
=
getaddrinfo
(
fqdn
,
NULL
,
&
hints
,
&
result
);
if
(
result
)
{
struct
sockaddr
*
sa
=
result
->
ai_addr
;
struct
sockaddr_in
*
si
=
(
struct
sockaddr_in
*
)
sa
;
struct
in_addr
ia
=
si
->
sin_addr
;
uint32_t
ip
=
ia
.
s_addr
;
struct
sockaddr
*
sa
=
result
->
ai_addr
;
struct
sockaddr_in
*
si
=
(
struct
sockaddr_in
*
)
sa
;
struct
in_addr
ia
=
si
->
sin_addr
;
uint32_t
ip
=
ia
.
s_addr
;
freeaddrinfo
(
result
);
return
ip
;
}
else
{
...
...
@@ -70,7 +70,7 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
}
}
// Function converting an IP address string to an u
nsigned in
t.
// Function converting an IP address string to an u
int32_
t.
uint32_t
ip2uint
(
const
char
*
const
ip_addr
)
{
char
ip_addr_cpy
[
20
];
char
ip
[
5
];
...
...
@@ -81,7 +81,7 @@ uint32_t ip2uint(const char *const ip_addr) {
s_start
=
ip_addr_cpy
;
s_end
=
ip_addr_cpy
;
int
k
;
int
32_t
k
;
for
(
k
=
0
;
*
s_start
!=
'\0'
;
s_start
=
s_end
)
{
for
(
s_end
=
s_start
;
*
s_end
!=
'.'
&&
*
s_end
!=
'\0'
;
s_end
++
)
{
...
...
@@ -95,17 +95,17 @@ uint32_t ip2uint(const char *const ip_addr) {
ip
[
k
]
=
'\0'
;
return
*
((
u
nsigned
in
t
*
)
ip
);
return
*
((
u
int32_
t
*
)
ip
);
}
int
taosWriteMsg
(
SOCKET
fd
,
void
*
buf
,
in
t
nbytes
)
{
int
nleft
,
nwritten
;
char
*
ptr
=
(
char
*
)
buf
;
int
32_t
taosWriteMsg
(
SOCKET
fd
,
void
*
buf
,
int32_
t
nbytes
)
{
int
32_t
nleft
,
nwritten
;
char
*
ptr
=
(
char
*
)
buf
;
nleft
=
nbytes
;
while
(
nleft
>
0
)
{
nwritten
=
(
int
)
taosWriteSocket
(
fd
,
(
char
*
)
ptr
,
(
size_t
)
nleft
);
nwritten
=
(
int
32_t
)
taosWriteSocket
(
fd
,
(
char
*
)
ptr
,
(
size_t
)
nleft
);
if
(
nwritten
<=
0
)
{
if
(
errno
==
EINTR
)
continue
;
...
...
@@ -120,16 +120,16 @@ int taosWriteMsg(SOCKET fd, void *buf, int nbytes) {
return
(
nbytes
-
nleft
);
}
int
taosReadMsg
(
SOCKET
fd
,
void
*
buf
,
in
t
nbytes
)
{
int
nleft
,
nread
;
char
*
ptr
=
(
char
*
)
buf
;
int
32_t
taosReadMsg
(
SOCKET
fd
,
void
*
buf
,
int32_
t
nbytes
)
{
int
32_t
nleft
,
nread
;
char
*
ptr
=
(
char
*
)
buf
;
nleft
=
nbytes
;
if
(
fd
<
0
)
return
-
1
;
while
(
nleft
>
0
)
{
nread
=
(
int
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
);
nread
=
(
int
32_t
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
);
if
(
nread
==
0
)
{
break
;
}
else
if
(
nread
<
0
)
{
...
...
@@ -147,11 +147,11 @@ int taosReadMsg(SOCKET fd, void *buf, int nbytes) {
return
(
nbytes
-
nleft
);
}
int
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
in
t
nbytes
)
{
int
32_t
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
int32_
t
nbytes
)
{
taosSetNonblocking
(
fd
,
1
);
int
nleft
,
nwritten
,
nready
;
fd_set
fset
;
int
32_t
nleft
,
nwritten
,
nready
;
fd_set
fset
;
struct
timeval
tv
;
nleft
=
nbytes
;
...
...
@@ -160,7 +160,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
tv
.
tv_usec
=
0
;
FD_ZERO
(
&
fset
);
FD_SET
(
fd
,
&
fset
);
if
((
nready
=
select
((
int
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
if
((
nready
=
select
((
int
32_t
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
errno
=
ETIMEDOUT
;
uError
(
"fd %d timeout, no enough space to write"
,
fd
);
break
;
...
...
@@ -172,7 +172,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return
-
1
;
}
nwritten
=
(
int
)
taosSend
(
fd
,
ptr
,
(
size_t
)
nleft
,
MSG_NOSIGNAL
);
nwritten
=
(
int
32_t
)
taosSend
(
fd
,
ptr
,
(
size_t
)
nleft
,
MSG_NOSIGNAL
);
if
(
nwritten
<=
0
)
{
if
(
errno
==
EAGAIN
||
errno
==
EINTR
)
continue
;
...
...
@@ -189,10 +189,10 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return
(
nbytes
-
nleft
);
}
int
taosReadn
(
SOCKET
fd
,
char
*
ptr
,
in
t
nbytes
)
{
int
nread
,
nready
,
nleft
=
nbytes
;
int
32_t
taosReadn
(
SOCKET
fd
,
char
*
ptr
,
int32_
t
nbytes
)
{
int
32_t
nread
,
nready
,
nleft
=
nbytes
;
fd_set
fset
;
fd_set
fset
;
struct
timeval
tv
;
while
(
nleft
>
0
)
{
...
...
@@ -200,7 +200,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
tv
.
tv_usec
=
0
;
FD_ZERO
(
&
fset
);
FD_SET
(
fd
,
&
fset
);
if
((
nready
=
select
((
int
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
if
((
nready
=
select
((
int
32_t
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
errno
=
ETIMEDOUT
;
uError
(
"fd %d timeout
\n
"
,
fd
);
break
;
...
...
@@ -210,7 +210,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
return
-
1
;
}
if
((
nread
=
(
int
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
))
<
0
)
{
if
((
nread
=
(
int
32_t
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
))
<
0
)
{
if
(
errno
==
EINTR
)
continue
;
uError
(
"read error, %d (%s)"
,
errno
,
strerror
(
errno
));
return
-
1
;
...
...
@@ -229,8 +229,8 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
SOCKET
taosOpenUdpSocket
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
localAddr
;
SOCKET
sockFd
;
int
bufSize
=
1024000
;
SOCKET
sockFd
;
int
32_t
bufSize
=
1024000
;
uDebug
(
"open udp socket:0x%x:%hu"
,
ip
,
port
);
...
...
@@ -239,7 +239,7 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
localAddr
.
sin_addr
.
s_addr
=
ip
;
localAddr
.
sin_port
=
(
uint16_t
)
htons
(
port
);
if
((
sockFd
=
(
int
)
socket
(
AF_INET
,
SOCK_DGRAM
,
0
))
<=
2
)
{
if
((
sockFd
=
(
int
32_t
)
socket
(
AF_INET
,
SOCK_DGRAM
,
0
))
<=
2
)
{
uError
(
"failed to open udp socket: %d (%s)"
,
errno
,
strerror
(
errno
));
taosCloseSocketNoCheck
(
sockFd
);
return
-
1
;
...
...
@@ -268,9 +268,9 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
}
SOCKET
taosOpenTcpClientSocket
(
uint32_t
destIp
,
uint16_t
destPort
,
uint32_t
clientIp
)
{
SOCKET
sockFd
=
0
;
SOCKET
sockFd
=
0
;
int32_t
ret
;
struct
sockaddr_in
serverAddr
,
clientAddr
;
int
ret
;
sockFd
=
socket
(
PF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
);
...
...
@@ -281,7 +281,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
}
/* set REUSEADDR option, so the portnumber can be re-used */
int
reuse
=
1
;
int
32_t
reuse
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
void
*
)
&
reuse
,
sizeof
(
reuse
))
<
0
)
{
uError
(
"setsockopt SO_REUSEADDR failed: %d (%s)"
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
...
...
@@ -296,8 +296,8 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
/* bind socket to client address */
if
(
bind
(
sockFd
,
(
struct
sockaddr
*
)
&
clientAddr
,
sizeof
(
clientAddr
))
<
0
)
{
uError
(
"bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)"
,
clientIp
,
destIp
,
destPort
,
strerror
(
errno
));
uError
(
"bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)"
,
clientIp
,
destIp
,
destPort
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
...
...
@@ -311,7 +311,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
ret
=
connect
(
sockFd
,
(
struct
sockaddr
*
)
&
serverAddr
,
sizeof
(
serverAddr
));
if
(
ret
!=
0
)
{
//uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
//
uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket
(
sockFd
);
sockFd
=
-
1
;
}
else
{
...
...
@@ -321,36 +321,36 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
return
sockFd
;
}
int
taosKeepTcpAlive
(
SOCKET
sockFd
)
{
int
alive
=
1
;
int
32_t
taosKeepTcpAlive
(
SOCKET
sockFd
)
{
int
32_t
alive
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_KEEPALIVE
,
(
void
*
)
&
alive
,
sizeof
(
alive
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
probes
=
3
;
int
32_t
probes
=
3
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_TCP
,
TCP_KEEPCNT
,
(
void
*
)
&
probes
,
sizeof
(
probes
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPCNT failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
alivetime
=
10
;
int
32_t
alivetime
=
10
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_TCP
,
TCP_KEEPIDLE
,
(
void
*
)
&
alivetime
,
sizeof
(
alivetime
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
interval
=
3
;
int
32_t
interval
=
3
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_TCP
,
TCP_KEEPINTVL
,
(
void
*
)
&
interval
,
sizeof
(
interval
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
nodelay
=
1
;
int
32_t
nodelay
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
void
*
)
&
nodelay
,
sizeof
(
nodelay
))
<
0
)
{
uError
(
"fd:%d setsockopt TCP_NODELAY failed %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
...
...
@@ -371,8 +371,8 @@ int taosKeepTcpAlive(SOCKET sockFd) {
SOCKET
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
serverAdd
;
SOCKET
sockFd
;
int
reuse
;
SOCKET
sockFd
;
int
32_t
reuse
;
uDebug
(
"open tcp server socket:0x%x:%hu"
,
ip
,
port
);
...
...
@@ -381,7 +381,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
serverAdd
.
sin_addr
.
s_addr
=
ip
;
serverAdd
.
sin_port
=
(
uint16_t
)
htons
(
port
);
if
((
sockFd
=
(
int
)
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
))
<=
2
)
{
if
((
sockFd
=
(
int
32_t
)
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
))
<=
2
)
{
uError
(
"failed to open TCP socket: %d (%s)"
,
errno
,
strerror
(
errno
));
taosCloseSocketNoCheck
(
sockFd
);
return
-
1
;
...
...
@@ -417,38 +417,38 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return
sockFd
;
}
void
tinet_ntoa
(
char
*
ipstr
,
u
nsigned
in
t
ip
)
{
void
tinet_ntoa
(
char
*
ipstr
,
u
int32_
t
ip
)
{
sprintf
(
ipstr
,
"%d.%d.%d.%d"
,
ip
&
0xFF
,
(
ip
>>
8
)
&
0xFF
,
(
ip
>>
16
)
&
0xFF
,
ip
>>
24
);
}
#define COPY_SIZE 32768
// sendfile shall be used
int
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
)
{
int
32_t
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
)
{
int64_t
leftLen
;
int
readLen
,
writeLen
;
int
32_t
readLen
,
writeLen
;
char
temp
[
COPY_SIZE
];
leftLen
=
len
;
while
(
leftLen
>
0
)
{
if
(
leftLen
<
COPY_SIZE
)
readLen
=
(
int
)
leftLen
;
readLen
=
(
int
32_t
)
leftLen
;
else
readLen
=
COPY_SIZE
;
// 4K
int
retLen
=
taosReadMsg
(
sfd
,
temp
,
(
in
t
)
readLen
);
int
32_t
retLen
=
taosReadMsg
(
sfd
,
temp
,
(
int32_
t
)
readLen
);
if
(
readLen
!=
retLen
)
{
uError
(
"read error, readLen:%d retLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
retLen
,
len
,
leftLen
,
strerror
(
errno
));
uError
(
"read error, readLen:%d retLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
retLen
,
len
,
leftLen
,
strerror
(
errno
));
return
-
1
;
}
writeLen
=
taosWriteMsg
(
dfd
,
temp
,
readLen
);
if
(
readLen
!=
writeLen
)
{
uError
(
"copy error, readLen:%d writeLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
writeLen
,
len
,
leftLen
,
strerror
(
errno
));
uError
(
"copy error, readLen:%d writeLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
writeLen
,
len
,
leftLen
,
strerror
(
errno
));
return
-
1
;
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
ce8e1177
...
...
@@ -387,6 +387,10 @@ void vnodeRelease(void *pVnodeRaw) {
pVnode
->
qMgmt
=
NULL
;
}
if
(
pVnode
->
wal
)
{
walStop
(
pVnode
->
wal
);
}
if
(
pVnode
->
tsdb
)
{
tsdbCloseRepo
(
pVnode
->
tsdb
,
1
);
pVnode
->
tsdb
=
NULL
;
...
...
src/wal/inc/walInt.h
浏览文件 @
ce8e1177
...
...
@@ -49,6 +49,8 @@ typedef struct {
int32_t
level
;
int32_t
fsyncPeriod
;
int32_t
fsyncSeq
;
int8_t
stop
;
int8_t
reserved
[
3
];
char
path
[
WAL_PATH_LEN
];
char
name
[
WAL_FILE_LEN
];
pthread_mutex_t
mutex
;
...
...
src/wal/src/walMgmt.c
浏览文件 @
ce8e1177
...
...
@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "talloc.h"
#include "tref.h"
#include "twal.h"
#include "walInt.h"
...
...
@@ -110,6 +109,16 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) {
return
TSDB_CODE_SUCCESS
;
}
void
walStop
(
void
*
handle
)
{
if
(
handle
==
NULL
)
return
;
SWal
*
pWal
=
handle
;
pthread_mutex_lock
(
&
pWal
->
mutex
);
pWal
->
stop
=
1
;
pthread_mutex_unlock
(
&
pWal
->
mutex
);
wDebug
(
"vgId:%d, stop write wal"
,
pWal
->
vgId
);
}
void
walClose
(
void
*
handle
)
{
if
(
handle
==
NULL
)
return
;
...
...
@@ -123,9 +132,7 @@ void walClose(void *handle) {
while
(
walGetNextFile
(
pWal
,
&
fileId
)
>=
0
)
{
snprintf
(
pWal
->
name
,
sizeof
(
pWal
->
name
),
"%s/%s%"
PRId64
,
pWal
->
path
,
WAL_PREFIX
,
fileId
);
if
(
fileId
==
pWal
->
fileId
)
{
wDebug
(
"vgId:%d, wal:%p file:%s, it is closed and kept"
,
pWal
->
vgId
,
pWal
,
pWal
->
name
);
}
else
if
(
remove
(
pWal
->
name
)
<
0
)
{
if
(
remove
(
pWal
->
name
)
<
0
)
{
wError
(
"vgId:%d, wal:%p file:%s, failed to remove"
,
pWal
->
vgId
,
pWal
,
pWal
->
name
);
}
else
{
wDebug
(
"vgId:%d, wal:%p file:%s, it is removed"
,
pWal
->
vgId
,
pWal
,
pWal
->
name
);
...
...
src/wal/src/walWrite.c
浏览文件 @
ce8e1177
...
...
@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "talloc.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "twal.h"
...
...
@@ -29,10 +28,15 @@ int32_t walRenew(void *handle) {
SWal
*
pWal
=
handle
;
int32_t
code
=
0
;
if
(
pWal
->
stop
)
{
wDebug
(
"vgId:%d, do not create a new wal file"
,
pWal
->
vgId
);
return
0
;
}
pthread_mutex_lock
(
&
pWal
->
mutex
);
if
(
pWal
->
fd
>=
0
)
{
close
(
pWal
->
fd
);
t
close
(
pWal
->
fd
);
wDebug
(
"vgId:%d, file:%s, it is closed"
,
pWal
->
vgId
,
pWal
->
name
);
}
...
...
@@ -90,7 +94,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
pthread_mutex_lock
(
&
pWal
->
mutex
);
if
(
taos
T
Write
(
pWal
->
fd
,
pHead
,
contLen
)
!=
contLen
)
{
if
(
taosWrite
(
pWal
->
fd
,
pHead
,
contLen
)
!=
contLen
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%s, failed to write since %s"
,
pWal
->
vgId
,
pWal
->
name
,
strerror
(
errno
));
}
else
{
...
...
@@ -151,7 +155,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
if
(
!
pWal
->
keep
)
return
TSDB_CODE_SUCCESS
;
if
(
count
==
0
)
{
wDebug
(
"vgId:%d,
file:%s not exist, renew it"
,
pWal
->
vgId
,
pWal
->
name
);
wDebug
(
"vgId:%d,
wal file not exist, renew it"
,
pWal
->
vgId
);
return
walRenew
(
pWal
);
}
else
{
// open the existing WAL file in append mode
...
...
@@ -204,7 +208,7 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, i
return
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
if
(
taos
T
Read
(
fd
,
pHead
,
sizeof
(
SWalHead
))
<=
0
)
{
if
(
taosRead
(
fd
,
pHead
,
sizeof
(
SWalHead
))
<=
0
)
{
wError
(
"vgId:%d, read to end of corrupted wal file, offset:%"
PRId64
,
pWal
->
vgId
,
pos
);
return
TSDB_CODE_WAL_FILE_CORRUPTED
;
}
...
...
@@ -245,7 +249,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
SWalHead
*
pHead
=
buffer
;
while
(
1
)
{
int32_t
ret
=
taos
T
Read
(
fd
,
pHead
,
sizeof
(
SWalHead
));
int32_t
ret
=
taosRead
(
fd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
==
0
)
break
;
if
(
ret
<
0
)
{
...
...
@@ -282,7 +286,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pHead
=
buffer
;
}
ret
=
taos
T
Read
(
fd
,
pHead
->
cont
,
pHead
->
len
);
ret
=
taosRead
(
fd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
<
0
)
{
wError
(
"vgId:%d, file:%s, failed to read wal body since %s"
,
pWal
->
vgId
,
name
,
strerror
(
errno
));
code
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -305,7 +309,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
(
*
writeFp
)(
pVnode
,
pHead
,
TAOS_QTYPE_WAL
);
}
close
(
fd
);
t
close
(
fd
);
tfree
(
buffer
);
return
code
;
...
...
tests/script/general/insert/basic.sim
浏览文件 @
ce8e1177
...
...
@@ -8,8 +8,8 @@ sleep 3000
sql connect
$i = 0
$dbPrefix =
tb_in_db
$tbPrefix = t
b_in_tb
$dbPrefix =
d
$tbPrefix = t
$db = $dbPrefix . $i
$tb = $tbPrefix . $i
...
...
@@ -22,28 +22,27 @@ sql create table $tb (ts timestamp, speed int)
$x = 0
while $x < 10
$ms = $x . m
sql insert into $tb values (now + $ms , $x )
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
print =============== step 2
sql insert into $tb values (now - 5m , 10)
sql insert into $tb values (now - 6m , 10)
sql insert into $tb values (now - 7m , 10)
sql insert into $tb values (now - 8m , 10)
$x = 0
while $x < 5
$cc = $x * 60000
$ms = 1551481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
sql select * from $tb
print $rows points data are retrieved
if $rows != 14 then
return -1
endi
sql drop database $db
sleep 1000
sql show databases
if $rows != 0 then
if $rows != 15 then
return -1
endi
...
...
tests/test-all.sh
浏览文件 @
ce8e1177
...
...
@@ -9,7 +9,7 @@ NC='\033[0m'
function
runSimCaseOneByOne
{
while
read
-r
line
;
do
if
[[
$line
=
~ ^./test.sh
*
]]
;
then
if
[[
$line
=
~ ^./test.sh
*
]]
||
[[
$line
=
~ ^run
*
]]
;
then
case
=
`
echo
$line
|
grep
sim
$
|awk
'{print $NF}'
`
start_time
=
`
date
+%s
`
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录